"""
Main module of the client. Implements classes to interact with the API.
"""
import time
import logging
import os
from abc import ABC, abstractmethod
import pandas as pd
import shutil
from joblib import Parallel, delayed
import inspect
import shelve
import requests
from typing import Any
import json
from decorate_all_methods import decorate_all_methods
from sticky_pi_api.image_parser import ImageParser
from sticky_pi_api.utils import datetime_to_string, chunker, python_inputs_to_json, json_out_parser
from sticky_pi_api.storage import BaseStorage
from sticky_pi_api.types import List, Dict, Union, InfoType, MetadataType, AnnotType
from sticky_pi_api.specifications import LocalAPI, BaseAPISpec
from sticky_pi_api.configuration import LocalAPIConf
from sticky_pi_api.utils import md5, multipart_etag
from requests_toolbelt.multipart.encoder import MultipartEncoder
[docs]class Cache(dict):
_sync_each_n = 1024
def __init__(self, path: str):
super().__init__()
self._path = path
self._n_writes = 0
if path:
os.makedirs(os.path.dirname(path), exist_ok=True)
if os.path.isfile(path):
with shelve.open(path, flag='r') as d:
for k, v in d.items():
self[k] = v
[docs] def add(self, function_src, results):
if len(results) == 0:
return
if function_src not in self.keys():
self[function_src] = {}
for r in results:
k, vals = next(iter(r.items()))
r = tuple(vals[k] for k in ["device", "datetime", "md5"])
self[function_src].update({k:r})
if self._n_writes % self._sync_each_n == self._sync_each_n - 1:
self._sync()
self._n_writes += 1
[docs] def get_hashes(self, function_src):
if function_src not in self:
return {}
return set(self[function_src].keys())
[docs] def get_cached(self, function_src, hash):
device, datetime, md5 = self[function_src][hash]
if md5 is None:
url = None
else:
url = hash[0]
out = {"device": device, "datetime": datetime, "md5":md5, 'url':url}
return out
[docs] def sync(self):
return self._sync()
def _sync(self):
with shelve.open(self._path, writeback=True) as d:
for k, v in self.items():
d[k] = v
[docs] def delete(self):
try:
os.remove(self._path)
except FileExistsError:
logging.error('trying to detect cache, but file does not exist')
# all the client methods may take python arguments, the argument are implicitly transformed
# to json-compatible values using this decorator
[docs]@decorate_all_methods(python_inputs_to_json, exclude=['__init__', '_diff_images_to_upload'])
class BaseClient(BaseAPISpec, ABC):
_put_chunk_size = 16 # number of images to handle at the same time during upload
_cache_dirname = "cache"
def __init__(self, local_dir: str, n_threads: int = 8, skip_on_error: bool=False):
"""
Abstract class that defines the methods of the client (common between remote and client).
:param local_dir: The path to a client directory that acts as a client storage
:param n_threads: The number of parallel threads to use to compute statistics on the image (md5 and such)
:param skip_on_error: Whether to skip on upload error
"""
self._n_threads = n_threads
self._local_dir = local_dir
self._skip_on_error = skip_on_error
os.makedirs(self._local_dir, exist_ok=True)
cache_file = os.path.join(local_dir, self._cache_dirname, 'cache.pkl')
self._cache = Cache(cache_file)
@property
def local_dir(self):
return self._local_dir
[docs] def get_images_with_uid_annotations_series(self, info: InfoType, what_image: str = 'metadata', what_annotation: str = 'metadata') -> MetadataType:
"""
Retrieves images alongside their annotations (if available) for images from a given device and
within a given datetime range.
:param info: A list of dicts. each dicts has, at least, the keys:
``'device'``, ``'start_datetime'`` and ``'end_datetime'``
:param what_image: The nature of the image objects to retrieve.
One of {``'metadata'``, ``'image'``, ``'thumbnail'``, ``'thumbnail_mini'``}
:param what_annotation: The nature of the object to retrieve. One of {``'metadata'``, ``'data'``}.
:return: A list of dictionaries with one element for each queried value.
Each dictionary contains the fields present in the underlying database tables (see ``UIDAnnotations`` and ``Images``).
"""
# we first fetch the parent images matching a series
parent_images = self.get_image_series(info, what=what_image)
if len(parent_images) == 0:
logging.warning('No image found for provided info!')
return [{}]
# we filter the metadata as only these two fields are necessary
# info = [{k: v for k, v in p.items() if k in {"device", 'datetime'}} for p in parent_images]
parent_images = pd.DataFrame(parent_images)
annots = self.get_uid_annotations_series(info, what=what_annotation)
if len(annots) == 0:
annots = pd.DataFrame([], columns=['parent_image_id'])
else:
annots = pd.DataFrame(annots)
out = pd.merge(parent_images, annots, how='left', left_on=['id'], right_on=['parent_image_id'], suffixes=('', '_annot'))
# NaN -> None
out = out.where(pd.notnull(out), None).sort_values(['device', 'datetime'])
out = out.to_dict(orient='records')
return out
[docs] def get_tiled_tuboid_series_itc_labels(self, info: InfoType, what: str = "metadata") -> MetadataType:
tiled_tuboids = pd.DataFrame(self.get_tiled_tuboid_series(info, what))
if len(tiled_tuboids) == 0:
logging.warning('No tuboids found for %s' % info)
return []
itc_labels = pd.DataFrame(self._get_itc_labels([{'tuboid_id': i} for i in tiled_tuboids.tuboid_id]))
if len(itc_labels) == 0:
logging.warning('No ITC labels found for %s' % info)
out = tiled_tuboids
else:
# force suffixes for ITC
itc_labels.columns = itc_labels.columns.map(lambda x: str(x) + '_itc')
tiled_tuboids.set_index(['id'], inplace=True)
itc_labels.set_index(['parent_tuboid_id_itc'], inplace=True)
out = pd.merge(tiled_tuboids, itc_labels,
how='left', left_index=True, right_index=True)
out = out.where(pd.notnull(out), None) #.sort_values(['device', 'datetime'])
out = out.to_dict(orient='records')
return out
[docs] def put_images(self, files: List[str]) -> MetadataType:
"""
Incrementally upload a list of client files
:param files: the paths to the client files
:return: the data of the uploaded files, as represented in by API
"""
# instead of dealing with images one by one, we send them by chunks
# first find which files need to be uploaded
to_upload = {}
chunk_size = self._put_chunk_size * self._n_threads
files = {f: md5(f) for f in files}
for i, group in enumerate(chunker(files, chunk_size)):
logging.info("Putting images - step 1/2 ... Computing statistics on files %i-%i / %i" % (i * chunk_size,
i * chunk_size + len(group),
len(files)))
to_upload.update(self._diff_images_to_upload(group))
self._cache.sync()
if len(to_upload) == 0:
logging.warning('No image to upload!')
out = []
# upload by chunks now
for i, group in enumerate(chunker(to_upload, self._put_chunk_size)):
logging.info("Putting images - step 2/2 ... Uploading files %i-%i / %i" % (i*self._put_chunk_size,
i * self._put_chunk_size + len(group),
len(to_upload)))
out += self._put_new_images(group)
logging.info("Putting images... Complete!")
return out
[docs] def put_tiled_tuboids(self, tuboid_directories: List[str], series_info: Dict[str, Any]):
def parse_tuboid_dir(directory):
dirname = os.path.basename(os.path.normpath(directory))
metadata_file = os.path.join(directory, 'metadata.txt')
tuboid_file = os.path.join(directory, 'tuboid.jpg')
context_file = os.path.join(directory, 'context.jpg')
assert len(dirname.split('.')) == 5 # 5 fields in this dir
assert os.path.isfile(metadata_file)
assert os.path.isfile(context_file)
assert os.path.isfile(tuboid_file)
assert all([k in series_info.keys() for k in ('algo_name', 'algo_version', 'start_datetime', 'end_datetime', 'device', 'n_images', )])
return {'tuboid_id': dirname,
'series_info': series_info,
'metadata': metadata_file,
'tuboid': tuboid_file,
'context': context_file}
out = []
for i, group in enumerate(chunker(tuboid_directories, self._put_chunk_size)):
to_upload = [parse_tuboid_dir(g) for g in group]
logging.info("Putting tuboids... Uploading files %i-%i / %i" % (i*self._put_chunk_size,
i * self._put_chunk_size + len(group),
len(tuboid_directories)))
out += self._put_tiled_tuboids(to_upload)
return out
[docs] def delete_cache(self):
self._cache.delete()
def _diff_images_to_upload(self, files):
"""
Handles the negotiation process during upload. First trying to get the images to be uploaded,
Those that already exists and have the same checksum can be skipped,
to only upload new images
:param files: A dict of file paths mappting md5s
:type files: Dict()
:return: A list representing the subset of files to be uploaded
:rtype: Dict()
"""
def local_img_stats(file: str, file_stats: float, skip_on_error: bool):
try:
i = ImageParser(file)
except Exception as e:
if skip_on_error:
logging.error(e)
return {(file, file_stats):
{'device': None,
'datetime': None,
'md5': None,
'url': None}}
else:
raise e
out_ = {(file, file_stats):
{'device': i['device'],
'datetime': i['datetime'],
'md5': i['md5'],
'url': file}}
return out_
# we can compute the stats in parallel
cached_results = []
to_compute = []
# we use a set, should be faster
local_img_stats_src = inspect.getsource(local_img_stats)
hashes = self._cache.get_hashes(local_img_stats_src)
for f, v in files.items():
key = f, os.path.getmtime(f)
if key in hashes:
res = {key: self._cache.get_cached(local_img_stats_src, key)}
cached_results.append(res)
else:
to_compute.append(key)
if self._n_threads > 1:
computed = Parallel(n_jobs=self._n_threads)(delayed(local_img_stats)(*tc, skip_on_error = self._skip_on_error) for tc in to_compute)
else:
computed = [local_img_stats(*tc, skip_on_error = self._skip_on_error) for tc in to_compute]
logging.info('Caching %i image stats (%i already pre-computed)' % (len(computed), len(cached_results)))
self._cache.add(local_img_stats_src, computed)
computed += cached_results
# we request these images from the database
info = [list(imd.values())[0] for imd in computed]
# this is when self._skip_on_error is true, a None url means we skip
if not info:
return []
info = [inf for inf in info if inf["url"] is not None]
matches = self.get_images(info, what='metadata')
# now we diff: we ignore images that exist on DB AND have the same md5
# we put images that do not exist on db
# we warn if md5s are different
info = pd.DataFrame(info)
matches = pd.DataFrame(matches, columns=info.columns)
joined = pd.merge(info, matches, how='left', on = ['device', 'datetime'], suffixes=('', '_match'))
joined['same_md5'] = joined.apply(lambda row: row.md5_match == row.md5, axis=1)
joined['already_on_db'] = joined.apply(lambda row: not pd.isna(row.md5_match), axis=1)
# images that are not yet on the db:
out = joined[joined.already_on_db == False].url.tolist()
out = {o: files[o] for o in out}
# fixme. warn/prompt which has a different md5 (and match md5 is not NA)
return out
[docs] def get_ml_bundle_dir(self, bundle_name: str, bundle_dir: str, what: str) -> List[Dict[str, Union[float, str]]]:
assert os.path.basename(os.path.normpath(bundle_dir)) == bundle_name
local_files = BaseStorage.local_bundle_files_info(bundle_dir, what)
already_downloaded_dict = {au['key']: au for au in local_files}
remote_files = self._get_ml_bundle_file_list(bundle_name, what)
files_to_download = []
for r in remote_files:
to_download = False
# file does not exists on remote
if r['key'] not in already_downloaded_dict:
to_download = True
else:
local_info = already_downloaded_dict[r['key']]
if r['md5'] == local_info['md5']:
to_download = False
elif r['mtime'] > local_info['mtime']:
to_download = True
if to_download:
files_to_download.append(r)
else:
logging.info("Skipping %s (already on local)" % str(r['key']))
def download_single_file(cls, f, bundle_dir):
cls._get_ml_bundle_file(f, bundle_dir)
os.utime(os.path.join(bundle_dir, f['key']), (time.time(), f['mtime']))
if self._n_threads > 1:
Parallel(n_jobs=self._n_threads)(delayed(download_single_file)(self.__class__, f, bundle_dir)
for f in files_to_download)
else:
for f in files_to_download:
download_single_file(self.__class__, f, bundle_dir)
return files_to_download
[docs] def put_ml_bundle_dir(self, bundle_name: str, bundle_dir: str, what: str = 'all') -> List[Dict[str, Union[float, str]]]:
# bundle_name = os.path.basename(os.path.normpath(bundle_dir))
files_to_upload = BaseStorage.local_bundle_files_info(bundle_dir, what)
for f in files_to_upload:
f['bundle_name'] = bundle_name
n_local_files =len(files_to_upload)
logging.info(f'Found {n_local_files} local files to upload')
files_to_upload = self._get_ml_bundle_upload_links(files_to_upload)
logging.info(f'{n_local_files - len(files_to_upload)} already uploaded)')
for f in files_to_upload:
if f['url'] is not None:
self._put_ml_bundle_file(f['path'], f['url'])
return files_to_upload
@abstractmethod
def _put_ml_bundle_file(self, path: str, url: str):
pass
@classmethod
@abstractmethod
def _get_ml_bundle_file(cls, file: Dict[str, str], bundle_dir: str):
pass
@abstractmethod
def _put_new_images(self, files: Dict[str, str], client_info: Dict[str, Any] = None) -> MetadataType:
pass
[docs]@decorate_all_methods(python_inputs_to_json, exclude=['__init__'])
class LocalClient(LocalAPI, BaseClient):
def __init__(self, local_dir: str, n_threads: int = 8, *args, **kwargs):
# ad hoc API config for the local API. define the local_dir variable
api_conf = LocalAPIConf(LOCAL_DIR=local_dir)
BaseClient.__init__(self, local_dir=local_dir, n_threads=n_threads)
# use this config for the local/internal mock API
LocalAPI.__init__(self, api_conf, *args, **kwargs)
def _put_ml_bundle_file(self, path: str, url: str):
if not os.path.isdir(os.path.dirname(url)):
os.makedirs(os.path.dirname(url), exist_ok=True)
logging.info("%s => %s" % (os.path.basename(path), url))
shutil.copy(path, url)
@classmethod
def _get_ml_bundle_file(cls, file_dict, bundle_dir):
url = file_dict['url']
target = os.path.join(bundle_dir, file_dict['key'])
if not os.path.isdir(os.path.dirname(target)):
os.makedirs(os.path.dirname(target), exist_ok=True)
logging.info("%s => %s" % (url, target))
shutil.copy(url, target)
[docs]class RemoteAPIException(Exception):
pass
# all the client methods may take python argument, the argument are implicitly transformed
# to json-compatible values using this decorator
[docs]@decorate_all_methods(python_inputs_to_json, exclude=['__init__', '_default_client_to_api'])
class RemoteAPIConnector(BaseAPISpec):
_max_retry_attempts = 5
_sleep_time_between_attempts = 1
def __init__(self, host: str, username: str, password: str, protocol: str = 'https', port: int = 443):
self._host = host
self._username = username
self._password = password
self._protocol = protocol
self._port = int(port)
self._token = {'token': None, 'expiration': 0}
def _default_client_to_api(self, entry_point, info=None, what: str = None, files=None, data=None, headers=None, attempt=0):
if entry_point != 'get_token':
if self._token['expiration'] < int(time.time()) + 60: # we add 60s just to be sure
self._token = self.get_token()
auth = self._token['token'], ''
if info is None:
info = [{}]
else:
auth = self._username, self._password
url = "%s://%s:%i/%s" % (self._protocol, self._host, self._port, entry_point)
if what is not None:
url += "/" + what
logging.debug('Requesting %s' % url)
# response = requests.post(url, json=info, files=files, auth=auth, data=data, headers=headers)
# #
# response = requests.post(url, json=info, files=files, auth=auth, data=data)
response = requests.post(url, json=info, files=files, auth=auth, data=data, headers=headers)
if response.status_code == 200:
return response.json(object_hook=json_out_parser)
else:
if attempt >= self._max_retry_attempts:
logging.error("Failed to request url: %s" % url)
raise RemoteAPIException(response.content)
else:
time.sleep(self._sleep_time_between_attempts * self._max_retry_attempts)
attempt += 1
if files is not None:
for k in files.keys():
try:
files[k].seek(0)
except AttributeError:
try:
files[k][1].seek(0)
except AttributeError:
pass
logging.warning("Failed to request url: %s. Retrying... Attempt %i" % (url, attempt + 1))
# response.close()
# requests.session().close()
return self._default_client_to_api(entry_point, info, what, files, data, headers = headers, attempt=attempt)
[docs] def get_token(self, client_info: Dict[str, Any] = None) -> str:
return self._default_client_to_api('get_token', info=None)
def _put_new_images(self, files: Dict[str, str], client_info: Dict[str, Any] = None) -> MetadataType:
out = []
for file, md5_sum in files.items():
for i in range(self._max_retry_attempts):
with open(file, 'rb') as f:
mp_encoder = MultipartEncoder(
fields={
os.path.basename(file): (os.path.basename(file), f, 'application/jpeg'),
os.path.basename(file) + ".md5": md5_sum
})
header = {'Content-Type': mp_encoder.content_type, 'Connection': 'close'}# fixme not needed close
try:
out += self._default_client_to_api('_put_new_images', files=None, data=mp_encoder,
headers=header, attempt=self._max_retry_attempts)
break
except RemoteAPIException:
time.sleep(i)
if i == self._max_retry_attempts -1:
raise RemoteAPIException("Max attempts tried")
return out
# custom handling of file objects to upload
[docs] def get_users(self, info: List[Dict[str, str]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('get_users', info)
[docs] def put_users(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('put_users', info)
[docs] def delete_users(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('delete_users', info)
[docs] def get_images(self, info: InfoType, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('get_images', info, what=what)
[docs] def get_image_series(self, info, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('get_image_series', info, what=what)
[docs] def get_images_to_annotate(self, info, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('get_images_to_annotate', info, what=what)
[docs] def get_project_permissions(self, info: List[Dict[str, Any]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('get_project_permissions', info)
[docs] def get_project_series(self, info: List[Dict[str, Any]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('get_project_series', info)
[docs] def get_projects(self, info: List[Dict[str, Any]] = None, client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('get_projects', info)
[docs] def delete_projects(self, info: List[Dict[str, str]] , client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('delete_projects', info)
[docs] def put_project_permissions(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('put_project_permissions', info)
[docs] def put_project_series(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('put_project_series', info)
[docs] def put_projects(self, info: List[Dict[str, Any]], client_info: Dict[str, Any] = None) -> List[Dict[str, Any]]:
return self._default_client_to_api('put_projects', info)
[docs] def delete_images(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('delete_images', info)
[docs] def delete_tiled_tuboids(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('delete_tiled_tuboids', info)
[docs] def put_uid_annotations(self, info: AnnotType, client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('put_uid_annotations', info)
[docs] def get_uid_annotations(self, info: InfoType, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('get_uid_annotations', info, what=what)
[docs] def get_uid_annotations_series(self, info: InfoType, what: str = 'metadata', client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('get_uid_annotations_series', info, what=what)
[docs] def delete_project_columns(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('delete_project_columns', info)
[docs] def delete_project_series(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('delete_project_series', info)
[docs] def put_project_columns(self, info: InfoType, client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('put_project_columns', info)
[docs] def get_tiled_tuboid_series(self, info: InfoType, what: str = "metadata", client_info: Dict[str, Any] = None) \
-> MetadataType:
return self._default_client_to_api('get_tiled_tuboid_series', info=info, what=what)
def _put_tiled_tuboids(self, files: List[Dict[str, str]], client_info: Dict[str, Any] = None) -> MetadataType:
out = []
for dic in files:
with open(dic['metadata'], 'r') as m, open(dic['tuboid'], 'rb') as t, open(dic['context'], 'rb') as c:
payload = {'metadata': ('metadata.txt', m, 'application/text'),
'tuboid': ('tuboid.jpg', t, 'application/octet-stream'),
'context': ('context.jpg', c, 'application/octet-stream'),
'tuboid_id': ('tuboid_id', json.dumps(dic['tuboid_id']), 'application/json'),
'series_info': ('series_info', json.dumps(dic['series_info']), 'application/json'),
}
out += self._default_client_to_api('_put_tiled_tuboids', files=payload, info=None)
return out
def _get_itc_labels(self, info: List[Dict], client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('_get_itc_labels', info)
[docs] def put_itc_labels(self, info: List[Dict[str, Union[str, int]]], client_info: Dict[str, Any] = None) -> MetadataType:
return self._default_client_to_api('put_itc_labels', info)
def _get_ml_bundle_file_list(self, info: str, what: str = "all", client_info: Dict[str, Any] = None) -> List[Dict[str, Union[float, str]]]:
return self._default_client_to_api('_get_ml_bundle_file_list', info, what=what)
def _get_ml_bundle_upload_links(self, info: List[Dict[str, Union[float, str]]], client_info: Dict[str, Any] = None) -> \
List[Dict[str, Union[float, str]]]:
return self._default_client_to_api('_get_ml_bundle_upload_links', info)
[docs]@decorate_all_methods(python_inputs_to_json, exclude=['__init__'])
class RemoteClient(RemoteAPIConnector, BaseClient):
def __init__(self, local_dir: str, host, username, password, protocol: str = 'https', port: int = 443, n_threads: int = 8, skip_on_error=False):
BaseClient.__init__(self, local_dir=local_dir, n_threads=n_threads, skip_on_error=skip_on_error)
RemoteAPIConnector.__init__(self, host, username, password, protocol, port)
def _put_ml_bundle_file(self, path: str, url: Union[str, Dict]):
#fixme, name this is actually not a url here, but a json str => dict
response = url
object_name = os.path.basename(path)
logging.info("%s => %s" % (os.path.basename(path), response['fields']['key']))
with open(path, 'rb') as f:
files = {'file': (object_name, f)}
http_response = requests.post(response['url'], data=response['fields'], files=files)
assert http_response.status_code == 204, response
@classmethod
def _get_ml_bundle_file(cls, file_dict, bundle_dir):
url = file_dict['url']
target = os.path.join(bundle_dir, file_dict['key'])
dirname = os.path.dirname(target)
if not os.path.isdir(dirname):
os.makedirs(dirname, exist_ok=True)
target_tmp = target + ".tmp"
with open(target_tmp, 'wb') as data:
logging.info("%s => %s" % (url, target))
r = requests.get(url)
data.write(r.content)
assert os.path.isfile(target_tmp), f'{file_dict["key"]}: file not writen!'
comp_md5 = multipart_etag(target_tmp, BaseStorage._multipart_chunk_size)
assert comp_md5 == file_dict['md5'], f'{file_dict["key"]}: md5s differ {comp_md5} != {file_dict["md5"]}!'
os.rename(target_tmp, target)