import logging
import requests
import tempfile
import shutil
import pandas as pd
import os
import hashlib
import json
from decimal import Decimal
import re
import datetime
import functools
import cProfile
import io
import pstats
import contextlib
# datetime format used when a timestamp is embedded in a file name
# (filesystem-safe: no ':' characters)
STRING_DATETIME_FILENAME_FORMAT = '%Y-%m-%d_%H-%M-%S'
# canonical UTC-style ISO-like datetime format used for JSON payloads
STRING_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
# matches strings produced by STRING_DATETIME_FORMAT
DATESTRING_REGEX = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$")
# FIXME: legacy compatibility so we don't break the data harvester, which
# still uses the old underscore-separated convention
LEGACY_STRING_DATETIME_FORMAT = '%Y-%m-%d_%H-%M-%S'
# matches strings produced by LEGACY_STRING_DATETIME_FORMAT
LEGACY_DATESTRING_REGEX = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}$")
@contextlib.contextmanager
def profiled():
    """
    Context manager that profiles the enclosed block with ``cProfile`` and
    prints the 100 entries with the highest cumulative time to stdout.

    The profiler is stopped and the report printed even if the profiled
    block raises.
    """
    pr = cProfile.Profile()
    pr.enable()
    try:
        yield
    finally:
        # always stop profiling and emit the report, even on exceptions
        pr.disable()
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
        ps.print_stats(100)
        # uncomment this to see who's calling what
        # ps.print_callers()
        print(s.getvalue())
class URLOrFileOpen(object):
    """
    Context manager that opens either a local file or a remote resource.

    If ``file_or_url`` is not a path to an existing local file, it is
    treated as a URL: the resource is downloaded into a temporary
    directory and opened from there. The temporary directory, if any,
    is removed on exit.
    """
    def __init__(self, file_or_url, mode):
        # local path or URL of the resource to open
        self._file_or_url = file_or_url
        # mode passed verbatim to open(), e.g. 'r' or 'rb'
        self.mode = mode
        self._tmp_dir_path = None
        self._file = None

    def __enter__(self):
        if not os.path.isfile(self._file_or_url):
            self._tmp_dir_path = tempfile.mkdtemp(prefix='sticky-pi-')
            try:
                resp = requests.get(self._file_or_url, allow_redirects=True, timeout=60)
                # fail loudly instead of silently saving an HTTP error page
                resp.raise_for_status()
                # strip any query string before deriving the local file name
                file_name = os.path.basename(self._file_or_url).split('?')[0]
                self._file_or_url = os.path.join(self._tmp_dir_path, file_name)
                with open(self._file_or_url, 'wb') as f:
                    f.write(resp.content)
            except Exception:
                # don't leak the temp directory when the download fails
                shutil.rmtree(self._tmp_dir_path, ignore_errors=True)
                self._tmp_dir_path = None
                raise
        self._file = open(self._file_or_url, self.mode)
        return self._file

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if self._file:
            self._file.close()
        if self._tmp_dir_path:
            shutil.rmtree(self._tmp_dir_path)
def chunker(seq, size: int):
    """
    Breaks an iterable into smaller chunks of size ``size`` (or fewer
    elements for the last chunk).

    :param seq: an iterable; a dict is split into a list of sub-dicts,
        any other sequence is split into slices
    :param size: the maximum size of each chunk
    :return: a list of dicts for dict input, otherwise a generator of slices
    """
    if isinstance(seq, dict):
        all_keys = list(seq)
        # eagerly materialise the sub-dicts, mirroring the list return type
        return [
            {k: seq[k] for k in all_keys[start:start + size]}
            for start in range(0, len(all_keys), size)
        ]
    # non-dict sequences are chunked lazily
    return (seq[start:start + size] for start in range(0, len(seq), size))
def json_io_converter(o):
    """
    ``default=`` hook for :func:`json.dumps`: serialises objects the
    standard encoder cannot handle.

    :param o: the object json failed to encode
    :return: a json-friendly value; file-like objects are passed through
        unchanged
    :raises TypeError: for any unsupported type (json expects ``default``
        hooks to raise TypeError; it is a subclass of Exception, so
        existing callers catching Exception still work)
    """
    if isinstance(o, datetime.datetime):
        # datetimes serialise to the project's datetime string format;
        # null-like values (e.g. NaT) are mapped to None by the helper
        return datetime_to_string(o)
    if isinstance(o, Decimal):
        return float(o)
    if hasattr(o, 'read'):
        # file-like objects are deliberately left untouched
        return o
    raise TypeError('Un-parsable json object: %s' % o)
def json_out_parser(o):
    """
    ``object_hook``-style parser for decoded json dicts: converts any
    string value matching the project's datetime formats back into a
    ``datetime.datetime``.

    :param o: a decoded json dict (mutated in place)
    :return: the same dict, with matching datetime strings parsed
    """
    for key, value in o.items():
        if not isinstance(value, str):
            continue
        if DATESTRING_REGEX.search(value):
            o[key] = string_to_datetime(value)
        # fixme this could be removed eventually
        elif LEGACY_DATESTRING_REGEX.search(value):
            o[key] = string_to_datetime(value, format=LEGACY_STRING_DATETIME_FORMAT)
    return o
def md5(file, chunk_size=32768):
    """
    Computes the hex MD5 digest of a file, reading it in chunks.

    :param file: a path to a file, or a binary file-like object opened for
        reading; the stream is rewound to the start after hashing
    :param chunk_size: number of bytes read per iteration
    :return: the hexadecimal MD5 digest as a string
    """
    # if ``file`` is a path, open it and recurse on the handle
    if isinstance(file, str):
        with open(file, 'rb') as f:
            return md5(f)
    hash_md5 = hashlib.md5()
    try:
        for chunk in iter(lambda: file.read(chunk_size), b""):
            hash_md5.update(chunk)
    finally:
        # leave the stream rewound for the caller
        file.seek(0)
    return hash_md5.hexdigest()
def multipart_etag(file, chunk_size):
    """
    Computes the S3-style multipart ETag of a file for a given part size.

    :param file: a path to a file, or a binary file-like object opened for
        reading; the stream is rewound to the start after hashing
    :param chunk_size: the multipart chunk size, in bytes
    :return: ``'<md5-of-part-digests>-<n_parts>'`` for multi-part input,
        a plain md5 hexdigest for a single part, or ``''`` for an empty file
    """
    # if ``file`` is a path, open it and recurse on the handle
    if isinstance(file, str):
        with open(file, 'rb') as f:
            return multipart_etag(f, chunk_size)
    file.seek(0)
    md5s = []
    while True:
        data = file.read(chunk_size)
        if not data:
            break
        md5s.append(hashlib.md5(data))
    if len(md5s) > 1:
        # multipart: md5 of the concatenated binary part digests,
        # suffixed with '-<number of parts>'
        digests = b"".join(m.digest() for m in md5s)
        new_etag = '%s-%s' % (hashlib.md5(digests).hexdigest(), len(md5s))
    elif len(md5s) == 1:
        # file no larger than the chunk size: plain md5
        new_etag = '%s' % md5s[0].hexdigest()
    else:
        # empty file
        new_etag = ''
    file.seek(0)
    return new_etag
def string_to_datetime(string, is_filename=False, format=None):
    """
    Parses a datetime from its string representation.

    :param string: the string to parse
    :param is_filename: when no explicit ``format`` is given, use the
        filesystem-safe filename format instead of the default one
    :param format: an explicit ``strptime`` format; overrides ``is_filename``
    :return: the parsed ``datetime.datetime``
    """
    if format is None:
        format = STRING_DATETIME_FILENAME_FORMAT if is_filename else STRING_DATETIME_FORMAT
    return datetime.datetime.strptime(string, format)
def datetime_to_string(dt):
    """
    Formats a datetime using the project's datetime string format.

    :param dt: a ``datetime.datetime`` (or a null-like value such as
        ``None`` / ``NaT``)
    :return: the formatted string, or ``None`` for null-like input
    """
    # pd.isnull covers None, NaN and pandas NaT
    if pd.isnull(dt):
        return None
    return dt.strftime(STRING_DATETIME_FORMAT)