import datetime
import copy
import logging
import os
import json
import sqlalchemy
from sqlalchemy import or_, and_
from sqlalchemy.orm import sessionmaker
from itsdangerous import (TimedJSONWebSignatureSerializer
as Serializer, BadSignature, SignatureExpired)
# from multiprocessing.pool import Pool
from sqlalchemy.engine import Engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy import event
import sqlite3
from sticky_pi_api.utils import json_io_converter
from sticky_pi_api.database.utils import Base
from sticky_pi_api.storage import DiskStorage, BaseStorage, S3Storage
from sticky_pi_api.configuration import BaseAPIConf
from sticky_pi_api.database.images_table import Images
from sticky_pi_api.database.uid_annotations_table import UIDAnnotations
from sticky_pi_api.types import InfoType, MetadataType, AnnotType, List, Union, Dict, Any
from sticky_pi_api.database.users_tables import Users
from sticky_pi_api.database.tuboid_series_table import TuboidSeries
from sticky_pi_api.database.tiled_tuboids_table import TiledTuboids
from sticky_pi_api.database.itc_labels_table import ITCLabels
from sticky_pi_api.utils import chunker, json_inputs_to_python
from decorate_all_methods import decorate_all_methods
from abc import ABC, abstractmethod
# this decorator ensures json inputs are formated as python objects
[docs]@decorate_all_methods(json_inputs_to_python, exclude=['__init__', '_put_new_images', '_put_tiled_tuboids'])
class BaseAPISpec(ABC):
[docs] @abstractmethod
def get_images(self, info: InfoType, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
"""
Retrieves information about a given set of images, defined by their parent device and the
datetime of the picture. *If an image is not available, no data is returned for this image*.
:param client_info: optional information about the client/user contains key ``'username'``
:param info: A list of dicts. each dicts has, at least, keys: ``'device'`` and ``'datetime'``
:param what: The nature of the objects to retrieve.
One of {``'metadata'``, ``'image'``, ``'thumbnail'``, ``'thumbnail-mini'``}
:return: A list of dictionaries with one element for each queried value. Each dictionary contains
the fields present in the underlying database plus a ``'url'`` fields to retrieve the actual object requested
(i.e. the ``what``) argument. In the case of ``what='metadata'``, ``url=''`` (i.e. no url is generated).
"""
pass
[docs] @abstractmethod
def delete_images(self, info: MetadataType, client_info: Dict[str, Any] = None):
"""
#fixme. this is false. it now takes a set of images as returned by get_images
Delete a set of images, defined by their parent device and the
datetime of the picture.
:param client_info: optional information about the client/user contains key ``'username'``
:param info: A list of dicts. each dicts has, at least, keys: ``'device'`` and ``'datetime'``
:return: A list of dictionaries with one element for each deleted image.
"""
pass
# fixme should be called series?
[docs] @abstractmethod
def delete_tiled_tuboids(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
"""
Delete a set of tiled tuboids series, defined by their parent device, start a
datetime.
:param client_info: optional information about the client/user contains key ``'username'``
:param info: A list of dicts. each dicts has, at least, keys: ``'device'``,
``'start_datetime'`` and ``'end_datetime'``
:return: A list of dictionaries with one element for each deleted image.
"""
pass
@abstractmethod
def _put_new_images(self, files: List[str], client_info: Dict[str, Any] = None) -> MetadataType:
"""
Uploads a set of client image files to the API.
The user would use ``BaseClient.put_images(files)``,
which first discovers which files are to be uploaded for incremental upload.
:param client_info: optional information about the client/user contains key ``'username'``
:param files: A list of path to client files
:return: The metadata of the files that were actually uploaded
"""
pass
[docs] @abstractmethod
def get_image_series(self, info, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
"""
Retrieves image sequences (i.e. series).
A series contains all images from a given device within a datetime range.
:param client_info: optional information about the client/user contains key ``'username'``
:param info: A list of dicts. each dicts has, at least, the keys:
``'device'``, ``'start_datetime'`` and ``'end_datetime'``. ``device`` is interpreted to the MySQL like operator.
For instance,one can match all devices with ``device="%"``.
:param what: The nature of the objects to retrieve.
One of {``'metadata'``, ``'image'``, ``'thumbnail'``, ``'thumbnail-mini'``}
:return: A list of dictionaries with one element for each queried value. Each dictionary contains
the fields present in the underlying database plus a ``'url'`` fields to retrieve the actual object requested
(i.e. the ``what``) argument. In the case of ``what='metadata'``, ``url=''`` (i.e. no url is generated).
"""
pass
[docs] @abstractmethod
def put_uid_annotations(self, info: AnnotType, client_info: Dict[str, Any] = None) -> MetadataType:
"""
:param info: A list of dictionaries corresponding to annotations (one list element per image).
The annotations are formatted as a dictionaries with two keys: ``'annotations'`` and ``'metadata'``.
* ``'metadata'`` must have the fields:
* ``'algo_name'``: the name of the algorithm used to find the object (e.g. ``'sticky-pi-universal-insect-detector'``)
* ``'algo_version'``: The version of the algorithm as `timestamp-md5` (e.g. ``'1598113346-ad2cd78dfaca12821046dfb8994724d5'``)
* ``'device'``: The device that took the annotated image (e.g. ``'5c173ff2'``)
* ``'datetime'``: The datetime at which the image was taken (e.g. ``'2020-06-20_21-33-24'``)
* ``'md5'``: The md5 of the image that was analysed (e.g. ``'9e6e908d9c29d332b511f8d5121857f8'``)
* ``'annotations'`` is a list where each element represent an object. It has the fields:
* ``'contour'``: a 3d array encoding the position of the vertices (as convention in OpenCV)
* ``'name'``: the name/type of the object (e.g. ``'insect'``)
* ``'fill_colour'`` and ``'stroke_colour'``: the colours of the contour (if it is to be drawn -- e.g. ``'#0000ff'``)
* ``'value'``: an optional integer further describing the contour (e.g. ``1``)
:param client_info: optional information about the client/user contains key ``'username'``
:return: The metadata of the uploaded annotations (i.e. a list od dicts. each field of the dict naming a column in the database).
This corresponds to the annotation data as represented in ``UIDAnnotations``
"""
pass
[docs] @abstractmethod
def get_uid_annotations(self, info: InfoType, what: str = 'metadata',
client_info: Dict[str, Any] = None) -> MetadataType:
"""
Retrieves annotations for a given set of images.
:param info: A list of dict with keys: ``'device'`` and ``'datetime'``
:param what: The nature of the object to retrieve. One of {``'metadata'``, ``'json'``}.
:param client_info: optional information about the client/user contains key ``'username'``
:return: A list of dictionaries with one element for each queried value.
Each dictionary contains the fields present in the underlying database table (see ``UIDAnnotations``).
In the case of ``what='metadata'``, the field ``json=''``.
Otherwise, it contains a json string with the actual annotation data.
"""
pass
[docs] @abstractmethod
def get_uid_annotations_series(self, info: InfoType, what: str = 'metadata',
client_info: Dict[str, Any] = None) -> MetadataType:
"""
Retrieves annotations for a given set of images.
:param info: A list of dicts. each dicts has, at least, the keys:
``'device'``, ``'start_datetime'`` and ``'end_datetime'``. ``device`` is interpreted to the MySQL like operator.
:param what: The nature of the object to retrieve. One of {``'metadata'``, ``'json'``}.
:param client_info: optional information about the client/user contains key ``'username'``
:return: A list of dictionaries with one element for each queried value.
Each dictionary contains the fields present in the underlying database table (see ``UIDAnnotations``).
In the case of ``what='metadata'``, the field ``json=''``.
Otherwise, it contains a json string with the actual annotation data.
"""
pass
@abstractmethod
def _put_tiled_tuboids(self, files: List[Dict[str, Union[Dict, str]]],
client_info: Dict[str, Any] = None) -> MetadataType:
"""
Uploads a set of client tiled tuboid files to the API.
The user would use ``BaseClient.put_tiled_tuboid(files)``.
:param files: A list of dict. Each dict contains keys:
* ``tuboid_id``: a formatted string describing the tuboid series and tuboid id e.g. ``08038ade.2020-07-08_20-00-00.2020-07-09_15-00-00.0002``
* ``series_info``: a dictionary with information regarding the series the tuboid belong to.
I.e. keys ``'algo_name'``, ``'algo_version'``, ``'start_datetime'``, ``'end_datetime'``, ``'device'``
* ``metadata``: the path to a comma-separated files that contains metadata for each tuboid shot, or a file-like object
* ``tuboid``: the path to a tiled jpg containing (some of) the shots described in metadata, in the same order, or a file-like object
* ``context``: the path to an illustration (jpg) of the detected object highlighted in the whole image of the first shot, or a file-like object
:param client_info: optional information about the client/user contains key ``'username'``
:return: The metadata of the files that were actually uploaded
"""
pass
[docs] @abstractmethod
def get_tiled_tuboid_series(self, info: InfoType, what: str = "metadata",
client_info: Dict[str, Any] = None) -> MetadataType:
"""
Retrieves tiled tuboids -- i.e. stitched annotations into a tile,
where the assumption is one tuboid per instance.
A series contains all tuboids fully contained within a range
:param info: A list of dicts. each dicts has, at least, the keys:
``'device'``, ``'start_datetime'`` and ``'end_datetime'``. ``device`` is interpreted to the MySQL like operator.
For instance,one can match all devices with ``device="%"``.
:param what: The nature of the objects to retrieve, either ``'data'`` or ``'metadata'``. ``'metadata'`` will not
add the extra three fields mapping the files to the results
:param client_info: optional information about the client/user contains key ``'username'``
:return: A list of dictionaries with one element for each queried value. Each dictionary contains
the fields present in the underlying database plus the fields ``'metadata'``, ``'tuboid'`` and ``'context'``
fields, which have a url to fetch the relevant file.
"""
pass
@abstractmethod
def _get_itc_labels(self, info: List[Dict], client_info: Dict[str, Any] = None) -> MetadataType:
"""
Retrieves labels for a given set of tiled tuboid
:param client_info: optional information about the client/user contains key ``'username'``
:param info: A list of dict with key: ``'tuboid_id'``
:return: A list of dictionaries with one element for each queried value.
Each dictionary contains the fields present in the underlying database table (see ``ITCLabels``).
"""
pass
[docs] @abstractmethod
def put_itc_labels(self, info: List[Dict[str, Union[str, int]]],
client_info: Dict[str, Any] = None) -> MetadataType:
"""
Stores labels for a given set of tiled tuboid
:param info: A list of dict with keys: ``'tuboid_id'``, ``'label'``, ``'pattern'``,
``'algo_version'``and ``'algo_name'`` (see ``ITCLabels``).
:param client_info: optional information about the client/user contains key ``'username'``
:return: the field corresponding to the labels that were submitted
"""
pass
@abstractmethod
def _get_ml_bundle_file_list(self, info: str, what: str = "all", client_info: Dict[str, Any] = None) -> List[
Dict[str, Union[float, str]]]:
"""
Get a list of file for a given ML Bundle.
A ML bundle contains files necessary to train and run a ML training/inference (data, configs and model).
:param info: the name of the machine learning bundle to fetch the files from
:param what: One of {``'all'``, ``'data'``,``'model'`` }, to return all files, only the training data(training),
or only the model (inference), respectively.
:param client_info: optional information about the client/user contains key ``'username'``
:return: A list of dict containing the fields ``key``, ``url`` and ``mtime`` of the files to be downloaded,
which can be used to download the files
"""
pass
@abstractmethod
def _get_ml_bundle_upload_links(self, info: List[Dict[str, Union[float, str]]],
client_info: Dict[str, Any] = None) -> \
List[Dict[str, Union[float, str]]]:
"""
Ask the client for a list of upload url for the files described in info.
:param info: a list of dict with fields {``bundle_name``,``'key'``, ``'md5'``, ``'mtime'``}.
``'key'`` is the file path, relative to the storage root (e.g. ``data/mydata.jpg``).
``bundle_name`` is the name of the machine learning bundle to fetch the files from
:param client_info: optional information about the client/user contains key ``'username'``
:return: The same list of dictionaries as ``info``, with an extra field pointing to a destination url ``'url'``,
where the client can then upload their data.
"""
pass
[docs] @abstractmethod
def get_users(self, info: List[Dict[str, str]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
Get a list of API users. Either all users (Default), or filter users by field if ``info`` is specified.
In the latter case, the union of all matched users is returned.
:param info: A dictionary acting as a filter, using an SQL like-type match.
For instance ``{'username': '%'}`` return all users.
:param client_info: optional information about the client/user contains key ``'username'``
:return: A list of users as represented in the underlying database, as one dictionary [per user,
with the keys being database column names. Note that the crypo/sensitive
fields are not returned (e.g. password_hash)
"""
pass
[docs] @abstractmethod
def put_users(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
Add a list of users defined by a dict of proprieties.
:param info: A list of dictionary each dictionary has the fields {``'username'``, ``'password'``},
and optionally: {``'email'``, ``'is_admin'``},
:param client_info: optional information about the client/user contains key ``'username'``
:return: A list of dictionaries describing the users that were created
"""
pass
[docs] @abstractmethod
def get_token(self, client_info: Dict[str, Any] = None) -> Dict[str, Union[str, int]]:
"""
A authentication token for a user.
injection in this function.
:param client_info: optional information about the client/user contains key ``'username'``
:return: a dictionary with the keys ``'token'`` and ``'expiration'``, an ascii formatted token,
and expiration timestamp, respectively
"""
pass
[docs]@decorate_all_methods(json_inputs_to_python, exclude=['__init__', '_put_new_images', '_put_tiled_tuboids'])
class BaseAPI(BaseAPISpec, ABC):
_storage_class = BaseStorage
_get_image_chunk_size = 64 # the maximal number of images to request from the database in one go
def __init__(self, api_conf: BaseAPIConf, *args, **kwargs):
# super().__init__()
self._configuration = api_conf
self._storage = self._storage_class(api_conf=api_conf, *args, **kwargs)
self._db_engine = self._create_db_engine()
Base.metadata.create_all(self._db_engine, Base.metadata.tables.values(), checkfirst=True)
self._serializer = Serializer(self._configuration.SECRET_API_KEY)
@abstractmethod
def _create_db_engine(self, *args, **kwargs) -> sqlalchemy.engine.Engine:
pass
def _put_new_images(self, files: List[str], client_info: Dict[str, Any] = None):
session = sessionmaker(bind=self._db_engine)()
try:
# store the uploaded images
out = []
# for each image
for f in files:
# We parse the image file to make to its own DB object
api_user = client_info['username'] if client_info is not None else None
im = Images(f, api_user=api_user)
out.append(im.to_dict())
session.add(im)
# try to store images, only commit if storage worked.
# rollback otherwise
try:
self._storage.store_image_files(im)
session.commit()
except Exception as e:
session.rollback()
logging.error("Storage Error. Failed to store image %s" % im)
logging.error(e)
raise e
return out
finally:
session.close()
def _put_tiled_tuboids(self, files: List[Dict[str, Union[str, Dict]]],
client_info: Dict[str, Any] = None): # fixme return type
session = sessionmaker(bind=self._db_engine)()
try:
# store the uploaded images
out = []
# for each tuboid
while len(files) > 0:
data = files.pop()
# We parse the tuboid data as an entry
api_user = client_info['username'] if client_info is not None else None
ts = TuboidSeries(data['series_info'])
q = session.query(TuboidSeries).filter(TuboidSeries.start_datetime == ts.start_datetime,
TuboidSeries.end_datetime == ts.end_datetime,
TuboidSeries.device == ts.device,
TuboidSeries.algo_name == ts.algo_name,
TuboidSeries.algo_version == ts.algo_version
)
# we add this series if it does not exist
if q.count() == 0:
logging.info('Adding new tuboid series %s' % ts)
session.add(ts)
session.commit()
files += [data]
return self._put_tiled_tuboids(files, client_info)
else:
ts = q.first()
logging.info('Using tuboid series %s' % ts)
tub = TiledTuboids(data, parent_tuboid_series=ts, api_user=api_user)
q = session.query(TiledTuboids).filter(TiledTuboids.parent_series_id == ts.id)
if not (q.count() < ts.n_tuboids):
raise IntegrityError(
'Cannot add more tuboids to parent series %s. Already all of them are there' % ts, None, None)
assert tub.start_datetime >= ts.start_datetime, 'tuboid starts before its parent series'
assert tub.end_datetime <= ts.end_datetime, 'tuboid ends after its parent series'
session.add(tub)
# try to store images, only commit if storage worked.
# rollback otherwise
try:
self._storage.store_tiled_tuboid(data)
session.commit()
out.append(tub.to_dict())
except Exception as e:
session.rollback()
logging.error("Storage Error. Failed to store tuboid %s" % tub)
logging.error(e)
raise e
return out
finally:
session.close()
def _get_ml_bundle_file_list(self, info: str, what: str = "all", client_info: Dict[str, Any] = None) -> \
List[Dict[str, Union[float, str]]]:
return self._storage.get_ml_bundle_file_list(info, what)
def _get_ml_bundle_upload_links(self, info: List[Dict[str, Union[float, str]]],
client_info: Dict[str, Any] = None) -> \
List[Dict[str, Union[float, str]]]:
new_info = {}
for i in info:
bundle = i['bundle_name']
if bundle not in new_info.keys():
new_info[bundle] = []
new_info[bundle].append(i)
out = []
for bundle_name, info in new_info.items():
out += self._storage.get_ml_bundle_upload_links(bundle_name, info)
return out
[docs] def get_images(self, info: MetadataType, what: str = 'metadata', client_info: Dict[str, Any] = None):
out = []
session = sessionmaker(bind=self._db_engine)()
try:
# We fetch images by chunks:
for i, info_chunk in enumerate(chunker(info, self._get_image_chunk_size)):
logging.info("Getting images... %i-%i / %i" %
(i * self._get_image_chunk_size,
i * self._get_image_chunk_size + len(info_chunk),
len(info)))
conditions = [and_(Images.datetime == inf['datetime'], Images.device == inf['device'])
for inf in info_chunk]
q = session.query(Images).filter(or_(*conditions))
for img in q:
img_dict = img.to_dict()
img_dict['url'] = self._storage.get_url_for_image(img, what)
out.append(img_dict)
return out
finally:
session.close()
[docs] def get_image_series(self, info: MetadataType, what: str = 'metadata', client_info: Dict[str, Any] = None):
session = sessionmaker(bind=self._db_engine)()
try:
out = []
for i in info:
q = session.query(Images).filter(Images.datetime >= i['start_datetime'],
Images.datetime < i['end_datetime'],
Images.device.like(i['device']))
if q.count() == 0:
logging.warning('No data for series %s' % str(i))
for img in q:
img_dict = img.to_dict()
img_dict['url'] = self._storage.get_url_for_image(img, what)
out.append(img_dict)
return out
finally:
session.close()
[docs] def delete_images(self, info: MetadataType, client_info: Dict[str, Any] = None) -> MetadataType:
out = []
session = sessionmaker(bind=self._db_engine)()
try:
# We fetch images by chunks:
for i, info_chunk in enumerate(chunker(info, self._get_image_chunk_size)):
logging.info("Deleting images... %i-%i / %i" %
(i * self._get_image_chunk_size,
i * self._get_image_chunk_size + len(info_chunk),
len(info)))
conditions = [and_(Images.datetime == inf['datetime'], Images.device == inf['device'])
for inf in info_chunk]
q = session.query(Images).filter(or_(*conditions))
for img in q:
img_dict = img.to_dict()
session.delete(img)
try:
self._storage.delete_image_files(img)
session.commit()
except Exception as e:
session.rollback()
logging.error("Storage Error. Failed to delete image %s" % img)
logging.error(e)
raise e
out.append(img_dict)
return out
finally:
session.close()
[docs] def put_uid_annotations(self, info: AnnotType, client_info: Dict[str, Any] = None):
info = copy.deepcopy(info)
session = sessionmaker(bind=self._db_engine)()
try:
all_annots = []
# for each image
for data in info:
json_str = json.dumps(data, default=json_io_converter)
dic = data['metadata']
annotations = data['annotations']
n_objects = len(annotations)
dic['json'] = json_str
# fixme. here we should get multiple images in one go, prior to parsing annotations ?
q = session.query(Images).filter(Images.datetime == dic['datetime'], Images.device == dic['device'])
if q.count() == 0:
raise ValueError("could not find parent image for %s" % str(dic))
if q.count() > 1:
raise ValueError("<More than one parent image for %s" % str(dic))
parent_img = q.first()
# dic['parent_image_id'] = parent_img["id"]
dic['n_objects'] = n_objects
if dic['md5'] != parent_img.md5:
raise ValueError("Trying to add an annotation for %s, but md5 differ" % str(data))
dic['parent_image_id'] = parent_img.id
annot = UIDAnnotations(dic)
parent_img.uid_annotations.append(annot)
all_annots.append(annot)
session.add(annot)
# https://stackoverflow.com/questions/3659142/bulk-insert-with-sqlalchemy-orm
session.commit()
out = []
for annot in all_annots:
o = annot.to_dict()
o["json"] = ""
out.append(o)
return out
finally:
session.close()
[docs] def get_uid_annotations(self, info: MetadataType, what: str = 'metadata', client_info: Dict[str, Any] = None):
out = []
session = sessionmaker(bind=self._db_engine)()
try:
for i, info_chunk in enumerate(chunker(info, self._get_image_chunk_size)):
logging.info("Getting image annotations... %i-%i / %i" %
(i * self._get_image_chunk_size,
i * self._get_image_chunk_size + len(info_chunk),
len(info)))
conditions = [UIDAnnotations.parent_image.has(and_(Images.datetime == inf['datetime'],
Images.device == inf['device']))
for inf in info_chunk]
q = session.query(UIDAnnotations).filter(or_(*conditions))
for annots in q:
annot_dict = annots.to_dict()
if what == 'metadata':
del annot_dict['json']
out.append(annot_dict)
return out
finally:
session.close()
[docs] def get_uid_annotations_series(self, info: MetadataType, what: str = 'metadata',
client_info: Dict[str, Any] = None):
session = sessionmaker(bind=self._db_engine)()
try:
out = []
info = copy.deepcopy(info)
for i in info:
q = session.query(UIDAnnotations).filter(UIDAnnotations.parent_image.has(and_(
Images.datetime >= i['start_datetime'],
Images.datetime < i['end_datetime'],
Images.device.like(i['device']))))
if q.count() == 0:
logging.warning('No data for series %s' % str(i))
for annots in q:
annot_dict = annots.to_dict()
if what == 'metadata':
del annot_dict['json']
out.append(annot_dict)
return out
finally:
session.close()
[docs] def get_tiled_tuboid_series(self, info: InfoType, what: str = 'metadata',
client_info: Dict[str, Any] = None) -> MetadataType:
session = sessionmaker(bind=self._db_engine)()
try:
out = []
assert what in ('data', 'metadata')
info = copy.deepcopy(info)
for i in info:
q = session.query(TiledTuboids).filter(TiledTuboids.parent_series.has(
and_(TuboidSeries.start_datetime >= i['start_datetime'],
# here we need to include both bounds in case a tuboid ends just in between
TuboidSeries.end_datetime <= i['end_datetime'],
TuboidSeries.device.like(i['device']))))
if q.count() == 0:
logging.warning('No data for series %s' % str(i))
for tub in q.all():
tub_dict = tub.to_dict()
if what == 'data':
tub_dict.update(self._storage.get_urls_for_tiled_tuboids(tub_dict))
out.append(tub_dict)
return out
finally:
session.close()
[docs] def delete_tiled_tuboids(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
out = []
info = copy.deepcopy(info)
session = sessionmaker(bind=self._db_engine)()
try:
# We fetch images by chunks:
for inf in info:
q = session.query(TuboidSeries).filter(TuboidSeries.start_datetime >= inf['start_datetime'],
TuboidSeries.end_datetime <= inf['end_datetime'],
TuboidSeries.device.like(inf['device']))
for ts in q:
img_dict = ts.to_dict()
all_tuboids = [t for t in
session.query(TiledTuboids).filter(TiledTuboids.parent_series_id == ts.id)]
session.delete(ts)
try:
for tub in all_tuboids:
self._storage.delete_tiled_tuboid_files(tub)
session.commit()
except Exception as e:
session.rollback()
logging.error("Storage Error. Failed to delete series %s" % tub)
logging.error(e)
raise e
out.append(img_dict)
return out
finally:
session.close()
[docs] def put_itc_labels(self, info: List[Dict[str, Union[str, int]]],
client_info: Dict[str, Any] = None) -> MetadataType:
info = copy.deepcopy(info)
session = sessionmaker(bind=self._db_engine)()
try:
out = []
# for each image
for data in info:
q = session.query(TiledTuboids).filter(TiledTuboids.tuboid_id == data['tuboid_id'])
assert q.count() == 1, "No match for %s" % data
data['parent_tuboid_id'] = q.first().id
api_user = client_info['username'] if client_info is not None else None
label = ITCLabels(data, api_user=api_user)
out.append(label.to_dict())
session.add(label)
session.commit()
return out
finally:
session.close()
def _get_itc_labels(self, info: List[Dict], client_info: Dict[str, Any] = None) -> MetadataType:
info = copy.deepcopy(info)
out = []
session = sessionmaker(bind=self._db_engine)()
try:
for i, info_chunk in enumerate(chunker(info, self._get_image_chunk_size)):
logging.info("Getting tuboid label... %i-%i / %i" %
(i * self._get_image_chunk_size,
i * self._get_image_chunk_size + len(info_chunk),
len(info)))
# can be a single row
tuboid_ids = [TiledTuboids.tuboid_id == j['tuboid_id'] for j in info_chunk]
conditions = or_(*tuboid_ids)
q = session.query(TiledTuboids).filter(conditions)
parent_tuboid_ids = [ITCLabels.parent_tuboid_id == r.id for r in q.all()]
conditions = or_(*parent_tuboid_ids)
q = session.query(ITCLabels).filter(conditions)
for annots in q:
out.append(annots.to_dict())
return out
finally:
session.close()
[docs] def put_users(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
info = copy.deepcopy(info)
session = sessionmaker(bind=self._db_engine)()
try:
out = []
for data in info:
api_user = client_info['username'] if client_info is not None else None
user = Users(**data, api_user=api_user)
out.append(user.to_dict())
session.add(user)
session.commit()
out = self.get_users([{'username': o['username'] for o in out}])
return out
finally:
session.close()
[docs] def get_users(self, info: List[Dict[str, str]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
out = []
if info is None:
info = [{'username': "%"}]
session = sessionmaker(bind=self._db_engine)()
try:
for inf in info:
conditions = [and_(getattr(Users, k).like(inf[k]) for k in inf.keys())]
q = session.query(Users).filter(or_(*conditions))
for user in q.all():
user.password_hash = "***********"
user_dict = user.to_dict()
out.append(user_dict)
return out
finally:
session.close()
[docs] def verify_password(self, username_or_token: str, password: str):
session = sessionmaker(bind=self._db_engine)()
try:
if not username_or_token:
logging.warning('No such user or token `%s`' % username_or_token)
return False
try:
data = self._serializer.loads(username_or_token)
user = session.query(Users).get(data['id'])
except SignatureExpired:
user = None # valid token, but expired
except BadSignature:
user = None # invalid token
if not user:
# try to authenticate with username/password
user = session.query(Users).filter(Users.username == username_or_token).first()
if not user:
logging.warning('No such user: `%s`' % username_or_token)
return False
if not user.verify_password(password):
logging.warning('Failed to authenticate `%s` with password' % username_or_token)
return False
return user.username
finally:
session.close()
def _make_db_session(self):
return sessionmaker(bind=self._db_engine)()
# from https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#foreign-key-support
# this allow cascade delete on sqlite3
# see https://github.com/sqlalchemy/sqlalchemy/issues/4858
[docs]@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
if not isinstance(dbapi_connection, sqlite3.Connection):
return
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
[docs]class LocalAPI(BaseAPI):
_storage_class = DiskStorage
_database_filename = 'database.db'
def _create_db_engine(self):
local_dir = self._configuration.LOCAL_DIR
engine_url = "sqlite:///%s" % os.path.join(local_dir, self._database_filename)
return sqlalchemy.create_engine(engine_url, connect_args={"check_same_thread": False, 'timeout': 60})
def _make_db_session(self):
return sessionmaker(bind=self._db_engine)(autoflush=False)
[docs] def get_token(self, client_info: Dict[str, Any] = None):
return {'token': None, 'expiration': 0}
[docs]class RemoteAPI(BaseAPI):
_storage_class = S3Storage
_get_image_chunk_size = 1024 # the maximal number of images to request from the database in one go
[docs] def get_token(self, client_info: Dict[str, Any] = None) -> Dict[str, Union[str, int]]:
session = sessionmaker(bind=self._db_engine)()
username = client_info['username']
user = session.query(Users).filter(Users.username == username).first()
token = user.generate_auth_token(self._configuration.SECRET_API_KEY)
return token
[docs] def put_images(self, files: List[str], client_info: Dict[str, Any] = None):
return self._put_new_images(files, client_info=client_info)
def _create_db_engine(self):
engine_url = "mysql+pymysql://%s:%s@%s/%s?charset=utf8mb4" % (self._configuration.MYSQL_USER,
self._configuration.MYSQL_PASSWORD,
self._configuration.MYSQL_HOST,
self._configuration.MYSQL_DATABASE
)
return sqlalchemy.create_engine(engine_url, pool_recycle=3600)