import datetime
import faulthandler
import functools
import gc
import inspect
import json
import math
import os
import random
import re
import sqlite3
import sys
import threading
import time
from bisect import bisect_left
from enum import Enum
from itertools import chain, combinations
from math import log, isnan, nan, floor, log10, gcd
from numbers import Number
from shutil import copyfile
from threading import RLock
from types import FunctionType
from typing import Union, Tuple, List, Optional, Dict, Any, Type
# noinspection PyUnresolvedReferences
from unittest import TestCase, mock

import cachetools
import hanging_threads
import matplotlib.cm
import matplotlib.pyplot as plt
import numpy
import numpy as np
import pandas
import scipy.optimize
import scipy.stats
import tabulate
from scipy.ndimage import zoom

from lib import stack_tracer, print_exc_plus
from lib.my_logger import logging

X = Y = Z = float


class KnownIssue(Exception):
    """
    This means the code is not working and should not be used but still too valuable to be deleted
    """
    pass


def powerset(iterable):
    """powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"""
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))


def plot_with_conf(x, y_mean, y_conf, alpha=0.5, **kwargs):
    """
    Plot ``y_mean`` over ``x`` and shade the band ``y_mean +- y_conf`` in the same color.

    An ``ax`` keyword selects the axes to draw on; all other keyword arguments are
    forwarded to ``ax.plot``.
    """
    # Only fall back to the current axes when none was given: calling plt.gca()
    # unconditionally would create a figure as a side effect.
    ax = kwargs.pop('ax', None)
    if ax is None:
        ax = plt.gca()
    base_line, = ax.plot(x, y_mean, **kwargs)
    y_mean = np.array(y_mean)
    y_conf = np.array(y_conf)
    lb = y_mean - y_conf
    ub = y_mean + y_conf

    ax.fill_between(x, lb, ub, facecolor=base_line.get_color(), alpha=alpha)


def choice(sequence, probabilities):
    """
    Draw one element of ``sequence``, where ``probabilities[i]`` is the probability of element ``i``.

    Raises AssertionError if the probabilities are exhausted before an element is selected,
    i.e. if they sum to less than the drawn random number.
    """
    # if sum(probabilities) != 1:
    #     raise AssertionError('Probabilities must sum to 1')
    r = random.random()
    for idx, c in enumerate(sequence):
        r -= probabilities[idx]
        if r < 0:
            return c
    raise AssertionError('Probabilities must sum to 1')


def print_attributes(obj, include_methods=False, ignore=None):
    """Print name, type name and value of every public attribute of ``obj``."""
    if ignore is None:
        ignore = []
    for attr in dir(obj):
        if attr in ignore:
            continue
        if attr.startswith('_'):
            continue
        # getattr() works for every object; obj.__getattr__ only exists on classes that define it
        value = getattr(obj, attr)
        if not include_methods and callable(value):
            continue
        print(attr, ':', value.__class__.__name__, ':', value)


def attr_dir(obj, include_methods=False, ignore=None):
    """Return a dict mapping the name of every public attribute of ``obj`` to its value."""
    if ignore is None:
        ignore = []
    result = {}
    for attr in dir(obj):
        if attr.startswith('_') or attr in ignore:
            continue
        value = getattr(obj, attr)
        if include_methods or not callable(value):
            result[attr] = value
    return result


def zoom_to_shape(a: np.ndarray, shape: Tuple, mode: str = 'smooth', verbose=1):
    """
    Resample ``a`` to the given ``shape``.

    :param a: input array (copied and converted to the keras float type)
    :param shape: desired shape, must have as many dimensions as ``a``
    :param mode: 'constant' delegates to ``scipy.ndimage.zoom``; 'smooth' resamples axis by axis,
        where every output pixel is the weighted mean of the input pixels it covers
    :param verbose: if truthy, print a warning when pixels are skipped
    :raises ValueError: if the number of dimensions differs
    :raises NotImplementedError: for any other ``mode``
    """
    from keras import backend
    a = np.array(a, dtype=backend.floatx())  # also does a copy
    shape_dim = len(a.shape)
    if len(a.shape) != len(shape):
        raise ValueError('The shapes must have the same dimension but were len({0}) = {1} (original) '
                         'and len({2}) = {3} desired.'.format(a.shape, len(a.shape), shape, len(shape)))
    if len(shape) == 0:
        return a
    zoom_factors = tuple(shape[idx] / a.shape[idx] for idx in range(shape_dim))

    # The helpers below read current_axis, current_pixel_index and pixel_index_in_result
    # from the enclosing scope; these are (re)assigned by the loops of the 'smooth' branch.
    def _current_index_in_old_array():
        return tuple(slice(0, length) if axis != current_axis else slice(current_pixel_index, current_pixel_index + 1)
                     for axis, length in enumerate(a.shape))

    def _current_pixel_shape():
        return tuple(length if axis != current_axis else 1
                     for axis, length in enumerate(a.shape))

    def _current_result_index():
        return tuple(
            slice(0, length) if axis != current_axis else slice(pixel_index_in_result, pixel_index_in_result + 1)
            for axis, length in enumerate(a.shape))

    def _current_result_shape():
        return tuple(orig_length if axis != current_axis else shape[axis]
                     for axis, orig_length in enumerate(a.shape))

    if mode == 'constant':
        result = zoom(a, zoom_factors)
        assert result.shape == shape
        return result
    elif mode == 'smooth':
        result = a
        # axes are processed in order of increasing zoom factor
        for current_axis, zoom_factor in sorted(enumerate(zoom_factors), key=lambda x: x[1]):
            result = np.zeros(_current_result_shape(), dtype=backend.floatx())
            # current_length = a.shape[current_axis]
            desired_length = shape[current_axis]
            current_pixel_index = 0
            current_pixel_part = 0  # how much of the current pixel is already read
            for pixel_index_in_result in range(desired_length):
                pixels_remaining = 1 / zoom_factor
                pixel_sum = np.zeros(_current_pixel_shape())
                while pixels_remaining + current_pixel_part > 1:
                    pixel_sum += (1 - current_pixel_part) * a[_current_index_in_old_array()]
                    current_pixel_index += 1
                    pixels_remaining -= (1 - current_pixel_part)
                    current_pixel_part = 0

                # the remaining pixel_part
                try:
                    pixel_sum += pixels_remaining * a[_current_index_in_old_array()]
                except (IndexError, ValueError):
                    if verbose:
                        print('WARNING: Skipping {0} pixels because of numerical imprecision.'.format(pixels_remaining))
                else:
                    current_pixel_part += pixels_remaining

                # insert to result
                pixel_sum *= zoom_factor

                result[_current_result_index()] = pixel_sum
            a = result
        assert result.shape == shape
        return result
    else:
        # was `return NotImplementedError(...)`, which handed the exception object to the caller
        raise NotImplementedError('Mode not available.')


def profile_wall_time_instead_if_profiling():
    """If yappi is installed and currently profiling with a non-wall clock, restart it with the wall clock."""
    try:
        import yappi
    except ModuleNotFoundError:
        return
    currently_profiling = len(yappi.get_func_stats())
    if currently_profiling and yappi.get_clock_type() != 'wall':
        yappi.stop()
        print('Profiling wall time instead of cpu time.')
        yappi.clear_stats()
        yappi.set_clock_type("wall")
        yappi.start()


def dummy_computation(*_args, **_kwargs):
    """Accept anything and do nothing."""
    pass


def backup_file(filename):
    """Copy ``filename`` to the path returned by :func:`backup_file_path`."""
    copyfile(filename, backup_file_path(filename))


def backup_file_path(filename):
    """Return ``filename`` with the current date (YYYYMMDD) and '.bak' appended."""
    return filename + time.strftime("%Y%m%d") + '.bak'


# noinspection SpellCheckingInspection
def my_tabulate(data, tablefmt='pipe', **params):
    """
    Wrapper around ``tabulate.tabulate`` with 'pipe' format as default and no minimum padding.

    For empty ``data`` with given headers a single row of ``None`` values is rendered.
    Note that this sets the module-global ``tabulate.MIN_PADDING``.
    """
    if data == [] and 'headers' in params:
        data = [(None for _ in params['headers'])]
    tabulate.MIN_PADDING = 0
    return tabulate.tabulate(data, tablefmt=tablefmt, **params)


def ce_loss(y_true, y_predicted):
    """Binary cross-entropy of a single scalar prediction."""
    return -(y_true * log(y_predicted) + (1 - y_true) * log(1 - y_predicted))


class DontSaveResultsError(Exception):
    pass


def multinomial(n, bins):
    """
    Distribute ``n`` items uniformly at random to ``bins`` bins.

    :return: list with the number of items per bin
    :raises ValueError: if ``n > 0`` items should be distributed to 0 bins
    """
    if bins == 0:
        if n > 0:
            raise ValueError('Cannot distribute to 0 bins.')
        return []
    remaining = n
    results = []
    for i in range(bins - 1):
        # every remaining item lands in the current bin with probability 1 / (number of bins left)
        x = numpy.random.binomial(remaining, 1 / (bins - i))
        results.append(x)
        remaining -= x

    results.append(remaining)
    return results


class UnknownTypeError(Exception):
    pass


# def shape_analysis(xs):
#     composed_dtypes = [list, tuple, np.ndarray, dict, set]
#     base_dtypes = [str, int, float, type, object]  # TODO add class and superclass of xs first element
#     all_dtypes = composed_dtypes + base_dtypes
#     if isinstance(xs, np.ndarray):
#         outer_brackets = ('[', ']')
#         shape = xs.shape
#         dtype = xs.dtype
#     elif isinstance(xs, tuple):
#         outer_brackets = ('(', ')')
#         shape = len(xs)
#         dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
#     elif isinstance(xs, list):
#         outer_brackets = ('[', ']')
#         shape = len(xs)
#         dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
#     elif isinstance(xs, dict) or isinstance(xs, set):
#         outer_brackets = ('{', '}')
#         shape = len(xs)
#         dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
#     elif any(isinstance(xs, t) for t in base_dtypes):
#         for t in base_dtypes:
#             if isinstance(xs, t):
#                 return str(t.__name__)
#         raise AssertionError('This should be unreachable.')
#     else:
#         raise UnknownTypeError('Unknown type:' + type(xs).__name__)
#
#     if shape and shape != '?':
#         return outer_brackets[0] + str(xs.shape) + ' * ' + str(dtype) + outer_brackets[1]
#     else:
#         return outer_brackets[0] + outer_brackets[1]


def beta_conf_interval_mle(data, conf=0.95):
    """
    Confidence interval of a beta distribution fitted to ``data`` by maximum likelihood.

    :return: ``(lower, upper)`` clipped to [0, 1]; ``(0, 1)`` for fewer than two samples;
        ``(nan, nan)`` if any value is nan or outside [0, 1]
    """
    if len(data) <= 1:
        return 0, 1  # overestimates the interval
    if any(d < 0 or d > 1 or isnan(d) for d in data):
        return nan, nan
    if numpy.var(data) == 0:
        return numpy.mean(data), numpy.mean(data)
    epsilon = 1e-3
    # adjusted_data = data.copy()
    # for idx in range(len(adjusted_data)):
    #     adjusted_data[idx] *= (1 - 2 * epsilon)
    #     adjusted_data[idx] += epsilon
    alpha, beta, _, _ = scipy.stats.beta.fit(data, floc=-epsilon, fscale=1 + 2 * epsilon)

    # The confidence is passed positionally because the keyword was renamed
    # from `alpha` to `confidence` in newer SciPy releases.
    lower, upper = scipy.stats.beta.interval(conf, a=alpha, b=beta)
    if lower < 0:
        lower = 0
    if upper < 0:
        upper = 0
    if lower > 1:
        lower = 1
    if upper > 1:
        upper = 1
    return lower, upper


def gamma_conf_interval_mle(data, conf=0.95) -> Tuple[float, float]:
    """
    Confidence interval of a gamma distribution (location fixed to 0) fitted to ``data`` by maximum likelihood.

    :return: ``(lower, upper)``, both non-negative; ``(nan, nan)`` for fewer than two samples
        or if any value is negative or nan
    """
    if len(data) <= 1:
        return nan, nan
    if any(d < 0 or isnan(d) for d in data):
        return nan, nan
    if numpy.var(data) == 0:
        # Degenerate interval around the single observed value. This used to return (mean, 0),
        # i.e. (mean, variance), which is not an interval.
        mean = numpy.mean(data).item()
        return mean, mean
    alpha, _, scale = scipy.stats.gamma.fit(data, floc=0)

    lower, upper = scipy.stats.gamma.interval(conf, a=alpha, scale=scale)
    if lower < 0:
        lower = 0
    if upper < 0:
        upper = 0
    return lower, upper


beta_quantile_cache = cachetools.LRUCache(maxsize=10)


# The key function needs the same default for `guess` as the cached function,
# otherwise calls that omit `guess` fail while computing the cache key.
@cachetools.cached(cache=beta_quantile_cache, key=lambda x1, p1, x2, p2, guess=None: (x1, x2, p1, p2))
def beta_parameters_quantiles(x1, p1, x2, p2, guess=(3, 3)):
    """
    Find parameters (a, b) for a beta random variable X so that
    P(X <= x1) = p1 and P(X <= x2) = p2 (least-squares fit of the CDF at both points).

    ``guess`` is the starting point of the optimization and is not part of the cache key.
    """

    def square(x):
        return x * x

    def objective(v):
        (a, b) = v
        temp = square(scipy.stats.beta.cdf(x1, a, b) - p1)
        temp += square(scipy.stats.beta.cdf(x2, a, b) - p2)
        return temp

    xopt = scipy.optimize.fmin(objective, guess, disp=False)
    return (xopt[0], xopt[1])


def _beta_parameters_from_data_quantiles(data, mu, v, quantiles):
    """Fit beta parameters to two empirical quantiles of ``data``, starting from the method-of-moments estimate."""
    lower = numpy.quantile(data, quantiles[0])
    upper = numpy.quantile(data, quantiles[1])

    alpha_guess = mu ** 2 * ((1 - mu) / v - 1 / mu)
    beta_guess = alpha_guess * (1 / mu - 1)

    return beta_parameters_quantiles(lower, quantiles[0], upper, quantiles[1], (alpha_guess, beta_guess))


def beta_conf_interval_quantile(data, conf=0.95, quantiles=(0.25, 0.75)):
    """
    Confidence interval of a beta distribution fitted to two empirical quantiles of ``data``.

    :return: ``(lower, upper)``; ``(0, 1)`` for fewer than two samples; ``(mean, mean)`` for zero variance
    """
    if len(data) <= 1:
        return 0, 1  # overestimates the interval
    mu = numpy.mean(data)
    v = numpy.var(data)
    data = numpy.array(data)
    if v == 0:
        return mu, mu

    alpha, beta = _beta_parameters_from_data_quantiles(data, mu, v, quantiles)
    # confidence passed positionally, see beta_conf_interval_mle
    return scipy.stats.beta.interval(conf, a=alpha, b=beta)


def beta_stats_quantile(data, quantiles=(0.25, 0.75)):
    """
    Result of ``scipy.stats.beta.stats`` for a beta distribution fitted to two empirical quantiles of ``data``.

    :return: ``(0, 1)`` for fewer than two samples; ``(mean, mean)`` for zero variance
    """
    if len(data) <= 1:
        return 0, 1  # overestimates the interval
    data = numpy.array(data)
    mu = numpy.mean(data)
    v = numpy.var(data)
    if v == 0:
        return mu, mu

    alpha, beta = _beta_parameters_from_data_quantiles(data, mu, v, quantiles)
    return scipy.stats.beta.stats(a=alpha, b=beta)


def beta_stats_mle(data):
    """
    Result of ``scipy.stats.beta.stats`` for a beta distribution fitted to ``data`` by maximum likelihood.

    :return: ``(nan, nan)`` for fewer than two samples or values outside [0, 1]; ``(mean, 0)`` for zero variance
    """
    if len(data) <= 1:
        return nan, nan
    if any(d < 0 or d > 1 or isnan(d) for d in data):
        return nan, nan
    if numpy.var(data) == 0:
        return numpy.mean(data), 0
    epsilon = 1e-4
    # adjusted_data = data.copy()
    # for idx in range(len(adjusted_data)):
    #     adjusted_data[idx] *= (1 - 2 * epsilon)
    #     adjusted_data[idx] += epsilon
    alpha, beta, _, _ = scipy.stats.beta.fit(data, floc=-epsilon, fscale=1 + 2 * epsilon)

    return scipy.stats.beta.stats(a=alpha, b=beta)


def gamma_stats_mle(data):
    """
    Result of ``scipy.stats.gamma.stats`` for a gamma distribution (location 0) fitted to ``data``.

    :return: ``(nan, nan)`` for fewer than two samples or negative/nan values; ``(mean, 0)`` for zero variance
    """
    if len(data) <= 1:
        return nan, nan
    if any(d < 0 or isnan(d) for d in data):
        return nan, nan
    if numpy.var(data) == 0:
        return numpy.mean(data), 0
    alpha, _, scale = scipy.stats.gamma.fit(data, floc=0)

    return scipy.stats.gamma.stats(a=alpha, scale=scale)


beta_stats = beta_stats_quantile
beta_conf_interval = beta_conf_interval_quantile
gamma_stats = gamma_stats_mle
gamma_conf_interval = gamma_conf_interval_mle


def split_df_list(df, target_column):
    """
    Expand a column of JSON-encoded lists into one row per list element.

    df = data frame to split,
    target_column = the column containing JSON-encoded lists (decoded with ``json.loads``)
    returns: a data frame with each element of the target column moved into a new row.
        The values in the other columns are duplicated across the newly divided rows.
        Rows whose list is empty do not appear in the result.
    SOURCE: https://gist.github.com/jlln/338b4b0b55bd6984f883
    """

    def split_list_to_rows(row, row_accumulator):
        split_row = json.loads(row[target_column])
        for s in split_row:
            new_row = row.to_dict()
            new_row[target_column] = s
            row_accumulator.append(new_row)

    new_rows = []
    df.apply(split_list_to_rows, axis=1, args=(new_rows,))
    new_df = pandas.DataFrame(new_rows)
    return new_df


try:
    import winsound as win_sound


    def beep(*args, **kwargs):
        """Forward to ``winsound.Beep``."""
        win_sound.Beep(*args, **kwargs)
except ImportError:
    win_sound = None


    def beep(*_args, **_kwargs):
        """No-op: winsound is not available."""
        pass


def round_to_digits(x, d):
    """Round ``x`` to ``d`` significant digits. 0 and nan are returned unchanged."""
    if x == 0:
        return 0
    if isnan(x):
        return nan
    try:
        return round(x, d - 1 - int(floor(log10(abs(x)))))
    except OverflowError:
        return x


def gc_if_memory_error(f, *args, **kwargs):
    """Call ``f``; on MemoryError run the garbage collector and try exactly once more."""
    try:
        return f(*args, **kwargs)
    except MemoryError:
        print('Starting garbage collector')
        gc.collect()
        return f(*args, **kwargs)


def assert_not_empty(x):
    """Return ``x`` after asserting that it has a non-zero length."""
    assert len(x)
    return x


def validation_steps(validation_dataset_size, maximum_batch_size):
    """
    Return ``(batch_size, steps)`` with ``batch_size * steps == validation_dataset_size``,
    where the batch size is the greatest common divisor of both arguments.
    """
    batch_size = gcd(validation_dataset_size, maximum_batch_size)
    steps = validation_dataset_size // batch_size
    assert batch_size * steps == validation_dataset_size
    return batch_size, steps


def functional_dependency_trigger(connection: sqlite3.Connection,
                                  table_name: str,
                                  determining_columns: List[str],
                                  determined_columns: List[str],
                                  exist_ok: bool, ):
    """
    Create triggers that roll back any insert or update after which ``determining_columns``
    would no longer uniquely identify the values of ``determined_columns``.

    Table and column names are interpolated into the SQL text (identifiers cannot be bound
    as parameters), so they must come from trusted code, never from user input.

    :param exist_ok: if true, use ``CREATE TRIGGER IF NOT EXISTS``
    """
    cursor = connection.cursor()
    # possible_performance_improvements
    determined_columns = [c for c in determined_columns if c not in determining_columns]
    trigger_base_name = '_'.join([table_name] + determining_columns + ['determine'] + determined_columns)
    error_message = ','.join(determining_columns) + ' must uniquely identify ' + ','.join(determined_columns)
    # the message is embedded in a single-quoted SQL string literal
    escaped_error_message = error_message.replace("'", "''")

    if_not_exists = 'IF NOT EXISTS' if exist_ok else ''
    same_determining_values = ' AND '.join(f'NEW.{c} IS NOT NULL AND {c} = NEW.{c}' for c in determining_columns)
    different_determined_values = ' OR '.join(f'{c} != NEW.{c}' for c in determined_columns)

    # When inserting or updating, check if there is already an entry with the same determining values
    # but different determined values. The triggers fire BEFORE the statement; the "_after_" in
    # their names is kept so that IF NOT EXISTS still matches triggers created by older versions.
    for name_suffix, event in (('after_insert', 'INSERT'), ('after_update', 'UPDATE')):
        cursor.execute(f'''
        CREATE TRIGGER {if_not_exists} {trigger_base_name}_{name_suffix}
        BEFORE {event} ON {table_name}
        WHEN EXISTS(SELECT * FROM {table_name}
                    WHERE ({same_determining_values})
                      AND ({different_determined_values}))
        BEGIN
            SELECT RAISE(ROLLBACK, '{escaped_error_message}');
        END
        ''')


def heatmap_from_points(x, y,
                        x_lim: Optional[Union[int, Tuple[int, int]]] = None,
                        y_lim: Optional[Union[int, Tuple[int, int]]] = None,
                        gridsize=30):
    """
    Draw a hexbin heatmap of the points ``(x, y)`` on the current pyplot figure and add a colorbar.

    A scalar limit ``v`` is expanded to the pair ``(v, v)`` before being passed to ``plt.xlim`` / ``plt.ylim``.
    """
    if isinstance(x_lim, Number):
        x_lim = (x_lim, x_lim)
    if isinstance(y_lim, Number):
        y_lim = (y_lim, y_lim)

    plt.hexbin(x, y, gridsize=gridsize, cmap=matplotlib.cm.jet, bins=None)
    if x_lim is not None:
        plt.xlim(x_lim)
    if y_lim is not None:
        plt.ylim(y_lim)

    cb = plt.colorbar()
    cb.set_label('mean value')


def strptime(date_string, fmt):
    """Parse ``date_string`` with ``time.strptime`` and return a naive datetime (second resolution)."""
    return datetime.datetime(*(time.strptime(date_string, fmt)[0:6]))


class PrintLineRLock(RLock().__class__):
    """Re-entrant lock that prints a line before and after every acquire/release, for debugging deadlocks."""

    def __init__(self, *args, name='', **kwargs):
        # noinspection PyArgumentList
        super().__init__(*args, **kwargs)
        self.name = name

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        print(f'Trying to acquire Lock {self.name}')
        # threading.RLock is a factory function, not the lock class, so the parent
        # implementation has to be reached through super()
        result = super().acquire(blocking, timeout)
        print(f'Acquired Lock {self.name}')
        return result

    def release(self) -> None:
        print(f'Trying to release Lock {self.name}')
        # noinspection PyNoneFunctionAssignment
        result = super().release()
        print(f'Released Lock {self.name}')
        return result

    def __enter__(self, *args, **kwargs):
        print('Trying to enter Lock')
        # noinspection PyArgumentList
        result = super().__enter__(*args, **kwargs)
        print('Entered Lock')
        return result

    def __exit__(self, *args, **kwargs):
        print('Trying to exit Lock')
        super().__exit__(*args, **kwargs)
        print('Exited Lock')


def fixed_get_current_frames():
    """Return current threads prepared for
    further processing.
    """
    threads = {thread.ident: thread for thread in threading.enumerate()}
    return {
        thread_id: {
            'frame': hanging_threads.thread2list(frame),
            'time': None,
            'id': thread_id,
            'name': threads[thread_id].name,
            'object': threads[thread_id]
        } for thread_id, frame in sys._current_frames().items()
        if thread_id in threads  # otherwise keyerrors might happen because of race conditions
    }


# replace the library implementation by the race-condition-safe version above
hanging_threads.get_current_frames = fixed_get_current_frames


class CallCounter():
    """Callable wrapper around ``f`` that counts its invocations in ``self.calls``."""

    def __init__(self, f):
        self.f = f
        self.calls = 0
        self.__name__ = f.__name__

    def __call__(self, *args, **kwargs):
        self.calls += 1
        return self.f(*args, **kwargs)

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return self.__class__.__name__ + repr(self.__dict__)


def test_with_timeout(timeout=2):
    """Decorator: fail the test with an AssertionError if it takes longer than ``timeout`` seconds."""

    def wrapper(f):
        from lib.threading_timer_decorator import exit_after
        f = exit_after(timeout)(f)

        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            try:
                print(f'Running this test with timeout: {timeout}')
                return f(*args, **kwargs)
            except KeyboardInterrupt:
                # NOTE(review): presumably exit_after signals the timeout via KeyboardInterrupt - confirm
                raise AssertionError(f'Test took longer than {timeout} seconds')

        return wrapped

    return wrapper


def lru_cache_by_id(maxsize):
    """LRU cache decorator that keys on ``id()`` of the single positional argument."""
    return cachetools.cached(cachetools.LRUCache(maxsize=maxsize), key=id)


class EquivalenceRelation:
    """Abstract equivalence relation; subclasses implement :meth:`equivalent`."""

    def equivalent(self, a, b) -> bool:
        raise NotImplementedError('Abstract method')

    def equivalence_classes(self, xs: list):
        """Group ``xs`` into lists of equivalent elements, comparing each element to the first of every class."""
        classes = []
        for x in xs:
            for c in classes:
                if self.equivalent(x, c[0]):
                    c.append(x)
                    break
            else:
                classes.append([x])
        return classes

    def check_reflexivity_on_dataset(self, xs):
        return all(self.equivalent(x, x) for x in xs)

    def check_symmetry_on_dataset(self, xs):
        for x in xs:
            for y in xs:
                if x is y:
                    continue
                if self.equivalent(x, y) and not self.equivalent(y, x):
                    return False
        return True

    def check_axioms_on_dataset(self, xs):
        return (
                self.check_reflexivity_on_dataset(xs)
                and self.check_symmetry_on_dataset(xs)
                and self.check_transitivity_on_dataset(xs, assume_symmetry=True, assume_reflexivity=True)
        )

    def check_transitivity_on_dataset(self, xs, assume_symmetry=False, assume_reflexivity=False):
        """The ``assume_*`` flags skip combinations that are redundant if the respective axiom holds."""
        for x_idx, x in enumerate(xs):
            for y_idx, y in enumerate(xs):
                if x is y:
                    continue
                if self.equivalent(x, y):
                    for z_idx, z in enumerate(xs):
                        if y is z:
                            continue
                        if assume_symmetry and x_idx > z_idx:
                            continue
                        if assume_reflexivity and x is z:
                            continue
                        if self.equivalent(y, z):
                            if not self.equivalent(x, z):
                                return False
        return True

    def match_lists(self, xs, ys, filter_minimum_size=0, filter_maximum_size=math.inf):
        """
        Compute the equivalence classes of ``xs + ys`` (returned in reverse order of creation).

        Each class is a list of ``(origin, element)`` pairs with origin 0 for ``xs`` and 1 for ``ys``.
        Classes whose size is outside the given bounds are dropped.

        :raises ValueError: if an object is contained in both lists
        """
        xs = list(xs)
        ys = list(ys)
        if any(x is y for x in xs for y in ys):
            raise ValueError('Lists contain the same element. This is currently not supported.')
        classes = self.equivalence_classes([*xs, *ys])

        return [
            [
                (0 if any(x2 is x for x2 in xs) else 1, x)
                for x in c
            ]
            for c in classes[::-1]
            if filter_minimum_size <= len(c) <= filter_maximum_size
        ]


def iff_patch(patch: mock._patch):
    """Test decorator: the test must pass with ``patch`` applied and must fail without it."""

    def decorator(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            with patch:
                f(*args, **kwargs)
            try:
                f(*args, **kwargs)
            except Exception:
                # expected: without the patch the test has to fail
                pass
            else:
                raise AssertionError('Test did not fail without patch')

        return wrapped

    return decorator


def iff_not_patch(patch: mock._patch):
    """Test decorator: the test must pass without ``patch`` and must fail with it applied."""

    def decorator(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            f(*args, **kwargs)
            try:
                with patch:
                    f(*args, **kwargs)
            except Exception:
                # expected: with the patch the test has to fail
                pass
            else:
                raise AssertionError('Test did not fail with patch')

        return wrapped

    return decorator


EMAIL_CRASHES_TO = []
VOICE_CALL_ON_CRASH: List[Tuple[str, str]] = []


def list_logger(base_logging_function, store_in_list: list):
    """Return a function that forwards to ``base_logging_function`` and also appends its positional args to the list."""

    def print_and_store(*args, **kwargs):
        base_logging_function(*args, **kwargs)
        store_in_list.extend(args)

    return print_and_store


def main_wrapper(f):
    """
    Decorator for the main function of a script.

    Starts stack tracing and the fault handler, logs any exception raised by ``f`` (serializing it
    to ``logs/<script>.dill``), notifies ``EMAIL_CRASHES_TO`` and ``VOICE_CALL_ON_CRASH`` on crashes
    other than KeyboardInterrupt, and finally stops tracing, beeps and prints the total run time.
    Exceptions are swallowed: after a crash the wrapper returns None.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        # import lib.stack_tracer
        import __main__
        # does not help much
        # monitoring_thread = hanging_threads.start_monitoring(seconds_frozen=180, test_interval=1000)
        os.makedirs('logs', exist_ok=True)
        log_base_path = 'logs/' + os.path.split(__main__.__file__)[-1]
        stack_tracer.trace_start(log_base_path + '.html', interval=5)
        faulthandler.enable()
        profile_wall_time_instead_if_profiling()

        # noinspection PyBroadException
        try:
            return f(*args, **kwargs)
        except KeyboardInterrupt:
            # interrupted by the user: log it, but do not notify anyone
            error_messages = []
            print_exc_plus.print_exc_plus(print=list_logger(logging.error, error_messages),
                                          serialize_to=log_base_path + '.dill')
        except BaseException:  # deliberately catches everything: this is the top-level crash handler
            error_messages = []
            print_exc_plus.print_exc_plus(print=list_logger(logging.error, error_messages),
                                          serialize_to=log_base_path + '.dill')
            for recipient in EMAIL_CRASHES_TO:
                from jobs.sending_emails import send_mail
                send_mail.create_simple_mail_via_gmail(body='\n'.join(error_messages), filepath=None, excel_name=None,
                                                       to_mail=recipient, subject='[python] Crash report')
            for to_number, from_number in VOICE_CALL_ON_CRASH:
                logging.info(f'Calling {from_number} to notify about the crash.')
                voice_call('This is a notification message that one of your python scripts has crashed. '
                           'If you are unsure about the origin of this call, please contact Eren Yilmaz.',
                           to_number, from_number)
        finally:
            logging.info('Terminated.')
            total_time = time.perf_counter() - start
            faulthandler.disable()
            stack_tracer.trace_stop()
            frequency = 2000
            duration = 500
            beep(frequency, duration)
            print('Total time', total_time)
            try:
                from algorithm_development.metatrader import ZeroMQ_Connector
                ZeroMQ_Connector.DWX_ZeroMQ_Connector.deactivate_all()
            except ImportError:
                pass

    return wrapper


def voice_call(msg, to_number, from_number):
    """Place a Twilio voice call from ``from_number`` to ``to_number`` and print the call SID."""
    from twilio.rest import Client
    # SECURITY: credentials must not live in source control. They can be supplied through the
    # environment; the hard-coded fallbacks are kept only for backward compatibility and
    # should be rotated and removed.
    account_sid = os.environ.get('TWILIO_ACCOUNT_SID', 'AC63c459168c3e4fe34e462acb4f44f748')
    auth_token = os.environ.get('TWILIO_AUTH_TOKEN', 'b633bc0e945fe7cb737fdac395cc71d6')
    client = Client(account_sid, auth_token)

    # NOTE(review): msg is passed as-is; presumably TwiML markup is expected here - confirm
    call = client.calls.create(
        twiml=f'{msg}',
        from_=from_number,
        to=to_number,
    )

    print(call.sid)


def required_size_for_safe_rotation(base: Tuple[X, Y, Z], rotate_range_deg) -> Tuple[X, Y, Z]:
    """
    Return ``base`` with its last two extents enlarged so that a rotation by ``rotate_range_deg``
    degrees fits. The first extent is unchanged.

    :raises NotImplementedError: for angles of more than 45 degrees (absolute)
    """
    if abs(rotate_range_deg) > 45:
        raise NotImplementedError
    if abs(rotate_range_deg) > 0:
        x_length = base[2] * math.sin(rotate_range_deg / 180 * math.pi) + base[1] * math.cos(
            rotate_range_deg / 180 * math.pi)
        y_length = base[2] * math.cos(rotate_range_deg / 180 * math.pi) + base[1] * math.sin(
            rotate_range_deg / 180 * math.pi)
        result = (base[0],
                  x_length,
                  y_length,)
    else:
        result = base
    return result


def round_to_closest_value(x, values, assume_sorted=False):
    """Return the element of ``values`` closest to ``x``; the larger one on ties."""
    if not assume_sorted:
        values = sorted(values)
    next_largest = bisect_left(values, x)  # binary search
    if next_largest == 0:
        return values[0]
    if next_largest == len(values):
        return values[-1]
    next_smallest = next_largest - 1
    smaller = values[next_smallest]
    larger = values[next_largest]
    if abs(smaller - x) < abs(larger - x):
        return smaller
    else:
        return larger


def binary_search(a, x, lo=0, hi=None):
    """Return the index of the leftmost ``x`` in the sorted sequence ``a[lo:hi]``, or -1 if not found."""
    hi = hi if hi is not None else len(a)  # hi defaults to len(a)
    pos = bisect_left(a, x, lo, hi)  # find insertion position
    return pos if pos != hi and a[pos] == x else -1  # don't walk off the end


def ceil_to_closest_value(x, values):
    """Return the smallest element of ``values`` that is >= ``x``, or the largest element if there is none."""
    values = sorted(values)
    next_largest = bisect_left(values, x)  # binary search
    if next_largest < len(values):
        return values[next_largest]
    else:
        return values[-1]  # if there is no larger value use the largest one


def print_progress_bar(iteration, total, prefix='Progress:', suffix='', decimals=1, length=50, fill='█',
                       print_eta=True):
    """
    Call in a loop to create terminal progress bar
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
        print_eta   - Optional  : currently unused
    """
    percent = ("{0:" + str(4 + decimals) + "." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filled_length = int(length * iteration // total)
    bar = fill * filled_length + '-' * (length - filled_length)
    # skip the output if nothing visible changed since the last call
    if getattr(print_progress_bar, 'last_printed_value', None) == (prefix, bar, percent, suffix):
        return
    print_progress_bar.last_printed_value = (prefix, bar, percent, suffix)
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end='')
    # Print New Line on Complete
    if iteration == total:
        print()


def get_all_subclasses(klass):
    """Return all direct and indirect subclasses of ``klass`` (depth-first)."""
    all_subclasses = []

    for subclass in klass.__subclasses__():
        all_subclasses.append(subclass)
        all_subclasses.extend(get_all_subclasses(subclass))

    return all_subclasses


def my_mac_address():
    """
    Return ``uuid.getnode()`` as 12 hex digits, or None if its multicast bit is set.

    https://stackoverflow.com/a/160821
    """
    import uuid
    mac = uuid.getnode()
    if (mac >> 40) % 2:
        return None
    mac = uuid.UUID(int=mac).hex[-12:]
    return mac


def latin1_json(data):
    """Serialize ``data`` to JSON without ASCII escaping and encode it as latin-1 bytes."""
    return json.dumps(data, ensure_ascii=False).encode('latin-1')


def l2_norm(v1, v2):
    """
    Euclidean distance between the vectors ``v1`` and ``v2``.

    :raises ValueError: if the vectors differ in length
    """
    if len(v1) != len(v2):
        raise ValueError('Both vectors must be of the same size')
    return math.sqrt(sum((x1 - x2) * (x1 - x2) for x1, x2 in zip(v1, v2)))


def allow_additional_unused_keyword_arguments(func):
    """Decorator: silently drop keyword arguments that are not named parameters of ``func``."""
    # the signature does not change between calls, so inspect it only once
    allowed_names = {param.name for param in inspect.signature(func).parameters.values()}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        allowed_kwargs = {a: kwargs[a] for a in kwargs if a in allowed_names}
        return func(*args, **allowed_kwargs)

    return wrapper


def copy_and_rename_method(func, new_name):
    """Return a copy of the function ``func`` named ``new_name``; ``func`` itself is not modified."""
    funcdetails = [
        func.__code__,
        func.__globals__,
        func.__name__,
        func.__defaults__,
        func.__closure__
    ]
    old_name = func.__name__
    # copy
    # new_func = dill.loads(dill.dumps(func))
    new_func = FunctionType(*funcdetails)
    # FunctionType does not take keyword-only defaults, they have to be copied separately
    new_func.__kwdefaults__ = func.__kwdefaults__
    assert new_func is not func
    # rename
    new_func.__name__ = new_name
    assert func.__name__ == old_name
    return new_func


def rename(new_name):
    """Decorator: set ``__name__`` of the decorated function to ``new_name`` (in place)."""

    def decorator(f):
        f.__name__ = new_name
        return f

    return decorator


class LogicError(Exception):
    pass


def round_time(dt=None, precision=60):
    """Round a datetime object to any time lapse in seconds
    dt : datetime.datetime object, default now.
    precision : Closest number of seconds (or timedelta) to round to, default 1 minute.
    Author: Thierry Husson 2012 - Use it as you want but don't blame me.
    """
    if dt is None:
        dt = datetime.datetime.now()
    if isinstance(precision, datetime.timedelta):
        precision = precision.total_seconds()
    seconds = (dt.replace(tzinfo=None) - dt.min).seconds
    rounding = (seconds + precision / 2) // precision * precision
    # the microseconds have to be removed, otherwise the result is not a multiple of precision
    return dt + datetime.timedelta(seconds=rounding - seconds, microseconds=-dt.microsecond)


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def shorten_name(name):
    """Abbreviate every word of ``name`` to its first letter, e.g. 'Hello World' -> 'HW'."""
    name = re.sub(r'\s+', r' ', str(name))
    name = name.replace(', ', ',')
    name = name.replace(' ', '_')
    return re.sub(r'([A-Za-z])[a-z]*_?', r'\1', str(name))


def array_analysis(a: numpy.ndarray):
    """Print summary statistics of the array ``a``."""
    print(f' Shape: {a.shape}')
    mean = a.mean()
    print(f' Mean: {mean}')
    print(f' Std: {a.std()}')
    print(f' Min, Max: {a.min(), a.max()}')
    print(f' Mean absolute: {numpy.abs(a).mean()}')
    print(f' Mean square: {numpy.square(a).mean()}')
    print(f' Mean absolute difference from mean: {numpy.abs(a - mean).mean()}')
    print(f' Mean squared difference from mean: {numpy.square(a - mean).mean()}')
    nonzero = numpy.count_nonzero(a)
    print(f' Number of non-zeros: {nonzero}')
    print(f' Number of zeros: {numpy.prod(a.shape) - nonzero}')
    if a.shape[-1] > 1 and a.shape[-1] <= 1000:
        # last dim is probably the number of classes
        print(f' Class counts: {numpy.count_nonzero(a, axis=tuple(range(len(a.shape) - 1)))}')


def current_year_begin():
    """Timestamp of January 1st, 00:00 local time, of the current year."""
    return datetime.datetime(datetime.datetime.today().year, 1, 1).timestamp()


def current_day_begin():
    """Current timestamp floored to a multiple of 24 hours (i.e. midnight UTC, not local time)."""
    return datetime.datetime.today().timestamp() // (3600 * 24) * (3600 * 24)


def current_second_begin():
    """Current timestamp floored to whole seconds."""
    return floor(datetime.datetime.today().timestamp())


def running_workers(executor):
    # NOTE(review): this looks like unfinished debugging code - it prints the attributes of one
    # worker thread and compares thread objects to 1, so presumably it always returns 0. Confirm
    # the intended way to detect busy workers before relying on the result.
    print(next(iter(executor._threads)).__dict__)
    return sum(1 for t in executor._threads
               if t == 1)


def queued_calls(executor):
    """Number of work items waiting in the (private) work queue of ``executor``."""
    return len(executor._work_queue.queue)


def retry_on_error(max_tries=3, delay=0.5, backoff=2, only_error_classes=Exception):
    """
    Decorator: call the function up to ``max_tries`` times if it raises ``only_error_classes``.

    Between tries it sleeps ``delay * backoff ** i`` seconds (``i`` = zero-based index of the
    failed try). The error of the last try is re-raised.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(max_tries):
                try:
                    return func(*args, **kwargs)
                except only_error_classes as e:
                    if i == max_tries - 1:
                        raise
                    logging.error(f'Re-try after error in {func.__name__}: {type(e).__name__}, {e}')
                    time.sleep(delay * (backoff ** i))

        return wrapper

    return decorator


class EBC:
    """
    Base class providing equality, string representation and JSON (de)serialization based on
    ``__dict__``. Every subclass is registered by name in ``SUBCLASSES_BY_NAME``.
    """
    SUBCLASSES_BY_NAME: Dict[str, Type['EBC']] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        EBC.SUBCLASSES_BY_NAME[cls.__name__] = cls

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.__dict__ == other.__dict__

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return f'{type(self).__name__}(**' + str(self.__dict__) + ')'

    def to_json(self) -> Dict[str, Any]:
        """Return the attributes as dict with an additional 'type' entry holding the class name."""
        result: Dict[str, Any] = {
            'type': type(self).__name__,
            **self.__dict__,
        }
        for k in result:
            if isinstance(result[k], EBC):
                result[k] = result[k].to_json()
            elif isinstance(result[k], numpy.ndarray):
                result[k] = result[k].tolist()
            elif isinstance(result[k], list):
                result[k] = [r.to_json() if isinstance(r, EBC) else r for r in result[k]]
        return result

    @staticmethod
    def from_json(data: Dict[str, Any]):
        """Reconstruct an instance of the registered subclass named by ``data['type']``."""
        cls = EBC.SUBCLASSES_BY_NAME[data['type']]
        return class_from_json(cls, data)


def class_from_json(cls, data: Dict[str, Any]):
    """
    Create an instance of ``cls`` from a dict (or JSON string) of constructor arguments.

    If the constructor rejects the 'type' entry written by ``EBC.to_json``, that entry is removed,
    nested serialized EBC objects are reconstructed and unknown keyword arguments are dropped.
    """
    if isinstance(data, str):
        data = json.loads(data)
    # noinspection PyArgumentList
    try:
        return cls(**data)
    except TypeError as e:
        if "__init__() got an unexpected keyword argument 'type'" in str(e) or 'takes no arguments' in str(e):
            # probably this was from a to_json method
            if data['type'] != cls.__name__:
                t = data['type']
                logging.warning(f'Reconstructing a {cls.__name__} from a dict with type={t}')
            data = data.copy()
            del data['type']
            for k, v in data.items():
                if probably_serialized_from_ebc(v):
                    data[k] = EBC.SUBCLASSES_BY_NAME[v['type']].from_json(v)
                elif isinstance(v, list):
                    data[k] = [EBC.SUBCLASSES_BY_NAME[x['type']].from_json(x)
                               if probably_serialized_from_ebc(x)
                               else x
                               for x in v]
            return allow_additional_unused_keyword_arguments(cls)(**data)
        else:
            raise


def probably_serialized_from_ebc(data):
    """True if ``data`` is a dict whose 'type' entry names a registered EBC subclass."""
    return isinstance(data, dict) and 'type' in data and data['type'] in EBC.SUBCLASSES_BY_NAME


class EBE(Enum):
    """Enum base class whose members convert to their value (``int``) and print as their name."""

    def __int__(self):
        return self.value

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name

    @classmethod
    def from_name(cls, variable_name):
        """Return the member called ``variable_name``; raises KeyError if there is none."""
        # Enum lookup by name; the class __dict__ does not contain the members on all Python versions
        return cls[variable_name]


class Bunch(dict, EBC):
    """Dict whose initial items are also available as attributes."""

    def __init__(self, **kwargs):
        dict.__init__(self, kwargs)
        self.__dict__.update(kwargs)

    def add_method(self, m):
        """Attach the function ``m`` under its own name, with this object bound as first argument."""
        setattr(self, m.__name__, functools.partial(m, self))


def floor_to_multiple_of(x, multiple_of):
    """Largest multiple of ``multiple_of`` that is <= ``x``."""
    return math.floor(x / multiple_of) * multiple_of


def round_to_multiple_of(x, multiple_of):
    """Multiple of ``multiple_of`` closest to ``x`` (ties as in the builtin ``round``)."""
    return round(x / multiple_of) * multiple_of


def ceil_to_multiple_of(x, multiple_of):
    """Smallest multiple of ``multiple_of`` that is >= ``x``."""
    return math.ceil(x / multiple_of) * multiple_of