import datetime
import faulthandler
import functools
import gc
import inspect
import json
import logging
import math
import os
import random
import re
import sqlite3
import sys
import threading
import time
from bisect import bisect_left
from enum import Enum
from itertools import chain, combinations
from math import log, isnan, nan, floor, log10, gcd
from numbers import Number
from shutil import copyfile
from threading import RLock
from types import FunctionType
from typing import Union, Tuple, List, Optional, Dict, Any, Type
# noinspection PyUnresolvedReferences
from unittest import TestCase, mock
import cachetools
import hanging_threads
import matplotlib.cm
import matplotlib.pyplot as plt
import numpy
import numpy as np
import pandas
import scipy.optimize
import scipy.stats
import tabulate
from scipy.ndimage import zoom
X = Y = Z = float
class KnownIssue(Exception):
    """
    Indicates code that is known to be broken and should not be used, but that is still too valuable to be deleted.
    """
    pass
def powerset(iterable):
    """powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"""
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))
def plot_with_conf(x, y_mean, y_conf, alpha=0.5, **kwargs):
    ax = kwargs.pop('ax', plt.gca())
    base_line, = ax.plot(x, y_mean, **kwargs)
    y_mean = np.array(y_mean)
    y_conf = np.array(y_conf)
    lb = y_mean - y_conf
    ub = y_mean + y_conf
    ax.fill_between(x, lb, ub, facecolor=base_line.get_color(), alpha=alpha)
def choice(sequence, probabilities):
    # if sum(probabilities) != 1:
    #     raise AssertionError('Probabilities must sum to 1')
    r = random.random()
    for idx, c in enumerate(sequence):
        r -= probabilities[idx]
        if r < 0:
            return c
    raise AssertionError('Probabilities must sum to 1')
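# Illustrative sketch (hypothetical helper, not part of the original module): empirically
# checks that `choice` samples roughly according to the given weights via inverse-CDF sampling.
def _demo_choice(n_samples=10000):
    counts = {'a': 0, 'b': 0, 'c': 0}
    for _ in range(n_samples):
        counts[choice(['a', 'b', 'c'], [0.5, 0.3, 0.2])] += 1
    return {k: v / n_samples for k, v in counts.items()}  # approximately {'a': 0.5, 'b': 0.3, 'c': 0.2}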
def print_attributes(obj, include_methods=False, ignore=None):
    if ignore is None:
        ignore = []
    for attr in dir(obj):
        if attr in ignore:
            continue
        if attr.startswith('_'):
            continue
        value = getattr(obj, attr)  # getattr instead of obj.__getattr__, which most objects do not define
        if not include_methods and callable(value):
            continue
        print(attr, ':', value.__class__.__name__, ':', value)
def attr_dir(obj, include_methods=False, ignore=None):
    if ignore is None:
        ignore = []
    return {attr: getattr(obj, attr)
            for attr in dir(obj)
            if not attr.startswith('_') and (
                    include_methods or not callable(getattr(obj, attr))) and attr not in ignore}
def zoom_to_shape(a: np.ndarray, shape: Tuple, mode: str = 'smooth', verbose=1):
    from keras import backend
    a = np.array(a, dtype=backend.floatx())  # also makes a copy
    shape_dim = len(a.shape)
    if len(a.shape) != len(shape):
        raise ValueError('The shapes must have the same dimension but were len({0}) = {1} (original) '
                         'and len({2}) = {3} (desired).'.format(a.shape, len(a.shape), shape, len(shape)))
    if len(shape) == 0:
        return a
    zoom_factors = tuple(shape[idx] / a.shape[idx] for idx in range(shape_dim))

    def _current_index_in_old_array():
        return tuple(slice(0, length) if axis != current_axis else slice(current_pixel_index, current_pixel_index + 1)
                     for axis, length in enumerate(a.shape))

    def _current_pixel_shape():
        return tuple(length if axis != current_axis else 1
                     for axis, length in enumerate(a.shape))

    def _current_result_index():
        return tuple(
            slice(0, length) if axis != current_axis else slice(pixel_index_in_result, pixel_index_in_result + 1)
            for axis, length in enumerate(a.shape))

    def _current_result_shape():
        return tuple(orig_length if axis != current_axis else shape[axis]
                     for axis, orig_length in enumerate(a.shape))

    if mode == 'constant':
        result = zoom(a, zoom_factors)
        assert result.shape == shape
        return result
    elif mode == 'smooth':
        result = a
        for current_axis, zoom_factor in sorted(enumerate(zoom_factors), key=lambda x: x[1]):
            result = np.zeros(_current_result_shape(), dtype=backend.floatx())
            # current_length = a.shape[current_axis]
            desired_length = shape[current_axis]
            current_pixel_index = 0
            current_pixel_part = 0  # how much of the current pixel has already been consumed
            for pixel_index_in_result in range(desired_length):
                pixels_remaining = 1 / zoom_factor
                pixel_sum = np.zeros(_current_pixel_shape())
                while pixels_remaining + current_pixel_part > 1:
                    pixel_sum += (1 - current_pixel_part) * a[_current_index_in_old_array()]
                    current_pixel_index += 1
                    pixels_remaining -= (1 - current_pixel_part)
                    current_pixel_part = 0
                # the remaining pixel part
                try:
                    pixel_sum += pixels_remaining * a[_current_index_in_old_array()]
                except (IndexError, ValueError):
                    if verbose:
                        print('WARNING: Skipping {0} pixels because of numerical imprecision.'.format(pixels_remaining))
                else:
                    current_pixel_part += pixels_remaining
                # insert into the result
                pixel_sum *= zoom_factor
                result[_current_result_index()] = pixel_sum
            a = result
        assert result.shape == shape
        return result
    else:
        raise NotImplementedError('Mode not available.')
def profile_wall_time_instead_if_profiling():
    try:
        import yappi
    except ModuleNotFoundError:
        return
    currently_profiling = len(yappi.get_func_stats())
    if currently_profiling and yappi.get_clock_type() != 'wall':
        yappi.stop()
        print('Profiling wall time instead of cpu time.')
        yappi.clear_stats()
        yappi.set_clock_type("wall")
        yappi.start()
def dummy_computation(*_args, **_kwargs):
    pass
def backup_file(filename):
    copyfile(filename, backup_file_path(filename))
def backup_file_path(filename):
    return filename + time.strftime("%Y%m%d") + '.bak'
# noinspection SpellCheckingInspection
def my_tabulate(data, tablefmt='pipe', **params):
    if data == [] and 'headers' in params:
        data = [[None for _ in params['headers']]]  # a single empty row, so that the headers are still shown
    tabulate.MIN_PADDING = 0
    return tabulate.tabulate(data, tablefmt=tablefmt, **params)
def ce_loss(y_true, y_predicted):
    return -(y_true * log(y_predicted) + (1 - y_true) * log(1 - y_predicted))
class DontSaveResultsError(Exception):
    pass
def multinomial(n, bins):
    if bins == 0:
        if n > 0:
            raise ValueError('Cannot distribute to 0 bins.')
        return []
    from numpy.random import binomial
    remaining = n
    results = []
    for i in range(bins - 1):
        x = binomial(remaining, 1 / (bins - i))
        results.append(x)
        remaining -= x
    results.append(remaining)
    return results
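# Quick sanity check (hypothetical helper): the sampled bin counts are non-negative
# and always sum to n, as expected for a multinomial sample.
def _demo_multinomial():
    counts = multinomial(100, 5)
    assert len(counts) == 5 and sum(counts) == 100 and all(c >= 0 for c in counts)
    return counts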
class UnknownTypeError(Exception):
    pass
# def shape_analysis(xs):
#     composed_dtypes = [list, tuple, np.ndarray, dict, set]
#     base_dtypes = [str, int, float, type, object]  # TODO add class and superclass of xs first element
#     all_dtypes = composed_dtypes + base_dtypes
#     if isinstance(xs, np.ndarray):
#         outer_brackets = ('[', ']')
#         shape = xs.shape
#         dtype = xs.dtype
#     elif isinstance(xs, tuple):
#         outer_brackets = ('(', ')')
#         shape = len(xs)
#         dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
#     elif isinstance(xs, list):
#         outer_brackets = ('[', ']')
#         shape = len(xs)
#         dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
#     elif isinstance(xs, dict) or isinstance(xs, set):
#         outer_brackets = ('{', '}')
#         shape = len(xs)
#         dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
#     elif any(isinstance(xs, t) for t in base_dtypes):
#         for t in base_dtypes:
#             if isinstance(xs, t):
#                 return str(t.__name__)
#         raise AssertionError('This should be unreachable.')
#     else:
#         raise UnknownTypeError('Unknown type:' + type(xs).__name__)
#
#     if shape and shape != '?':
#         return outer_brackets[0] + str(xs.shape) + ' * ' + str(dtype) + outer_brackets[1]
#     else:
#         return outer_brackets[0] + outer_brackets[1]
def beta_conf_interval_mle(data, conf=0.95):
    if len(data) <= 1:
        return 0, 1  # overestimates the interval
    if any(d < 0 or d > 1 or isnan(d) for d in data):
        return nan, nan
    if numpy.var(data) == 0:
        return numpy.mean(data), numpy.mean(data)
    epsilon = 1e-3
    # adjusted_data = data.copy()
    # for idx in range(len(adjusted_data)):
    #     adjusted_data[idx] *= (1 - 2 * epsilon)
    #     adjusted_data[idx] += epsilon
    alpha, beta, _, _ = scipy.stats.beta.fit(data, floc=-epsilon, fscale=1 + 2 * epsilon)
    lower, upper = scipy.stats.beta.interval(alpha=conf, a=alpha, b=beta)
    lower = min(max(lower, 0), 1)
    upper = min(max(upper, 0), 1)
    return lower, upper
def gamma_conf_interval_mle(data, conf=0.95) -> Tuple[float, float]:
    if len(data) < 2:
        return nan, nan
    if any(d < 0 or isnan(d) for d in data):
        return nan, nan
    if numpy.var(data) == 0:
        return numpy.mean(data).item(), 0
    alpha, _, scale = scipy.stats.gamma.fit(data, floc=0)
    lower, upper = scipy.stats.gamma.interval(alpha=conf, a=alpha, scale=scale)
    return max(lower, 0), max(upper, 0)
beta_quantile_cache = cachetools.LRUCache(maxsize=10)
# the default for `guess` is repeated in the key function so that cached calls may omit it
@cachetools.cached(cache=beta_quantile_cache, key=lambda x1, p1, x2, p2, guess=(3, 3): (x1, x2, p1, p2))
def beta_parameters_quantiles(x1, p1, x2, p2, guess=(3, 3)):
    """Find parameters for a beta random variable X such that P(X <= x1) = p1 and P(X <= x2) = p2."""
    def square(x):
        return x * x
    def objective(v):
        (a, b) = v
        temp = square(scipy.stats.beta.cdf(x1, a, b) - p1)
        temp += square(scipy.stats.beta.cdf(x2, a, b) - p2)
        return temp
    xopt = scipy.optimize.fmin(objective, guess, disp=False)
    return xopt[0], xopt[1]
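# Usage sketch (hypothetical values): recover the parameters of a roughly symmetric
# beta distribution from its quartiles; here the fit is expected to give a ≈ b.
def _demo_beta_parameters_quantiles():
    a, b = beta_parameters_quantiles(0.4, 0.25, 0.6, 0.75, (5.0, 5.0))
    return a, b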
def beta_conf_interval_quantile(data, conf=0.95, quantiles=(0.25, 0.75)):
    if len(data) <= 1:
        return 0, 1  # overestimates the interval
    mu = numpy.mean(data)
    v = numpy.var(data)
    data = numpy.array(data)
    if v == 0:
        return mu, mu
    lower = numpy.quantile(data, quantiles[0])
    upper = numpy.quantile(data, quantiles[1])
    alpha_guess = mu ** 2 * ((1 - mu) / v - 1 / mu)
    beta_guess = alpha_guess * (1 / mu - 1)
    alpha, beta = beta_parameters_quantiles(lower, quantiles[0], upper, quantiles[1], (alpha_guess, beta_guess))
    return scipy.stats.beta.interval(alpha=conf, a=alpha, b=beta)
def beta_stats_quantile(data, quantiles=(0.25, 0.75)):
    if len(data) <= 1:
        return 0, 1  # overestimates the interval
    data = numpy.array(data)
    mu = numpy.mean(data)
    v = numpy.var(data)
    if v == 0:
        return mu, mu
    lower = numpy.quantile(data, quantiles[0])
    upper = numpy.quantile(data, quantiles[1])
    alpha_guess = mu ** 2 * ((1 - mu) / v - 1 / mu)
    beta_guess = alpha_guess * (1 / mu - 1)
    alpha, beta = beta_parameters_quantiles(lower, quantiles[0], upper, quantiles[1], (alpha_guess, beta_guess))
    return scipy.stats.beta.stats(a=alpha, b=beta)
def beta_stats_mle(data):
    if len(data) < 2:
        return nan, nan
    if any(d < 0 or d > 1 or isnan(d) for d in data):
        return nan, nan
    if numpy.var(data) == 0:
        return numpy.mean(data), 0
    epsilon = 1e-4
    # adjusted_data = data.copy()
    # for idx in range(len(adjusted_data)):
    #     adjusted_data[idx] *= (1 - 2 * epsilon)
    #     adjusted_data[idx] += epsilon
    alpha, beta, _, _ = scipy.stats.beta.fit(data, floc=-epsilon, fscale=1 + 2 * epsilon)
    return scipy.stats.beta.stats(a=alpha, b=beta)
def gamma_stats_mle(data):
    if len(data) < 2:
        return nan, nan
    if any(d < 0 or isnan(d) for d in data):
        return nan, nan
    if numpy.var(data) == 0:
        return numpy.mean(data), 0
    alpha, _, scale = scipy.stats.gamma.fit(data, floc=0)
    return scipy.stats.gamma.stats(a=alpha, scale=scale)
beta_stats = beta_stats_quantile
beta_conf_interval = beta_conf_interval_quantile
gamma_stats = gamma_stats_mle
gamma_conf_interval = gamma_conf_interval_mle
def split_df_list(df, target_column):
    """
    df = data frame to split
    target_column = the column containing JSON-encoded lists of values to split
    returns: a data frame with each entry for the target column separated, with each element moved into a new row.
    The values in the other columns are duplicated across the newly divided rows.
    SOURCE: https://gist.github.com/jlln/338b4b0b55bd6984f883
    """
    def split_list_to_rows(row, row_accumulator):
        split_row = json.loads(row[target_column])
        for s in split_row:
            new_row = row.to_dict()
            new_row[target_column] = s
            row_accumulator.append(new_row)
    new_rows = []
    df.apply(split_list_to_rows, axis=1, args=(new_rows,))
    new_df = pandas.DataFrame(new_rows)
    return new_df
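# Minimal usage sketch (hypothetical data): each element of the JSON-encoded list
# becomes its own row; the other columns are duplicated.
def _demo_split_df_list():
    df = pandas.DataFrame({'id': [1, 2], 'values': ['[1, 2]', '[3]']})
    return split_df_list(df, 'values')  # rows: (1, 1), (1, 2), (2, 3)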
try:
    import winsound as win_sound
    def beep(*args, **kwargs):
        win_sound.Beep(*args, **kwargs)
except ImportError:
    win_sound = None
    def beep(*_args, **_kwargs):
        pass
def round_to_digits(x, d):
    if x == 0:
        return 0
    if isnan(x):
        return nan
    try:
        return round(x, d - 1 - int(floor(log10(abs(x)))))
    except OverflowError:
        return x
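# e.g. round_to_digits(0.012345, 3) == 0.0123 and round_to_digits(12345, 2) == 12000
# (rounds to d significant digits, not d decimal places)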
def gc_if_memory_error(f, *args, **kwargs):
    try:
        return f(*args, **kwargs)
    except MemoryError:
        print('Starting garbage collector')
        gc.collect()
        return f(*args, **kwargs)
def assert_not_empty(x):
    assert len(x)
    return x
def validation_steps(validation_dataset_size, maximum_batch_size):
    batch_size = gcd(validation_dataset_size, maximum_batch_size)
    steps = validation_dataset_size // batch_size
    assert batch_size * steps == validation_dataset_size
    return batch_size, steps
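# Worked example (hypothetical numbers): with 1000 validation samples and a maximum
# batch size of 48, gcd(1000, 48) = 8, so 125 steps of batch size 8 cover the
# dataset exactly once.
def _demo_validation_steps():
    batch_size, steps = validation_steps(1000, 48)
    assert (batch_size, steps) == (8, 125)
    return batch_size, steps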
def functional_dependency_trigger(connection: sqlite3.Connection,
                                  table_name: str,
                                  determining_columns: List[str],
                                  determined_columns: List[str],
                                  exist_ok: bool, ):
    cursor = connection.cursor()
    # possible performance improvement: columns that are also determining are trivially determined
    determined_columns = [c for c in determined_columns if c not in determining_columns]
    trigger_base_name = '_'.join([table_name] + determining_columns + ['determine'] + determined_columns)
    error_message = ','.join(determining_columns) + ' must uniquely identify ' + ','.join(determined_columns)
    # when inserting, check if there is already a conflicting entry with these values
    cursor.execute(f'''
    CREATE TRIGGER {'IF NOT EXISTS' if exist_ok else ''} {trigger_base_name}_after_insert
    BEFORE INSERT ON {table_name}
    WHEN EXISTS(SELECT * FROM {table_name}
                WHERE ({' AND '.join(f'NEW.{c} IS NOT NULL AND {c} = NEW.{c}' for c in determining_columns)})
                AND ({' OR '.join(f'{c} != NEW.{c}' for c in determined_columns)}))
    BEGIN SELECT RAISE(ROLLBACK, '{error_message}'); END
    ''')
    # when updating, check if there is already a conflicting entry with these values (only if changed)
    cursor.execute(f'''
    CREATE TRIGGER {'IF NOT EXISTS' if exist_ok else ''} {trigger_base_name}_after_update
    BEFORE UPDATE ON {table_name}
    WHEN EXISTS(SELECT * FROM {table_name}
                WHERE ({' AND '.join(f'NEW.{c} IS NOT NULL AND {c} = NEW.{c}' for c in determining_columns)})
                AND ({' OR '.join(f'{c} != NEW.{c}' for c in determined_columns)}))
    BEGIN SELECT RAISE(ROLLBACK, '{error_message}'); END
    ''')
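# Usage sketch (hypothetical schema): enforce that `name` determines `price`;
# the second INSERT below violates the dependency and is rolled back.
def _demo_functional_dependency_trigger():
    connection = sqlite3.connect(':memory:')
    connection.execute('CREATE TABLE products (name TEXT, price REAL)')
    functional_dependency_trigger(connection, 'products', ['name'], ['price'], exist_ok=False)
    connection.execute("INSERT INTO products VALUES ('apple', 1.0)")
    try:
        connection.execute("INSERT INTO products VALUES ('apple', 2.0)")
    except sqlite3.DatabaseError:
        return True  # the trigger rejected the conflicting row
    return False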
def heatmap_from_points(x, y,
                        x_lim: Optional[Union[int, Tuple[int, int]]] = None,
                        y_lim: Optional[Union[int, Tuple[int, int]]] = None,
                        gridsize=30):
    if isinstance(x_lim, Number):
        x_lim = (x_lim, x_lim)
    if isinstance(y_lim, Number):
        y_lim = (y_lim, y_lim)
    plt.hexbin(x, y, gridsize=gridsize, cmap=matplotlib.cm.jet, bins=None)
    if x_lim is not None:
        plt.xlim(x_lim)
    if y_lim is not None:
        plt.ylim(y_lim)
    cb = plt.colorbar()
    cb.set_label('mean value')
def strptime(date_string, fmt):
    return datetime.datetime(*(time.strptime(date_string, fmt)[0:6]))
class PrintLineRLock(RLock().__class__):
    def __init__(self, *args, name='', **kwargs):
        # noinspection PyArgumentList
        super().__init__(*args, **kwargs)
        self.name = name
    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        print(f'Trying to acquire Lock {self.name}')
        # threading.RLock is a factory function, not a class, so RLock.acquire would not work here
        result = super().acquire(blocking, timeout)
        print(f'Acquired Lock {self.name}')
        return result
    def release(self) -> None:
        print(f'Trying to release Lock {self.name}')
        super().release()
        print(f'Released Lock {self.name}')
    def __enter__(self, *args, **kwargs):
        print('Trying to enter Lock')
        # noinspection PyArgumentList
        super().__enter__(*args, **kwargs)
        print('Entered Lock')
    def __exit__(self, *args, **kwargs):
        print('Trying to exit Lock')
        super().__exit__(*args, **kwargs)
        print('Exited Lock')
def fixed_get_current_frames():
    """Return current threads prepared for
    further processing.
    """
    threads = {thread.ident: thread for thread in threading.enumerate()}
    return {
        thread_id: {
            'frame': hanging_threads.thread2list(frame),
            'time': None,
            'id': thread_id,
            'name': threads[thread_id].name,
            'object': threads[thread_id]
        } for thread_id, frame in sys._current_frames().items()
        if thread_id in threads  # otherwise KeyErrors might happen because of race conditions
    }
hanging_threads.get_current_frames = fixed_get_current_frames
class CallCounter:
    def __init__(self, f):
        self.f = f
        self.calls = 0
        self.__name__ = f.__name__
    def __call__(self, *args, **kwargs):
        self.calls += 1
        return self.f(*args, **kwargs)
    def __str__(self):
        return str(self.__dict__)
    def __repr__(self):
        return self.__class__.__name__ + repr(self.__dict__)
def test_with_timeout(timeout=2):
    def wrapper(f):
        from lib.threading_timer_decorator import exit_after
        f = exit_after(timeout)(f)
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            try:
                print(f'Running this test with timeout: {timeout}')
                return f(*args, **kwargs)
            except KeyboardInterrupt:
                raise AssertionError(f'Test took longer than {timeout} seconds')
        return wrapped
    return wrapper
def lru_cache_by_id(maxsize):
    return cachetools.cached(cachetools.LRUCache(maxsize=maxsize), key=id)
class EquivalenceRelation:
    def equivalent(self, a, b) -> bool:
        raise NotImplementedError('Abstract method')
    def equivalence_classes(self, xs: list):
        classes = []
        for x in xs:
            for c in classes:
                if self.equivalent(x, c[0]):
                    c.append(x)
                    break
            else:
                classes.append([x])
        return classes
    def check_reflexivity_on_dataset(self, xs):
        for x in xs:
            if not self.equivalent(x, x):
                return False
        return True
    def check_symmetry_on_dataset(self, xs):
        for x in xs:
            for y in xs:
                if x is y:
                    continue
                if self.equivalent(x, y) and not self.equivalent(y, x):
                    return False
        return True
    def check_axioms_on_dataset(self, xs):
        return (
                self.check_reflexivity_on_dataset(xs)
                and self.check_symmetry_on_dataset(xs)
                and self.check_transitivity_on_dataset(xs, assume_symmetry=True, assume_reflexivity=True)
        )
    def check_transitivity_on_dataset(self, xs, assume_symmetry=False, assume_reflexivity=False):
        for x_idx, x in enumerate(xs):
            for y_idx, y in enumerate(xs):
                if x is y:
                    continue
                if self.equivalent(x, y):
                    for z_idx, z in enumerate(xs):
                        if y is z:
                            continue
                        if assume_symmetry and x_idx > z_idx:
                            continue
                        if assume_reflexivity and x is z:
                            continue
                        if self.equivalent(y, z):
                            if not self.equivalent(x, z):
                                return False
        return True
    def match_lists(self, xs, ys, filter_minimum_size=0, filter_maximum_size=math.inf):
        xs = list(xs)
        ys = list(ys)
        if any(x is y for x in xs for y in ys):
            raise ValueError('Lists contain the same element. This is currently not supported.')
        classes = self.equivalence_classes([*xs, *ys])
        return [
            [
                (0 if any(x2 is x for x2 in xs) else 1, x)
                for x in c
            ]
            for c in classes[::-1]
            if filter_minimum_size <= len(c) <= filter_maximum_size
        ]
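# Illustrative subclass (hypothetical): numbers are equivalent when they agree after
# rounding; `equivalence_classes` then groups a small dataset accordingly.
class _RoundedEquality(EquivalenceRelation):
    def equivalent(self, a, b) -> bool:
        return round(a) == round(b)
def _demo_equivalence_classes():
    return _RoundedEquality().equivalence_classes([0.9, 1.1, 2.0, 1.8])  # [[0.9, 1.1], [2.0, 1.8]]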
def iff_patch(patch: mock._patch):
    def decorator(f):
        def wrapped(*args, **kwargs):
            with patch:
                f(*args, **kwargs)
            try:
                f(*args, **kwargs)
            except Exception:
                pass
            else:
                raise AssertionError('Test did not fail without patch')
        return wrapped
    return decorator
def iff_not_patch(patch: mock._patch):
    def decorator(f):
        def wrapped(*args, **kwargs):
            f(*args, **kwargs)
            try:
                with patch:
                    f(*args, **kwargs)
            except Exception:
                pass
            else:
                raise AssertionError('Test did not fail with patch')
        return wrapped
    return decorator
EMAIL_CRASHES_TO = []
VOICE_CALL_ON_CRASH: List[Tuple[str, str]] = []
def list_logger(base_logging_function, store_in_list: list):
    def print_and_store(*args, **kwargs):
        base_logging_function(*args, **kwargs)
        store_in_list.extend(args)
    return print_and_store
def main_wrapper(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        # stack_tracer and print_exc_plus are assumed to be project-local modules in the `lib` package
        from lib import stack_tracer, print_exc_plus
        import __main__
        # does not help much
        # monitoring_thread = hanging_threads.start_monitoring(seconds_frozen=180, test_interval=1000)
        os.makedirs('logs', exist_ok=True)
        stack_tracer.trace_start('logs/' + os.path.split(__main__.__file__)[-1] + '.html', interval=5)
        faulthandler.enable()
        profile_wall_time_instead_if_profiling()
        # noinspection PyBroadException
        try:
            return f(*args, **kwargs)
        except KeyboardInterrupt:
            error_messages = []
            print_exc_plus.print_exc_plus(print=list_logger(logging.error, error_messages),
                                          serialize_to='logs/' + os.path.split(__main__.__file__)[-1] + '.dill')
        except:
            error_messages = []
            print_exc_plus.print_exc_plus(print=list_logger(logging.error, error_messages),
                                          serialize_to='logs/' + os.path.split(__main__.__file__)[-1] + '.dill')
            for recipient in EMAIL_CRASHES_TO:
                from jobs.sending_emails import send_mail
                send_mail.create_simple_mail_via_gmail(body='\n'.join(error_messages), filepath=None, excel_name=None,
                                                       to_mail=recipient, subject='[python] Crash report')
            for to_number, from_number in VOICE_CALL_ON_CRASH:
                logging.info(f'Calling {from_number} to notify about the crash.')
                voice_call('This is a notification message that one of your python scripts has crashed. '
                           'If you are unsure about the origin of this call, please contact Eren Yilmaz.',
                           to_number, from_number)
        finally:
            logging.info('Terminated.')
            total_time = time.perf_counter() - start
            faulthandler.disable()
            stack_tracer.trace_stop()
            frequency = 2000
            duration = 500
            beep(frequency, duration)
            print('Total time', total_time)
            try:
                from algorithm_development.metatrader import ZeroMQ_Connector
                ZeroMQ_Connector.DWX_ZeroMQ_Connector.deactivate_all()
            except ImportError:
                pass
    return wrapper
def voice_call(msg, to_number, from_number):
    from twilio.rest import Client
    account_sid = 'AC63c459168c3e4fe34e462acb4f44f748'
    auth_token = 'b633bc0e945fe7cb737fdac395cc71d6'
    client = Client(account_sid, auth_token)
    call = client.calls.create(
        twiml=f'<Response><Say>{msg}</Say></Response>',
        from_=from_number,
        to=to_number,
    )
    print(call.sid)
def required_size_for_safe_rotation(base: Tuple[X, Y, Z], rotate_range_deg) -> Tuple[X, Y, Z]:
    if abs(rotate_range_deg) > 45:
        raise NotImplementedError
    if abs(rotate_range_deg) > 0:
        x_length = base[2] * math.sin(rotate_range_deg / 180 * math.pi) + base[1] * math.cos(
            rotate_range_deg / 180 * math.pi)
        y_length = base[2] * math.cos(rotate_range_deg / 180 * math.pi) + base[1] * math.sin(
            rotate_range_deg / 180 * math.pi)
        result = (base[0],
                  x_length,
                  y_length,)
    else:
        result = base
    return result
def round_to_closest_value(x, values, assume_sorted=False):
    if not assume_sorted:
        values = sorted(values)
    next_largest = bisect_left(values, x)  # binary search
    if next_largest == 0:
        return values[0]
    if next_largest == len(values):
        return values[-1]
    next_smallest = next_largest - 1
    smaller = values[next_smallest]
    larger = values[next_largest]
    if abs(smaller - x) < abs(larger - x):
        return smaller
    else:
        return larger
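# e.g. round_to_closest_value(0.26, [0, 0.25, 0.5, 1]) == 0.25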
def binary_search(a, x, lo=0, hi=None):
    hi = hi if hi is not None else len(a)  # hi defaults to len(a)
    pos = bisect_left(a, x, lo, hi)  # find insertion position
    return pos if pos != hi and a[pos] == x else -1  # don't walk off the end
def ceil_to_closest_value(x, values):
    values = sorted(values)
    next_largest = bisect_left(values, x)  # binary search
    if next_largest < len(values):
        return values[next_largest]
    else:
        return values[-1]  # if there is no larger value, use the largest one
def print_progress_bar(iteration, total, prefix='Progress:', suffix='', decimals=1, length=50, fill='█',
                       print_eta=True):
    """
    Call in a loop to create terminal progress bar
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
        print_eta   - Optional  : accepted for compatibility, currently unused (Bool)
    """
    percent = ("{0:" + str(4 + decimals) + "." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filled_length = int(length * iteration // total)
    bar = fill * filled_length + '-' * (length - filled_length)
    if getattr(print_progress_bar, 'last_printed_value', None) == (prefix, bar, percent, suffix):
        return
    print_progress_bar.last_printed_value = (prefix, bar, percent, suffix)
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end='')
    # print a new line on completion
    if iteration == total:
        print()
def get_all_subclasses(klass):
    all_subclasses = []
    for subclass in klass.__subclasses__():
        all_subclasses.append(subclass)
        all_subclasses.extend(get_all_subclasses(subclass))
    return all_subclasses
def my_mac_address():
    """
    https://stackoverflow.com/a/160821
    """
    import uuid
    mac = uuid.getnode()
    if (mac >> 40) % 2:  # the multicast bit is set when uuid.getnode() falls back to a random number
        return None
    mac = uuid.UUID(int=mac).hex[-12:]
    return mac
def latin1_json(data):
    return json.dumps(data, ensure_ascii=False).encode('latin-1')
def l2_norm(v1, v2):
    if len(v1) != len(v2):
        raise ValueError('Both vectors must be of the same size')
    return math.sqrt(sum((x1 - x2) * (x1 - x2) for x1, x2 in zip(v1, v2)))
def allow_additional_unused_keyword_arguments(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        allowed_kwargs = [param.name for param in inspect.signature(func).parameters.values()]
        allowed_kwargs = {a: kwargs[a] for a in kwargs if a in allowed_kwargs}
        return func(*args, **allowed_kwargs)
    return wrapper
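# Small usage sketch (hypothetical function): extra keyword arguments are
# silently dropped instead of raising a TypeError.
def _demo_allow_additional_unused_keyword_arguments():
    @allow_additional_unused_keyword_arguments
    def add(a, b):
        return a + b
    return add(1, b=2, irrelevant='ignored')  # -> 3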
def copy_and_rename_method(func, new_name):
    funcdetails = [
        func.__code__,
        func.__globals__,
        func.__name__,
        func.__defaults__,
        func.__closure__
    ]
    old_name = func.__name__
    # copy
    # new_func = dill.loads(dill.dumps(func))
    new_func = FunctionType(*funcdetails)
    assert new_func is not func
    # rename
    new_func.__name__ = new_name
    assert func.__name__ == old_name  # the original function keeps its name
    return new_func
def rename(new_name):
    def decorator(f):
        f.__name__ = new_name
        return f
    return decorator
class LogicError(Exception):
    pass
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
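# e.g. list(chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]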
def shorten_name(name):
    name = re.sub(r'\s+', r' ', str(name))
    name = name.replace(', ', ',')
    name = name.replace(' ', '_')
    return re.sub(r'([A-Za-z])[a-z]*_?', r'\1', str(name))
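# e.g. shorten_name('Gradient Boosting Classifier') == 'GBC'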
def array_analysis(a: numpy.ndarray):
    print(f' Shape: {a.shape}')
    mean = a.mean()
    print(f' Mean: {mean}')
    print(f' Std: {a.std()}')
    print(f' Min, Max: {a.min(), a.max()}')
    print(f' Mean absolute: {numpy.abs(a).mean()}')
    print(f' Mean square: {numpy.square(a).mean()}')
    print(f' Mean absolute difference from mean: {numpy.abs(a - mean).mean()}')
    print(f' Mean squared difference from mean: {numpy.square(a - mean).mean()}')
    nonzero = numpy.count_nonzero(a)
    print(f' Number of non-zeros: {nonzero}')
    print(f' Number of zeros: {numpy.prod(a.shape) - nonzero}')
    if 1 < a.shape[-1] <= 1000:
        # the last dimension is probably the number of classes
        print(f' Class counts: {numpy.count_nonzero(a, axis=tuple(range(len(a.shape) - 1)))}')
def current_year_begin():
    return datetime.datetime(datetime.datetime.today().year, 1, 1).timestamp()
def current_day_begin():
    return datetime.datetime.today().timestamp() // (3600 * 24) * (3600 * 24)
def current_second_begin():
    return floor(datetime.datetime.today().timestamp())
def running_workers(executor):
    # ThreadPoolExecutor does not expose which workers are currently busy,
    # so this counts the pool's worker threads that are still alive
    return sum(1 for t in executor._threads
               if t.is_alive())
def queued_calls(executor):
    return len(executor._work_queue.queue)
def retry_on_error(max_tries=3, delay=0.5, backoff=2, only_error_classes=Exception):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(max_tries):
                try:
                    return func(*args, **kwargs)
                except only_error_classes as e:
                    if i == max_tries - 1:
                        raise
                    logging.error(f'Re-try after error in {func.__name__}: {type(e).__name__}, {e}')
                    time.sleep(delay * (backoff ** i))
        return wrapper
    return decorator
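# Usage sketch (hypothetical function): a call that fails twice with a transient
# error succeeds on the third attempt thanks to the retry decorator.
def _demo_retry_on_error():
    attempts = []
    @retry_on_error(max_tries=3, delay=0.01)
    def flaky():
        attempts.append(1)
        if len(attempts) < 3:
            raise ValueError('transient failure')
        return 'ok'
    return flaky()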
class EBC:
    SUBCLASSES_BY_NAME: Dict[str, Type['EBC']] = {}
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        EBC.SUBCLASSES_BY_NAME[cls.__name__] = cls
    def __eq__(self, other):
        return isinstance(other, type(self)) and self.__dict__ == other.__dict__
    def __str__(self):
        return str(self.__dict__)
    def __repr__(self):
        return f'{type(self).__name__}(**' + str(self.__dict__) + ')'
    def to_json(self) -> Dict[str, Any]:
        result: Dict[str, Any] = {
            'type': type(self).__name__,
            **self.__dict__,
        }
        for k in result:
            if isinstance(result[k], EBC):
                result[k] = result[k].to_json()
            elif isinstance(result[k], numpy.ndarray):
                result[k] = result[k].tolist()
            elif isinstance(result[k], list):
                result[k] = [r.to_json() if isinstance(r, EBC) else r
                             for r in result[k]]
        return result
    @staticmethod
    def from_json(data: Dict[str, Any]):
        cls = EBC.SUBCLASSES_BY_NAME[data['type']]
        return class_from_json(cls, data)
def class_from_json(cls, data: Dict[str, Any]):
    if isinstance(data, str):
        data = json.loads(data)
    # noinspection PyArgumentList
    try:
        return cls(**data)
    except TypeError as e:
        if "__init__() got an unexpected keyword argument 'type'" in str(e) or 'takes no arguments' in str(e):
            # probably this data was created by a to_json method
            if data['type'] != cls.__name__:
                t = data['type']
                logging.warning(f'Reconstructing a {cls.__name__} from a dict with type={t}')
            data = data.copy()
            del data['type']
            for k, v in data.items():
                if probably_serialized_from_ebc(v):
                    data[k] = EBC.SUBCLASSES_BY_NAME[v['type']].from_json(v)
                elif isinstance(v, list):
                    data[k] = [EBC.SUBCLASSES_BY_NAME[x['type']].from_json(x)
                               if probably_serialized_from_ebc(x)
                               else x
                               for x in v]
            return allow_additional_unused_keyword_arguments(cls)(**data)
        else:
            raise
def probably_serialized_from_ebc(data):
    return isinstance(data, dict) and 'type' in data and data['type'] in EBC.SUBCLASSES_BY_NAME
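# Round-trip sketch (hypothetical class): EBC subclasses serialize to a dict with
# a 'type' tag and can be reconstructed generically via EBC.from_json.
class _Point(EBC):
    def __init__(self, x, y):
        self.x = x
        self.y = y
def _demo_ebc_round_trip():
    p = _Point(1, 2)
    return EBC.from_json(p.to_json()) == p  # True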
class EBE(Enum):
    def __int__(self):
        return self.value
    def __str__(self):
        return self.name
    def __repr__(self):
        return self.name
    @classmethod
    def from_name(cls, variable_name):
        return cls.__dict__[variable_name]
class Bunch(dict, EBC):
    def __init__(self, **kwargs):
        dict.__init__(self, kwargs)
        self.__dict__.update(kwargs)
    def add_method(self, m):
        setattr(self, m.__name__, functools.partial(m, self))
def floor_to_multiple_of(x, multiple_of):
    return math.floor(x / multiple_of) * multiple_of
def round_to_multiple_of(x, multiple_of):
    return round(x / multiple_of) * multiple_of
def ceil_to_multiple_of(x, multiple_of):
    return math.ceil(x / multiple_of) * multiple_of
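# e.g. floor_to_multiple_of(7.3, 0.5) == 7.0, round_to_multiple_of(7.3, 0.5) == 7.5,
# ceil_to_multiple_of(7.3, 0.5) == 7.5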