Source code for sticky_pi_api.client

"""
Main module of the client. Implements classes to interact with the API.
"""

import time
import logging
import os
from abc import ABC, abstractmethod
import pandas as pd
import shutil
from joblib import Parallel, delayed
import inspect
import shelve
import requests
from typing import Any
import json
from decorate_all_methods import decorate_all_methods
from sticky_pi_api.image_parser import ImageParser
from sticky_pi_api.utils import datetime_to_string, chunker, python_inputs_to_json, json_out_parser
from sticky_pi_api.storage import BaseStorage
from sticky_pi_api.types import List, Dict, Union, InfoType, MetadataType, AnnotType
from sticky_pi_api.specifications import LocalAPI, BaseAPISpec
from sticky_pi_api.configuration import LocalAPIConf
from sticky_pi_api.utils import md5, multipart_etag
from requests_toolbelt.multipart.encoder import MultipartEncoder


class Cache(dict):
    _sync_each_n = 1024

    def __init__(self, path: str):
        super().__init__()
        self._path = path
        self._n_writes = 0
        if path:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            if os.path.isfile(path):
                with shelve.open(path, flag='r') as d:
                    for k, v in d.items():
                        self[k] = v
    def add(self, function_src, results):
        if len(results) == 0:
            return
        if function_src not in self.keys():
            self[function_src] = {}
        for r in results:
            # each result maps a (file, mtime) key to the image statistics
            k, vals = next(iter(r.items()))
            row = tuple(vals[field] for field in ["device", "datetime", "md5"])
            self[function_src].update({k: row})
        # periodically persist the cache to its shelve file
        if self._n_writes % self._sync_each_n == self._sync_each_n - 1:
            self._sync()
        self._n_writes += 1
    def get_hashes(self, function_src):
        if function_src not in self:
            return set()
        return set(self[function_src].keys())
    def get_cached(self, function_src, hash):
        device, datetime, md5 = self[function_src][hash]
        if md5 is None:
            url = None
        else:
            url = hash[0]
        out = {"device": device, "datetime": datetime, "md5": md5, 'url': url}
        return out
    def sync(self):
        return self._sync()
    def _sync(self):
        with shelve.open(self._path, writeback=True) as d:
            for k, v in self.items():
                d[k] = v
    def delete(self):
        try:
            os.remove(self._path)
        except FileNotFoundError:
            logging.error('trying to delete cache, but file does not exist')
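
# Illustrative sketch (not part of the original module) of how the Cache above
# can be exercised on its own. The path and payload below are hypothetical; in
# practice, BaseClient builds the cache path and the (file, mtime) payloads.
def _example_cache_usage():
    cache = Cache('/tmp/sticky_pi_example/cache.pkl')  # hypothetical path
    # results map a (file, mtime) key to the statistics computed on that image
    results = [{('/tmp/im.jpg', 0.0): {'device': '0a1b2c3d',
                                       'datetime': '2020-01-01_10-00-00',
                                       'md5': 'd41d8cd98f00b204e9800998ecf8427e'}}]
    cache.add('some_function_source', results)
    assert ('/tmp/im.jpg', 0.0) in cache.get_hashes('some_function_source')
    cache.sync()  # explicitly persist to the underlying shelve file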
# All the client methods may take python arguments; the arguments are implicitly
# transformed to json-compatible values using this decorator.
@decorate_all_methods(python_inputs_to_json, exclude=['__init__', '_diff_images_to_upload'])
class BaseClient(BaseAPISpec, ABC):
    _put_chunk_size = 16  # number of images to handle at the same time during upload
    _cache_dirname = "cache"

    def __init__(self, local_dir: str, n_threads: int = 8, skip_on_error: bool = False):
        """
        Abstract class that defines the methods of the client (common between the remote and local clients).

        :param local_dir: The path to a client directory that acts as local storage
        :param n_threads: The number of parallel threads to use to compute statistics on the images (md5 and such)
        :param skip_on_error: Whether to skip images that fail to parse during upload (instead of raising)
        """
        self._n_threads = n_threads
        self._local_dir = local_dir
        self._skip_on_error = skip_on_error
        os.makedirs(self._local_dir, exist_ok=True)
        cache_file = os.path.join(local_dir, self._cache_dirname, 'cache.pkl')
        self._cache = Cache(cache_file)

    @property
    def local_dir(self):
        return self._local_dir
    def get_images_with_uid_annotations_series(self, info: InfoType,
                                               what_image: str = 'metadata',
                                               what_annotation: str = 'metadata') -> MetadataType:
        """
        Retrieves images alongside their annotations (if available) for images from a given device
        within a given datetime range.

        :param info: A list of dicts. Each dict has, at least, the keys ``'device'``, ``'start_datetime'`` and ``'end_datetime'``
        :param what_image: The nature of the image objects to retrieve.
            One of {``'metadata'``, ``'image'``, ``'thumbnail'``, ``'thumbnail_mini'``}
        :param what_annotation: The nature of the annotation objects to retrieve. One of {``'metadata'``, ``'data'``}
        :return: A list of dictionaries with one element for each queried value. Each dictionary contains
            the fields present in the underlying database tables (see ``UIDAnnotations`` and ``Images``).
        """
        # we first fetch the parent images matching a series
        parent_images = self.get_image_series(info, what=what_image)
        if len(parent_images) == 0:
            logging.warning('No image found for provided info!')
            return [{}]
        # we filter the metadata as only these two fields are necessary
        # info = [{k: v for k, v in p.items() if k in {"device", 'datetime'}} for p in parent_images]
        parent_images = pd.DataFrame(parent_images)
        annots = self.get_uid_annotations_series(info, what=what_annotation)
        if len(annots) == 0:
            annots = pd.DataFrame([], columns=['parent_image_id'])
        else:
            annots = pd.DataFrame(annots)
        out = pd.merge(parent_images, annots, how='left',
                       left_on=['id'], right_on=['parent_image_id'], suffixes=('', '_annot'))
        # NaN -> None
        out = out.where(pd.notnull(out), None).sort_values(['device', 'datetime'])
        out = out.to_dict(orient='records')
        return out
    def get_tiled_tuboid_series_itc_labels(self, info: InfoType, what: str = "metadata") -> MetadataType:
        tiled_tuboids = pd.DataFrame(self.get_tiled_tuboid_series(info, what))
        if len(tiled_tuboids) == 0:
            logging.warning('No tuboids found for %s' % info)
            return []
        itc_labels = pd.DataFrame(self._get_itc_labels([{'tuboid_id': i} for i in tiled_tuboids.tuboid_id]))
        if len(itc_labels) == 0:
            logging.warning('No ITC labels found for %s' % info)
            out = tiled_tuboids
        else:
            # force suffixes for ITC
            itc_labels.columns = itc_labels.columns.map(lambda x: str(x) + '_itc')
            tiled_tuboids.set_index(['id'], inplace=True)
            itc_labels.set_index(['parent_tuboid_id_itc'], inplace=True)
            out = pd.merge(tiled_tuboids, itc_labels, how='left', left_index=True, right_index=True)
        out = out.where(pd.notnull(out), None)  # .sort_values(['device', 'datetime'])
        out = out.to_dict(orient='records')
        return out
    def put_images(self, files: List[str]) -> MetadataType:
        """
        Incrementally upload a list of local image files.

        :param files: the paths to the local files
        :return: the data of the uploaded files, as represented by the API
        """
        # instead of dealing with images one by one, we send them by chunks
        # first, find which files need to be uploaded
        to_upload = {}
        chunk_size = self._put_chunk_size * self._n_threads
        files = {f: md5(f) for f in files}
        for i, group in enumerate(chunker(files, chunk_size)):
            logging.info("Putting images - step 1/2 ... Computing statistics on files %i-%i / %i" %
                         (i * chunk_size, i * chunk_size + len(group), len(files)))
            to_upload.update(self._diff_images_to_upload(group))
        self._cache.sync()
        if len(to_upload) == 0:
            logging.warning('No image to upload!')
        out = []
        # upload by chunks now
        for i, group in enumerate(chunker(to_upload, self._put_chunk_size)):
            logging.info("Putting images - step 2/2 ... Uploading files %i-%i / %i" %
                         (i * self._put_chunk_size, i * self._put_chunk_size + len(group), len(to_upload)))
            out += self._put_new_images(group)
        logging.info("Putting images... Complete!")
        return out
    def put_tiled_tuboids(self, tuboid_directories: List[str], series_info: Dict[str, Any]):
        def parse_tuboid_dir(directory):
            dirname = os.path.basename(os.path.normpath(directory))
            metadata_file = os.path.join(directory, 'metadata.txt')
            tuboid_file = os.path.join(directory, 'tuboid.jpg')
            context_file = os.path.join(directory, 'context.jpg')
            assert len(dirname.split('.')) == 5  # 5 fields in this dir name
            assert os.path.isfile(metadata_file)
            assert os.path.isfile(context_file)
            assert os.path.isfile(tuboid_file)
            assert all([k in series_info.keys() for k in ('algo_name', 'algo_version',
                                                          'start_datetime', 'end_datetime',
                                                          'device', 'n_images')])
            return {'tuboid_id': dirname,
                    'series_info': series_info,
                    'metadata': metadata_file,
                    'tuboid': tuboid_file,
                    'context': context_file}

        out = []
        for i, group in enumerate(chunker(tuboid_directories, self._put_chunk_size)):
            to_upload = [parse_tuboid_dir(g) for g in group]
            logging.info("Putting tuboids... Uploading files %i-%i / %i" %
                         (i * self._put_chunk_size, i * self._put_chunk_size + len(group), len(tuboid_directories)))
            out += self._put_tiled_tuboids(to_upload)
        return out
    def delete_cache(self):
        self._cache.delete()
    def _diff_images_to_upload(self, files):
        """
        Handles the negotiation process during upload. Images that already exist on the database
        and have the same checksum are skipped, so that only new images are uploaded.

        :param files: A dict mapping file paths to md5 checksums
        :type files: Dict
        :return: A dict representing the subset of files to be uploaded
        :rtype: Dict
        """
        def local_img_stats(file: str, file_stats: float, skip_on_error: bool):
            try:
                i = ImageParser(file)
            except Exception as e:
                if skip_on_error:
                    logging.error(e)
                    return {(file, file_stats): {'device': None, 'datetime': None, 'md5': None, 'url': None}}
                else:
                    raise e
            out_ = {(file, file_stats): {'device': i['device'],
                                         'datetime': i['datetime'],
                                         'md5': i['md5'],
                                         'url': file}}
            return out_

        # we can compute the stats in parallel
        cached_results = []
        to_compute = []
        # we use a set of cached hashes, which should be faster to query
        local_img_stats_src = inspect.getsource(local_img_stats)
        hashes = self._cache.get_hashes(local_img_stats_src)
        for f, v in files.items():
            key = f, os.path.getmtime(f)
            if key in hashes:
                res = {key: self._cache.get_cached(local_img_stats_src, key)}
                cached_results.append(res)
            else:
                to_compute.append(key)
        if self._n_threads > 1:
            computed = Parallel(n_jobs=self._n_threads)(
                delayed(local_img_stats)(*tc, skip_on_error=self._skip_on_error) for tc in to_compute)
        else:
            computed = [local_img_stats(*tc, skip_on_error=self._skip_on_error) for tc in to_compute]
        logging.info('Caching %i image stats (%i already pre-computed)' % (len(computed), len(cached_results)))
        self._cache.add(local_img_stats_src, computed)
        computed += cached_results
        # we request these images from the database
        info = [list(imd.values())[0] for imd in computed]
        if not info:
            return {}
        # when self._skip_on_error is true, a None url means we skip the image
        info = [inf for inf in info if inf["url"] is not None]
        matches = self.get_images(info, what='metadata')
        # now we diff: we ignore images that exist on the DB AND have the same md5;
        # we upload images that do not exist on the DB
        # fixme. warn/prompt for images whose md5 differs (and matched md5 is not NA)
        info = pd.DataFrame(info)
        matches = pd.DataFrame(matches, columns=info.columns)
        joined = pd.merge(info, matches, how='left', on=['device', 'datetime'], suffixes=('', '_match'))
        joined['same_md5'] = joined.apply(lambda row: row.md5_match == row.md5, axis=1)
        joined['already_on_db'] = joined.apply(lambda row: not pd.isna(row.md5_match), axis=1)
        # images that are not yet on the db:
        out = joined[joined.already_on_db == False].url.tolist()
        out = {o: files[o] for o in out}
        return out
    def get_ml_bundle_dir(self, bundle_name: str, bundle_dir: str, what: str) -> List[Dict[str, Union[float, str]]]:
        assert os.path.basename(os.path.normpath(bundle_dir)) == bundle_name
        local_files = BaseStorage.local_bundle_files_info(bundle_dir, what)
        already_downloaded_dict = {au['key']: au for au in local_files}
        remote_files = self._get_ml_bundle_file_list(bundle_name, what)
        files_to_download = []
        for r in remote_files:
            to_download = False
            # the remote file does not exist locally
            if r['key'] not in already_downloaded_dict:
                to_download = True
            else:
                local_info = already_downloaded_dict[r['key']]
                if r['md5'] == local_info['md5']:
                    to_download = False
                elif r['mtime'] > local_info['mtime']:
                    to_download = True
            if to_download:
                files_to_download.append(r)
            else:
                logging.info("Skipping %s (already on local)" % str(r['key']))

        def download_single_file(cls, f, bundle_dir):
            cls._get_ml_bundle_file(f, bundle_dir)
            os.utime(os.path.join(bundle_dir, f['key']), (time.time(), f['mtime']))

        if self._n_threads > 1:
            Parallel(n_jobs=self._n_threads)(
                delayed(download_single_file)(self.__class__, f, bundle_dir) for f in files_to_download)
        else:
            for f in files_to_download:
                download_single_file(self.__class__, f, bundle_dir)
        return files_to_download
    def put_ml_bundle_dir(self, bundle_name: str, bundle_dir: str, what: str = 'all') -> List[Dict[str, Union[float, str]]]:
        # bundle_name = os.path.basename(os.path.normpath(bundle_dir))
        files_to_upload = BaseStorage.local_bundle_files_info(bundle_dir, what)
        for f in files_to_upload:
            f['bundle_name'] = bundle_name
        n_local_files = len(files_to_upload)
        logging.info(f'Found {n_local_files} local files to upload')
        files_to_upload = self._get_ml_bundle_upload_links(files_to_upload)
        logging.info(f'{n_local_files - len(files_to_upload)} already uploaded')
        for f in files_to_upload:
            if f['url'] is not None:
                self._put_ml_bundle_file(f['path'], f['url'])
        return files_to_upload
    @abstractmethod
    def _put_ml_bundle_file(self, path: str, url: str):
        pass

    @classmethod
    @abstractmethod
    def _get_ml_bundle_file(cls, file: Dict[str, str], bundle_dir: str):
        pass

    @abstractmethod
    def _put_new_images(self, files: Dict[str, str], client_info: Dict[str, Any] = None) -> MetadataType:
        pass
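
# A minimal sketch (added for illustration) of the chunking pattern put_images
# relies on: files are examined in groups of _put_chunk_size * n_threads for
# statistics, then uploaded in groups of _put_chunk_size. `chunker` is the
# helper imported from sticky_pi_api.utils; the file dict below is hypothetical.
def _example_chunking():
    files = {'/tmp/a.jpg': 'md5_a', '/tmp/b.jpg': 'md5_b', '/tmp/c.jpg': 'md5_c'}
    for i, group in enumerate(chunker(files, 2)):
        logging.info('chunk %i contains %i files' % (i, len(group)))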
@decorate_all_methods(python_inputs_to_json, exclude=['__init__'])
class LocalClient(LocalAPI, BaseClient):
    def __init__(self, local_dir: str, n_threads: int = 8, *args, **kwargs):
        # ad hoc API config for the local API; defines the local_dir variable
        api_conf = LocalAPIConf(LOCAL_DIR=local_dir)
        BaseClient.__init__(self, local_dir=local_dir, n_threads=n_threads)
        # use this config for the local/internal mock API
        LocalAPI.__init__(self, api_conf, *args, **kwargs)

    def _put_ml_bundle_file(self, path: str, url: str):
        if not os.path.isdir(os.path.dirname(url)):
            os.makedirs(os.path.dirname(url), exist_ok=True)
        logging.info("%s => %s" % (os.path.basename(path), url))
        shutil.copy(path, url)

    @classmethod
    def _get_ml_bundle_file(cls, file_dict, bundle_dir):
        url = file_dict['url']
        target = os.path.join(bundle_dir, file_dict['key'])
        if not os.path.isdir(os.path.dirname(target)):
            os.makedirs(os.path.dirname(target), exist_ok=True)
        logging.info("%s => %s" % (url, target))
        shutil.copy(url, target)
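
# Illustrative usage sketch for LocalClient (paths, device id and datetime
# strings below are hypothetical; the datetimes assume the API's
# %Y-%m-%d_%H-%M-%S string format). The client stores images and the API
# database under local_dir, so no server is needed.
def _example_local_client_usage():
    client = LocalClient(local_dir='/tmp/sticky_pi_local', n_threads=4)
    client.put_images(['/data/0a1b2c3d.2020-01-01_10-00-00.jpg'])
    series = client.get_image_series([{'device': '0a1b2c3d',
                                       'start_datetime': '2020-01-01_00-00-00',
                                       'end_datetime': '2020-01-02_00-00-00'}],
                                     what='metadata')
    return series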
class RemoteAPIException(Exception):
    pass
# All the client methods may take python arguments; the arguments are implicitly
# transformed to json-compatible values using this decorator.
@decorate_all_methods(python_inputs_to_json, exclude=['__init__', '_default_client_to_api'])
class RemoteAPIConnector(BaseAPISpec):
    _max_retry_attempts = 5
    _sleep_time_between_attempts = 1

    def __init__(self, host: str, username: str, password: str, protocol: str = 'https', port: int = 443):
        self._host = host
        self._username = username
        self._password = password
        self._protocol = protocol
        self._port = int(port)
        self._token = {'token': None, 'expiration': 0}

    def _default_client_to_api(self, entry_point, info=None, what: str = None,
                               files=None, data=None, headers=None, attempt=0):
        if entry_point != 'get_token':
            if self._token['expiration'] < int(time.time()) + 60:  # we add 60s just to be sure
                self._token = self.get_token()
            auth = self._token['token'], ''
            if info is None:
                info = [{}]
        else:
            auth = self._username, self._password
        url = "%s://%s:%i/%s" % (self._protocol, self._host, self._port, entry_point)
        if what is not None:
            url += "/" + what
        logging.debug('Requesting %s' % url)
        response = requests.post(url, json=info, files=files, auth=auth, data=data, headers=headers)
        if response.status_code == 200:
            return response.json(object_hook=json_out_parser)
        else:
            if attempt >= self._max_retry_attempts:
                logging.error("Failed to request url: %s" % url)
                raise RemoteAPIException(response.content)
            else:
                time.sleep(self._sleep_time_between_attempts * self._max_retry_attempts)
                attempt += 1
                # rewind any file-like payloads before retrying
                if files is not None:
                    for k in files.keys():
                        try:
                            files[k].seek(0)
                        except AttributeError:
                            try:
                                files[k][1].seek(0)
                            except AttributeError:
                                pass
                logging.warning("Failed to request url: %s. Retrying... Attempt %i" % (url, attempt + 1))
                return self._default_client_to_api(entry_point, info, what, files, data,
                                                   headers=headers, attempt=attempt)
    def get_token(self, client_info: Dict[str, Any] = None) -> str:
        return self._default_client_to_api('get_token', info=None)
    # custom handling of file objects to upload
    def _put_new_images(self, files: Dict[str, str], client_info: Dict[str, Any] = None) -> MetadataType:
        out = []
        for file, md5_sum in files.items():
            for i in range(self._max_retry_attempts):
                with open(file, 'rb') as f:
                    mp_encoder = MultipartEncoder(
                        fields={
                            os.path.basename(file): (os.path.basename(file), f, 'application/jpeg'),
                            os.path.basename(file) + ".md5": md5_sum
                        })
                    header = {'Content-Type': mp_encoder.content_type,
                              'Connection': 'close'}  # fixme not needed close
                    try:
                        out += self._default_client_to_api('_put_new_images', files=None,
                                                           data=mp_encoder, headers=header,
                                                           attempt=self._max_retry_attempts)
                        break
                    except RemoteAPIException:
                        time.sleep(i)
                        if i == self._max_retry_attempts - 1:
                            raise RemoteAPIException("Max attempts tried")
        return out
    def get_users(self, info: List[Dict[str, str]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('get_users', info)

    def put_users(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('put_users', info)

    def delete_users(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('delete_users', info)

    def get_images(self, info: InfoType, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('get_images', info, what=what)

    def get_image_series(self, info, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('get_image_series', info, what=what)

    def get_images_to_annotate(self, info, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('get_images_to_annotate', info, what=what)

    def get_project_permissions(self, info: List[Dict[str, Any]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('get_project_permissions', info)

    def get_project_series(self, info: List[Dict[str, Any]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('get_project_series', info)

    def get_projects(self, info: List[Dict[str, Any]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('get_projects', info)

    def delete_projects(self, info: List[Dict[str, str]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('delete_projects', info)

    def put_project_permissions(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('put_project_permissions', info)

    def put_project_series(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('put_project_series', info)

    def put_projects(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        return self._default_client_to_api('put_projects', info)

    def delete_images(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('delete_images', info)

    def delete_tiled_tuboids(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('delete_tiled_tuboids', info)

    def put_uid_annotations(self, info: AnnotType, client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('put_uid_annotations', info)

    def get_uid_annotations(self, info: InfoType, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('get_uid_annotations', info, what=what)

    def get_uid_annotations_series(self, info: InfoType, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('get_uid_annotations_series', info, what=what)

    def delete_project_columns(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('delete_project_columns', info)

    def delete_project_series(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('delete_project_series', info)

    def put_project_columns(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('put_project_columns', info)

    def get_tiled_tuboid_series(self, info: InfoType, what: str = "metadata", client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('get_tiled_tuboid_series', info=info, what=what)

    def _put_tiled_tuboids(self, files: List[Dict[str, str]], client_info: Dict[str, Any] = None) -> MetadataType:
        out = []
        for dic in files:
            with open(dic['metadata'], 'r') as m, open(dic['tuboid'], 'rb') as t, open(dic['context'], 'rb') as c:
                payload = {'metadata': ('metadata.txt', m, 'application/text'),
                           'tuboid': ('tuboid.jpg', t, 'application/octet-stream'),
                           'context': ('context.jpg', c, 'application/octet-stream'),
                           'tuboid_id': ('tuboid_id', json.dumps(dic['tuboid_id']), 'application/json'),
                           'series_info': ('series_info', json.dumps(dic['series_info']), 'application/json'),
                           }
                out += self._default_client_to_api('_put_tiled_tuboids', files=payload, info=None)
        return out

    def _get_itc_labels(self, info: List[Dict], client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('_get_itc_labels', info)

    def put_itc_labels(self, info: List[Dict[str, Union[str, int]]], client_info: Dict[str, Any] = None) -> MetadataType:
        return self._default_client_to_api('put_itc_labels', info)

    def _get_ml_bundle_file_list(self, info: str, what: str = "all", client_info: Dict[str, Any] = None) -> List[Dict[str, Union[float, str]]]:
        return self._default_client_to_api('_get_ml_bundle_file_list', info, what=what)

    def _get_ml_bundle_upload_links(self, info: List[Dict[str, Union[float, str]]], client_info: Dict[str, Any] = None) -> List[Dict[str, Union[float, str]]]:
        return self._default_client_to_api('_get_ml_bundle_upload_links', info)
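
# Minimal sketch (illustrative; host and credentials are placeholders) of using
# the connector directly. A token is fetched lazily by _default_client_to_api,
# but it can also be requested explicitly.
def _example_connector_usage():
    connector = RemoteAPIConnector(host='api.example.org', username='alice',
                                   password='secret', protocol='https', port=443)
    token = connector.get_token()  # a dict like {'token': ..., 'expiration': ...}
    users = connector.get_users()
    return token, users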
@decorate_all_methods(python_inputs_to_json, exclude=['__init__'])
class RemoteClient(RemoteAPIConnector, BaseClient):
    def __init__(self, local_dir: str, host, username, password,
                 protocol: str = 'https', port: int = 443, n_threads: int = 8, skip_on_error=False):
        BaseClient.__init__(self, local_dir=local_dir, n_threads=n_threads, skip_on_error=skip_on_error)
        RemoteAPIConnector.__init__(self, host, username, password, protocol, port)

    def _put_ml_bundle_file(self, path: str, url: Union[str, Dict]):
        # fixme, naming: this is actually not a url here, but a json str => dict
        response = url
        object_name = os.path.basename(path)
        logging.info("%s => %s" % (os.path.basename(path), response['fields']['key']))
        with open(path, 'rb') as f:
            files = {'file': (object_name, f)}
            http_response = requests.post(response['url'], data=response['fields'], files=files)
            assert http_response.status_code == 204, response

    @classmethod
    def _get_ml_bundle_file(cls, file_dict, bundle_dir):
        url = file_dict['url']
        target = os.path.join(bundle_dir, file_dict['key'])
        dirname = os.path.dirname(target)
        if not os.path.isdir(dirname):
            os.makedirs(dirname, exist_ok=True)
        target_tmp = target + ".tmp"
        with open(target_tmp, 'wb') as data:
            logging.info("%s => %s" % (url, target))
            r = requests.get(url)
            data.write(r.content)
        assert os.path.isfile(target_tmp), f'{file_dict["key"]}: file not written!'
        comp_md5 = multipart_etag(target_tmp, BaseStorage._multipart_chunk_size)
        assert comp_md5 == file_dict['md5'], f'{file_dict["key"]}: md5s differ {comp_md5} != {file_dict["md5"]}!'
        os.rename(target_tmp, target)
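
# End-to-end usage sketch (hypothetical host, credentials, paths and bundle
# name): the remote client combines the local cache/negotiation logic of
# BaseClient with the HTTP transport of RemoteAPIConnector.
def _example_remote_client_usage():
    client = RemoteClient(local_dir='/tmp/sticky_pi_remote_cache',
                          host='api.example.org', username='alice', password='secret')
    # only images that are new, or whose md5 changed, are actually uploaded
    uploaded = client.put_images(['/data/0a1b2c3d.2020-01-01_10-00-00.jpg'])
    # download an ML bundle into a local directory named after the bundle
    client.get_ml_bundle_dir('universal-insect-detector',
                             '/tmp/universal-insect-detector', what='all')
    return uploaded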