Source code for sticky_pi_api.utils

import logging
import requests
import tempfile
import shutil
import pandas as pd
import os
import hashlib
import json
from decimal import Decimal
import re
import datetime
import functools
import cProfile
import io
import pstats
import contextlib


STRING_DATETIME_FILENAME_FORMAT = '%Y-%m-%d_%H-%M-%S'
STRING_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
DATESTRING_REGEX = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$")


# fixme: legacy compatibility so we don't break the data harvester, which uses the old convention
LEGACY_STRING_DATETIME_FORMAT = '%Y-%m-%d_%H-%M-%S'
LEGACY_DATESTRING_REGEX = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}$")


@contextlib.contextmanager
def profiled():
    pr = cProfile.Profile()
    pr.enable()
    yield
    pr.disable()
    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats(100)
    # uncomment this to see who's calling what
    # ps.print_callers()
    print(s.getvalue())

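# Usage sketch (not part of the module; `do_work` is a hypothetical stand-in):
# wrap any block in ``profiled()`` and the 100 entries with the highest
# cumulative time are printed when the block exits.
#
#     with profiled():
#         do_work()
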
class URLOrFileOpen(object):
    def __init__(self, file_or_url, mode):
        self._file_or_url = file_or_url
        self.mode = mode
        self._tmp_dir_path = None
        self._file = None

    def __enter__(self):
        if not os.path.isfile(self._file_or_url):
            # not a local file: download the URL into a temporary directory
            resp = requests.get(self._file_or_url, allow_redirects=True)
            self._tmp_dir_path = tempfile.mkdtemp(prefix='sticky-pi-')
            # strip any query string from the URL to derive a file name
            file_name = os.path.basename(self._file_or_url).split('?')[0]
            self._file_or_url = os.path.join(self._tmp_dir_path, file_name)
            with open(self._file_or_url, 'wb') as f:
                f.write(resp.content)
        self._file = open(self._file_or_url, self.mode)
        return self._file

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if self._file:
            self._file.close()
        if self._tmp_dir_path:
            shutil.rmtree(self._tmp_dir_path)

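# Usage sketch (the URL below is hypothetical): the same context manager
# transparently opens a local path, or downloads a remote file first.
#
#     with URLOrFileOpen('https://example.com/img.jpg?sig=abc', 'rb') as f:
#         data = f.read()
#
# The temporary download directory is removed on exit, so consume the file
# inside the ``with`` block.
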
def chunker(seq, size: int):
    """
    Breaks an iterable into smaller chunks of size ``size``
    (or less for the last chunk).

    :param seq: an iterable
    :param size: the size of the chunks
    :return: a list of dicts if ``seq`` is a dict, otherwise a generator of slices
    """
    if isinstance(seq, dict):
        keys = list(seq.keys())
        key_sets = (keys[pos:pos + size] for pos in range(0, len(seq), size))
        out = []
        for ks in key_sets:
            out.append({k: seq[k] for k in ks})
        return out
    else:
        return (seq[pos:pos + size] for pos in range(0, len(seq), size))

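# Example (sketch): lists yield a generator of slices, dicts yield a list of
# sub-dicts built from consecutive key groups.
#
#     >>> list(chunker([1, 2, 3, 4, 5], 2))
#     [[1, 2], [3, 4], [5]]
#     >>> chunker({'a': 1, 'b': 2, 'c': 3}, 2)
#     [{'a': 1, 'b': 2}, {'c': 3}]
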
def json_io_converter(o):
    if isinstance(o, datetime.datetime):
        if o:
            return datetime_to_string(o)
        else:
            return None
    elif isinstance(o, Decimal):
        return float(o)
    elif hasattr(o, 'read'):
        # file-like objects are passed through as-is
        return o
    else:
        raise Exception('Un-parsable json object: %s' % o)

def json_out_parser(o):
    for k, v in o.items():
        if isinstance(v, str):
            if DATESTRING_REGEX.search(v):
                o[k] = string_to_datetime(o[k])
            # fixme: this could be removed eventually
            elif LEGACY_DATESTRING_REGEX.search(v):
                o[k] = string_to_datetime(o[k], format=LEGACY_STRING_DATETIME_FORMAT)
    return o

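# Round-trip sketch for the two helpers above (values are illustrative):
# ``json_io_converter`` serialises datetimes to the ``Z``-suffixed string
# format, and ``json_out_parser`` restores them when the payload is decoded.
#
#     payload = {'datetime': datetime.datetime(2020, 1, 1, 12, 0, 0)}
#     as_json = json.dumps(payload, default=json_io_converter)
#     # '{"datetime": "2020-01-01T12:00:00Z"}'
#     back = json.loads(as_json, object_hook=json_out_parser)
#     # {'datetime': datetime.datetime(2020, 1, 1, 12, 0)}
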
def json_inputs_to_python(func):
    @functools.wraps(func)
    def _json_inputs_to_python(self, *args, **kwargs):
        formatted_a = []
        for a in args:
            json_a = json.dumps(a, default=json_io_converter)
            a = json.loads(json_a, object_hook=json_out_parser)
            formatted_a.append(a)
        formatted_k = {}
        for k, v in kwargs.items():
            json_v = json.dumps(v, default=json_io_converter)
            v = json.loads(json_v, object_hook=json_out_parser)
            formatted_k[k] = v
        out = func(self, *formatted_a, **formatted_k)
        # it is the responsibility of the serializer to then encode to JSON, on the remote API
        return out
    return _json_inputs_to_python

def python_inputs_to_json(func):
    @functools.wraps(func)
    def _python_inputs_to_json(self, *args, **kwargs):
        formatted_a = []
        for a in args:
            json_a = json.dumps(a, default=json_io_converter)
            a = json.loads(json_a)
            formatted_a.append(a)
        formatted_k = {}
        for k, v in kwargs.items():
            json_v = json.dumps(v, default=json_io_converter)
            v = json.loads(json_v)
            formatted_k[k] = v
        out = func(self, *formatted_a, **formatted_k)
        # it is the responsibility of the serializer to then decode the JSON, on the remote client
        return out
    return _python_inputs_to_json

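# Usage sketch (``Client`` and ``put_images`` are hypothetical names): both
# decorators round-trip a method's arguments through JSON. ``json_inputs_to_python``
# additionally restores datetime strings to ``datetime`` objects via
# ``json_out_parser``, whereas ``python_inputs_to_json`` leaves the arguments
# JSON-safe for transport.
#
#     class Client(object):
#         @python_inputs_to_json
#         def put_images(self, info):
#             ...  # `info` now holds only JSON-serialisable values
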
def md5(file, chunk_size=32768):
    # if the file is a path, open it and recurse
    if isinstance(file, str):
        with open(file, 'rb') as f:
            return md5(f)
    try:
        hash_md5 = hashlib.md5()
        for chunk in iter(lambda: file.read(chunk_size), b""):
            hash_md5.update(chunk)
    finally:
        # rewind so the caller can read the file again
        file.seek(0)
    return hash_md5.hexdigest()

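# Example (sketch; the path is hypothetical): ``md5`` accepts either a path or
# an open binary file, and leaves an open file rewound to position 0.
#
#     md5('/tmp/some_file.jpg')
#     with open('/tmp/some_file.jpg', 'rb') as f:
#         md5(f)  # same digest; `f` is rewound afterwards
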
def multipart_etag(file, chunk_size):
    if isinstance(file, str):
        with open(file, 'rb') as f:
            return multipart_etag(f, chunk_size)
    file.seek(0)
    md5s = []
    while True:
        data = file.read(chunk_size)
        if not data:
            break
        md5s.append(hashlib.md5(data))
    if len(md5s) > 1:
        # multipart: hash the concatenation of the per-part digests and append the part count
        digests = b"".join(m.digest() for m in md5s)
        new_md5 = hashlib.md5(digests)
        new_etag = '%s-%s' % (new_md5.hexdigest(), len(md5s))
    elif len(md5s) == 1:
        # file smaller than chunk size
        new_etag = '%s' % md5s[0].hexdigest()
    else:
        # empty file
        new_etag = ''
    file.seek(0)
    return new_etag

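# Example (sketch; the path is hypothetical): with an 8 MiB chunk size, the
# result matches the ETag S3 reports for a multipart upload with 8 MiB parts
# (a plain MD5 hex digest for single-part files,
# ``<md5-of-concatenated-digests>-<n_parts>`` otherwise).
#
#     multipart_etag('/tmp/some_file.jpg', chunk_size=8 * 1024 * 1024)
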
def string_to_datetime(string, is_filename=False, format=None):
    if format is not None:
        return datetime.datetime.strptime(string, format)
    if is_filename:
        return datetime.datetime.strptime(string, STRING_DATETIME_FILENAME_FORMAT)
    else:
        return datetime.datetime.strptime(string, STRING_DATETIME_FORMAT)

def datetime_to_string(dt):
    if pd.isnull(dt):
        return None
    return datetime.datetime.strftime(dt, STRING_DATETIME_FORMAT)

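# Round-trip sketch for the two helpers above:
#
#     s = datetime_to_string(datetime.datetime(2020, 1, 1, 12, 0))
#     # '2020-01-01T12:00:00Z'
#     string_to_datetime(s)
#     # datetime.datetime(2020, 1, 1, 12, 0)
#     string_to_datetime('2020-01-01_12-00-00', is_filename=True)
#     # same result, parsed with the filename-safe format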