- import datetime
- import faulthandler
- import functools
- import gc
- import inspect
- import json
- import math
- import os
- import random
- import re
- import sqlite3
- import sys
- import threading
- import time
- from bisect import bisect_left
- from enum import Enum
- from itertools import chain, combinations
- from math import log, isnan, nan, floor, log10, gcd
- from numbers import Number
- from shutil import copyfile
- from threading import RLock
- from types import FunctionType
- from typing import Union, Tuple, List, Optional, Dict, Any, Type
- # noinspection PyUnresolvedReferences
- from unittest import TestCase, mock
- import cachetools
- import hanging_threads
- import matplotlib.cm
- import matplotlib.pyplot as plt
- import numpy
- import numpy as np
- import pandas
- import scipy.optimize
- import scipy.stats
- import tabulate
- from scipy.ndimage import zoom
- from lib import stack_tracer, print_exc_plus
- from lib.my_logger import logging
- X = Y = Z = float # type aliases used to annotate 3D sizes, e.g. in required_size_for_safe_rotation
- class KnownIssue(Exception):
- """
- This means the code is not working and should not be used but still too valuable to be deleted
- """
- pass
- def powerset(iterable):
- """powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"""
- s = list(iterable)
- return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))
- def plot_with_conf(x, y_mean, y_conf, alpha=0.5, **kwargs):
- ax = kwargs.pop('ax', plt.gca())
- base_line, = ax.plot(x, y_mean, **kwargs)
- y_mean = np.array(y_mean)
- y_conf = np.array(y_conf)
- lb = y_mean - y_conf
- ub = y_mean + y_conf
- ax.fill_between(x, lb, ub, facecolor=base_line.get_color(), alpha=alpha)
- def choice(sequence, probabilities):
- # probabilities should sum to 1; if they sum to less, the loop below can fall
- # through and the AssertionError at the end is raised
- r = random.random()
- for idx, c in enumerate(sequence):
- r -= probabilities[idx]
- if r < 0:
- return c
- raise AssertionError('Probabilities must sum to 1')
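- # Usage sketch (not from the original source): weighted sampling.
- # choice(['a', 'b', 'c'], [0.2, 0.5, 0.3]) returns 'b' about half of the time.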
- def print_attributes(obj, include_methods=False, ignore=None):
- if ignore is None:
- ignore = []
- for attr in dir(obj):
- if attr in ignore:
- continue
- if attr.startswith('_'):
- continue
- if not include_methods and callable(getattr(obj, attr)):
- continue
- print(attr, ':', getattr(obj, attr).__class__.__name__, ':', getattr(obj, attr))
- def attr_dir(obj, include_methods=False, ignore=None):
- if ignore is None:
- ignore = []
- return {attr: getattr(obj, attr)
- for attr in dir(obj)
- if not attr.startswith('_') and (
- include_methods or not callable(getattr(obj, attr))) and attr not in ignore}
- def zoom_to_shape(a: np.ndarray, shape: Tuple, mode: str = 'smooth', verbose=1):
- from keras import backend
- a = np.array(a, dtype=backend.floatx()) # also does a copy
- shape_dim = len(a.shape)
- if len(a.shape) != len(shape):
- raise ValueError('The shapes must have the same number of dimensions, '
- 'but were len({0}) = {1} (original) and len({2}) = {3} (desired).'.format(a.shape, len(a.shape), shape, len(shape)))
- if len(shape) == 0:
- return a
- zoom_factors = tuple(shape[idx] / a.shape[idx] for idx in range(shape_dim))
- def _current_index_in_old_array():
- return tuple(slice(0, length) if axis != current_axis else slice(current_pixel_index, current_pixel_index + 1)
- for axis, length in enumerate(a.shape))
- def _current_pixel_shape():
- return tuple(length if axis != current_axis else 1
- for axis, length in enumerate(a.shape))
- def _current_result_index():
- return tuple(
- slice(0, length) if axis != current_axis else slice(pixel_index_in_result, pixel_index_in_result + 1)
- for axis, length in enumerate(a.shape))
- def _current_result_shape():
- return tuple(orig_length if axis != current_axis else shape[axis]
- for axis, orig_length in enumerate(a.shape))
- if mode == 'constant':
- result = zoom(a, zoom_factors)
- assert result.shape == shape
- return result
- elif mode == 'smooth':
- result = a
- for current_axis, zoom_factor in sorted(enumerate(zoom_factors), key=lambda x: x[1]):
- result = np.zeros(_current_result_shape(), dtype=backend.floatx())
- # current_length = a.shape[current_axis]
- desired_length = shape[current_axis]
- current_pixel_index = 0
- current_pixel_part = 0 # how much of the current pixel is already read
- for pixel_index_in_result in range(desired_length):
- pixels_remaining = 1 / zoom_factor
- pixel_sum = np.zeros(_current_pixel_shape())
- while pixels_remaining + current_pixel_part > 1:
- pixel_sum += (1 - current_pixel_part) * a[_current_index_in_old_array()]
- current_pixel_index += 1
- pixels_remaining -= (1 - current_pixel_part)
- current_pixel_part = 0
- # the remaining pixel_part
- try:
- pixel_sum += pixels_remaining * a[_current_index_in_old_array()]
- except (IndexError, ValueError):
- if verbose:
- print('WARNING: Skipping {0} pixels because of numerical imprecision.'.format(pixels_remaining))
- else:
- current_pixel_part += pixels_remaining
- # insert to result
- pixel_sum *= zoom_factor
- result[_current_result_index()] = pixel_sum
- a = result
- assert result.shape == shape
- return result
- else:
- raise NotImplementedError('Mode not available.')
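- # Usage sketch (hypothetical shapes): area-weighted resizing of an array.
- # zoom_to_shape(np.ones((4, 4)), (2, 8)) returns a (2, 8) array; in 'smooth' mode
- # each output pixel is a weighted average of the input pixels it covers.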
- def profile_wall_time_instead_if_profiling():
- try:
- import yappi
- except ModuleNotFoundError:
- return
- currently_profiling = len(yappi.get_func_stats())
- if currently_profiling and yappi.get_clock_type() != 'wall':
- yappi.stop()
- print('Profiling wall time instead of cpu time.')
- yappi.clear_stats()
- yappi.set_clock_type("wall")
- yappi.start()
- def dummy_computation(*_args, **_kwargs):
- pass
- def backup_file(filename):
- copyfile(filename, backup_file_path(filename))
- def backup_file_path(filename):
- return filename + time.strftime("%Y%m%d") + '.bak'
- # noinspection SpellCheckingInspection
- def my_tabulate(data, tablefmt='pipe', **params):
- if data == [] and 'headers' in params:
- data = [[None] * len(params['headers'])] # one empty row so that tabulate still renders the headers
- tabulate.MIN_PADDING = 0
- return tabulate.tabulate(data, tablefmt=tablefmt, **params)
- def ce_loss(y_true, y_predicted):
- return -(y_true * log(y_predicted) + (1 - y_true) * log(1 - y_predicted))
- class DontSaveResultsError(Exception):
- pass
- def multinomial(n, bins):
- if bins == 0:
- if n > 0:
- raise ValueError('Cannot distribute to 0 bins.')
- return []
- remaining = n
- results = []
- from numpy.random import binomial
- for i in range(bins - 1):
- x = binomial(remaining, 1 / (bins - i))
- results.append(x)
- remaining -= x
- results.append(remaining)
- return results
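- # Example (not from the original source): multinomial(10, 3) might return [4, 3, 3];
- # the entries are random but always sum to n.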
- class UnknownTypeError(Exception):
- pass
- # def shape_analysis(xs):
- # composed_dtypes = [list, tuple, np.ndarray, dict, set]
- # base_dtypes = [str, int, float, type, object] # TODO add class and superclass of xs first element
- # all_dtypes = composed_dtypes + base_dtypes
- # if isinstance(xs, np.ndarray):
- # outer_brackets = ('[', ']')
- # shape = xs.shape
- # dtype = xs.dtype
- # elif isinstance(xs, tuple):
- # outer_brackets = ('(', ')')
- # shape = len(xs)
- # dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
- # elif isinstance(xs, list):
- # outer_brackets = ('[', ']')
- # shape = len(xs)
- # dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
- # elif isinstance(xs, dict) or isinstance(xs, set):
- # outer_brackets = ('{', '}')
- # shape = len(xs)
- # dtype = [t for t in all_dtypes if all(isinstance(x, t) for x in xs)][0]
- # elif any(isinstance(xs, t) for t in base_dtypes):
- # for t in base_dtypes:
- # if isinstance(xs, t):
- # return str(t.__name__)
- # raise AssertionError('This should be unreachable.')
- # else:
- # raise UnknownTypeError('Unknown type:' + type(xs).__name__)
- #
- # if shape and shape != '?':
- # return outer_brackets[0] + str(xs.shape) + ' * ' + str(dtype) + outer_brackets[1]
- # else:
- # return outer_brackets[0] + outer_brackets[1]
- def beta_conf_interval_mle(data, conf=0.95):
- if len(data) <= 1:
- return 0, 1 # overestimates the interval
- if any(d < 0 or d > 1 or isnan(d) for d in data):
- return nan, nan
- if numpy.var(data) == 0:
- return numpy.mean(data), numpy.mean(data)
- epsilon = 1e-3
- # adjusted_data = data.copy()
- # for idx in range(len(adjusted_data)):
- # adjusted_data[idx] *= (1 - 2 * epsilon)
- # adjusted_data[idx] += epsilon
- alpha, beta, _, _ = scipy.stats.beta.fit(data, floc=-epsilon, fscale=1 + 2 * epsilon)
- lower, upper = scipy.stats.beta.interval(alpha=conf, a=alpha, b=beta)
- if lower < 0:
- lower = 0
- if upper < 0:
- upper = 0
- if lower > 1:
- lower = 1
- if upper > 1:
- upper = 1
- return lower, upper
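- # Usage sketch (hypothetical data): beta_conf_interval_mle([0.2, 0.3, 0.25, 0.4])
- # fits a beta distribution to the values and returns the central 95% interval, clamped to [0, 1].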
- def gamma_conf_interval_mle(data, conf=0.95) -> Tuple[float, float]:
- if len(data) == 0:
- return nan, nan
- if len(data) == 1:
- return nan, nan
- if any(d < 0 or isnan(d) for d in data):
- return nan, nan
- if numpy.var(data) == 0:
- return numpy.mean(data).item(), 0
- alpha, _, scale = scipy.stats.gamma.fit(data, floc=0)
- lower, upper = scipy.stats.gamma.interval(alpha=conf, a=alpha, scale=scale)
- if lower < 0:
- lower = 0
- if upper < 0:
- upper = 0
- return lower, upper
- beta_quantile_cache = cachetools.LRUCache(maxsize=10)
- @cachetools.cached(cache=beta_quantile_cache, key=lambda x1, p1, x2, p2, guess: (x1, x2, p1, p2))
- def beta_parameters_quantiles(x1, p1, x2, p2, guess=(3, 3)):
- "Find parameters for a beta random variable X; so; that; P(X > x1) = p1 and P(X > x2) = p2.; "
- def square(x):
- return x * x
- def objective(v):
- (a, b) = v
- temp = square(scipy.stats.beta.cdf(x1, a, b) - p1)
- temp += square(scipy.stats.beta.cdf(x2, a, b) - p2)
- return temp
- xopt = scipy.optimize.fmin(objective, guess, disp=False)
- return (xopt[0], xopt[1])
- def beta_conf_interval_quantile(data, conf=0.95, quantiles=(0.25, 0.75)):
- if len(data) <= 1:
- return 0, 1 # overestimates the interval
- mu = numpy.mean(data)
- v = numpy.var(data)
- data = numpy.array(data)
- if v == 0:
- return mu, mu
- lower = numpy.quantile(data, quantiles[0])
- upper = numpy.quantile(data, quantiles[1])
- alpha_guess = mu ** 2 * ((1 - mu) / v - 1 / mu)
- beta_guess = alpha_guess * (1 / mu - 1)
- alpha, beta = beta_parameters_quantiles(lower, quantiles[0], upper, quantiles[1], (alpha_guess, beta_guess))
- return scipy.stats.beta.interval(alpha=conf, a=alpha, b=beta)
- def beta_stats_quantile(data, quantiles=(0.25, 0.75)):
- if len(data) <= 1:
- return 0, 1 # not enough data to fit a distribution
- data = numpy.array(data)
- mu = numpy.mean(data)
- v = numpy.var(data)
- if v == 0:
- return mu, mu
- lower = numpy.quantile(data, quantiles[0])
- upper = numpy.quantile(data, quantiles[1])
- alpha_guess = mu ** 2 * ((1 - mu) / v - 1 / mu)
- beta_guess = alpha_guess * (1 / mu - 1)
- alpha, beta = beta_parameters_quantiles(lower, quantiles[0], upper, quantiles[1], (alpha_guess, beta_guess))
- return scipy.stats.beta.stats(a=alpha, b=beta)
- def beta_stats_mle(data):
- if len(data) == 0:
- return nan, nan
- if len(data) == 1:
- return nan, nan
- if any(d < 0 or d > 1 or isnan(d) for d in data):
- return nan, nan
- if numpy.var(data) == 0:
- return numpy.mean(data), 0
- epsilon = 1e-4
- # adjusted_data = data.copy()
- # for idx in range(len(adjusted_data)):
- # adjusted_data[idx] *= (1 - 2 * epsilon)
- # adjusted_data[idx] += epsilon
- alpha, beta, _, _ = scipy.stats.beta.fit(data, floc=-epsilon, fscale=1 + 2 * epsilon)
- return scipy.stats.beta.stats(a=alpha, b=beta)
- def gamma_stats_mle(data):
- if len(data) == 0:
- return nan, nan
- if len(data) == 1:
- return nan, nan
- if any(d < 0 or isnan(d) for d in data):
- return nan, nan
- if numpy.var(data) == 0:
- return numpy.mean(data), 0
- alpha, _, scale = scipy.stats.gamma.fit(data, floc=0)
- return scipy.stats.gamma.stats(a=alpha, scale=scale)
- beta_stats = beta_stats_quantile
- beta_conf_interval = beta_conf_interval_quantile
- gamma_stats = gamma_stats_mle
- gamma_conf_interval = gamma_conf_interval_mle
- def split_df_list(df, target_column):
- """
- df = data frame to split,
- target_column = the column containing the values to split
- separator = the symbol used to perform the split
- returns: a data frame with each entry for the target column separated, with each element moved into a new row.
- The values in the other columns are duplicated across the newly divided rows.
- SOURCE: https://gist.github.com/jlln/338b4b0b55bd6984f883
- """
- def split_list_to_rows(row, row_accumulator):
- split_row = json.loads(row[target_column])
- for s in split_row:
- new_row = row.to_dict()
- new_row[target_column] = s
- row_accumulator.append(new_row)
- new_rows = []
- df.apply(split_list_to_rows, axis=1, args=(new_rows,))
- new_df = pandas.DataFrame(new_rows)
- return new_df
- try:
- import winsound as win_sound
- def beep(*args, **kwargs):
- win_sound.Beep(*args, **kwargs)
- except ImportError:
- win_sound = None
- def beep(*_args, **_kwargs):
- pass
- def round_to_digits(x, d):
- if x == 0:
- return 0
- if isnan(x):
- return nan
- try:
- return round(x, d - 1 - int(floor(log10(abs(x)))))
- except OverflowError:
- return x
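- # Examples (not from the original source):
- # round_to_digits(0.012345, 3) == 0.0123 and round_to_digits(98765, 2) == 99000,
- # i.e. x is rounded to d significant digits.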
- def gc_if_memory_error(f, *args, **kwargs):
- try:
- return f(*args, **kwargs)
- except MemoryError:
- print('Starting garbage collector')
- gc.collect()
- return f(*args, **kwargs)
- def assert_not_empty(x):
- assert len(x)
- return x
- def validation_steps(validation_dataset_size, maximum_batch_size):
- batch_size = gcd(validation_dataset_size, maximum_batch_size)
- steps = validation_dataset_size // batch_size
- assert batch_size * steps == validation_dataset_size
- return batch_size, steps
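- # Example (not from the original source): validation_steps(1000, 64) == (8, 125),
- # since gcd(1000, 64) = 8 and 8 * 125 = 1000 covers the dataset exactly.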
- def functional_dependency_trigger(connection: sqlite3.Connection,
- table_name: str,
- determining_columns: List[str],
- determined_columns: List[str],
- exist_ok: bool, ):
- cursor = connection.cursor()
- # possible_performance_improvements
- determined_columns = [c for c in determined_columns if c not in determining_columns]
- trigger_base_name = '_'.join([table_name] + determining_columns + ['determine'] + determined_columns)
- error_message = ','.join(determining_columns) + ' must uniquely identify ' + ','.join(determined_columns)
- # when inserting check if there is already an entry with these values
- cursor.execute(f'''
- CREATE TRIGGER {'IF NOT EXISTS' if exist_ok else ''} {trigger_base_name}_after_insert
- BEFORE INSERT ON {table_name}
- WHEN EXISTS(SELECT * FROM {table_name}
- WHERE ({' AND '.join(f'NEW.{c} IS NOT NULL AND {c} = NEW.{c}' for c in determining_columns)})
- AND ({' OR '.join(f'{c} != NEW.{c}' for c in determined_columns)}))
- BEGIN SELECT RAISE(ROLLBACK, '{error_message}'); END
- ''')
- # when updating check if there is already an entry with these values (only if changed)
- cursor.execute(f'''
- CREATE TRIGGER {'IF NOT EXISTS' if exist_ok else ''} {trigger_base_name}_after_update
- BEFORE UPDATE ON {table_name}
- WHEN EXISTS(SELECT * FROM {table_name}
- WHERE ({' AND '.join(f'NEW.{c} IS NOT NULL AND {c} = NEW.{c}' for c in determining_columns)})
- AND ({' OR '.join(f'{c} != NEW.{c}' for c in determined_columns)}))
- BEGIN SELECT RAISE(ROLLBACK, '{error_message}'); END
- ''')
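- # Usage sketch (hypothetical schema): enforce that name determines price in a products table:
- # functional_dependency_trigger(conn, 'products', determining_columns=['name'],
- #                               determined_columns=['price'], exist_ok=True)
- # Inserting two rows with equal name but different price then aborts with the trigger's error message.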
- def heatmap_from_points(x, y,
- x_lim: Optional[Union[int, Tuple[int, int]]] = None,
- y_lim: Optional[Union[int, Tuple[int, int]]] = None,
- gridsize=30):
- if isinstance(x_lim, Number):
- x_lim = (x_lim, x_lim)
- if isinstance(y_lim, Number):
- y_lim = (y_lim, y_lim)
- plt.hexbin(x, y, gridsize=gridsize, cmap=matplotlib.cm.jet, bins=None)
- if x_lim is not None:
- plt.xlim(x_lim)
- if y_lim is not None:
- plt.ylim(y_lim)
- cb = plt.colorbar()
- cb.set_label('mean value')
- def strptime(date_string, fmt):
- return datetime.datetime(*(time.strptime(date_string, fmt)[0:6]))
- class PrintLineRLock(RLock().__class__):
- def __init__(self, *args, name='', **kwargs):
- # noinspection PyArgumentList
- super().__init__(*args, **kwargs)
- self.name = name
- def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
- print(f'Trying to acquire Lock {self.name}')
- # threading.RLock is a factory function, not a class, so RLock.acquire does not
- # exist; delegate to the actual lock class via super() instead
- result = super().acquire(blocking, timeout)
- print(f'Acquired Lock {self.name}')
- return result
- def release(self) -> None:
- print(f'Trying to release Lock {self.name}')
- result = super().release()
- print(f'Released Lock {self.name}')
- return result
- def __enter__(self, *args, **kwargs):
- print('Trying to enter Lock')
- # noinspection PyArgumentList
- super().__enter__(*args, **kwargs)
- print('Entered Lock')
- def __exit__(self, *args, **kwargs):
- print('Trying to exit Lock')
- super().__exit__(*args, **kwargs)
- print('Exited Lock')
- def fixed_get_current_frames():
- """Return current threads prepared for
- further processing.
- """
- threads = {thread.ident: thread for thread in threading.enumerate()}
- return {
- thread_id: {
- 'frame': hanging_threads.thread2list(frame),
- 'time': None,
- 'id': thread_id,
- 'name': threads[thread_id].name,
- 'object': threads[thread_id]
- } for thread_id, frame in sys._current_frames().items()
- if thread_id in threads # otherwise keyerrors might happen because of race conditions
- }
- hanging_threads.get_current_frames = fixed_get_current_frames
- class CallCounter:
- def __init__(self, f):
- self.f = f
- self.calls = 0
- self.__name__ = f.__name__
- def __call__(self, *args, **kwargs):
- self.calls += 1
- return self.f(*args, **kwargs)
- def __str__(self):
- return str(self.__dict__)
- def __repr__(self):
- return self.__class__.__name__ + repr(self.__dict__)
- def test_with_timeout(timeout=2):
- def wrapper(f):
- from lib.threading_timer_decorator import exit_after
- f = exit_after(timeout)(f)
- @functools.wraps(f)
- def wrapped(*args, **kwargs):
- try:
- print(f'Running this test with timeout: {timeout}')
- return f(*args, **kwargs)
- except KeyboardInterrupt:
- raise AssertionError(f'Test took longer than {timeout} seconds')
- return wrapped
- return wrapper
- def lru_cache_by_id(maxsize):
- return cachetools.cached(cachetools.LRUCache(maxsize=maxsize), key=id)
- class EquivalenceRelation:
- def equivalent(self, a, b) -> bool:
- raise NotImplementedError('Abstract method')
- def equivalence_classes(self, xs: list):
- classes = []
- for x in xs:
- for c in classes:
- if self.equivalent(x, c[0]):
- c.append(x)
- break
- else:
- classes.append([x])
- return classes
- def check_reflexivity_on_dataset(self, xs):
- for x in xs:
- if not self.equivalent(x, x):
- return False
- return True
- def check_symmetry_on_dataset(self, xs):
- for x in xs:
- for y in xs:
- if x is y:
- continue
- if self.equivalent(x, y) and not self.equivalent(y, x):
- return False
- return True
- def check_axioms_on_dataset(self, xs):
- return (
- self.check_reflexivity_on_dataset(xs)
- and self.check_symmetry_on_dataset(xs)
- and self.check_transitivity_on_dataset(xs, assume_symmetry=True, assume_reflexivity=True)
- )
- def check_transitivity_on_dataset(self, xs, assume_symmetry=False, assume_reflexivity=False):
- for x_idx, x in enumerate(xs):
- for y_idx, y in enumerate(xs):
- if x is y:
- continue
- if self.equivalent(x, y):
- for z_idx, z in enumerate(xs):
- if y is z:
- continue
- if assume_symmetry and x_idx > z_idx:
- continue
- if assume_reflexivity and x is z:
- continue
- if self.equivalent(y, z):
- if not self.equivalent(x, z):
- return False
- return True
- def match_lists(self, xs, ys, filter_minimum_size=0, filter_maximum_size=math.inf):
- xs = list(xs)
- ys = list(ys)
- if any(x is y for x in xs for y in ys):
- raise ValueError('Lists contain the same element. This is currently not supported.')
- classes = self.equivalence_classes([*xs, *ys])
- return [
- [
- (0 if any(x2 is x for x2 in xs) else 1, x)
- for x in c
- ]
- for c in classes[::-1]
- if filter_minimum_size <= len(c) <= filter_maximum_size
- ]
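- # Minimal usage sketch (hypothetical subclass): group integers by parity.
- # class ParityRelation(EquivalenceRelation):
- #     def equivalent(self, a, b) -> bool:
- #         return a % 2 == b % 2
- # ParityRelation().equivalence_classes([1, 2, 3, 4]) returns [[1, 3], [2, 4]].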
- def iff_patch(patch: mock._patch):
- def decorator(f):
- def wrapped(*args, **kwargs):
- with patch:
- f(*args, **kwargs)
- try:
- f(*args, **kwargs)
- except Exception:
- pass
- else:
- raise AssertionError('Test did not fail without patch')
- return wrapped
- return decorator
- def iff_not_patch(patch: mock._patch):
- def decorator(f):
- def wrapped(*args, **kwargs):
- f(*args, **kwargs)
- try:
- with patch:
- f(*args, **kwargs)
- except Exception:
- pass
- else:
- raise AssertionError('Test did not fail with patch')
- return wrapped
- return decorator
- EMAIL_CRASHES_TO = []
- VOICE_CALL_ON_CRASH: List[Tuple[str, str]] = []
- def list_logger(base_logging_function, store_in_list: list):
- def print_and_store(*args, **kwargs):
- base_logging_function(*args, **kwargs)
- store_in_list.extend(args)
- return print_and_store
- def main_wrapper(f):
- @functools.wraps(f)
- def wrapper(*args, **kwargs):
- start = time.perf_counter()
- # import lib.stack_tracer
- import __main__
- # does not help much
- # monitoring_thread = hanging_threads.start_monitoring(seconds_frozen=180, test_interval=1000)
- os.makedirs('logs', exist_ok=True)
- stack_tracer.trace_start('logs/' + os.path.split(__main__.__file__)[-1] + '.html', interval=5)
- faulthandler.enable()
- profile_wall_time_instead_if_profiling()
- # noinspection PyBroadException
- try:
- return f(*args, **kwargs)
- except KeyboardInterrupt:
- error_messages = []
- print_exc_plus.print_exc_plus(print=list_logger(logging.error, error_messages),
- serialize_to='logs/' + os.path.split(__main__.__file__)[-1] + '.dill')
- except:
- error_messages = []
- print_exc_plus.print_exc_plus(print=list_logger(logging.error, error_messages),
- serialize_to='logs/' + os.path.split(__main__.__file__)[-1] + '.dill')
- for recipient in EMAIL_CRASHES_TO:
- from jobs.sending_emails import send_mail
- send_mail.create_simple_mail_via_gmail(body='\n'.join(error_messages), filepath=None, excel_name=None, to_mail=recipient, subject='[python] Crash report')
- for to_number, from_number in VOICE_CALL_ON_CRASH:
- logging.info(f'Calling {from_number} to notify about the crash.')
- voice_call('This is a notification message that one of your python scripts has crashed. If you are unsure about the origin of this call, please contact Eren Yilmaz.',
- to_number, from_number)
- finally:
- logging.info('Terminated.')
- total_time = time.perf_counter() - start
- faulthandler.disable()
- stack_tracer.trace_stop()
- frequency = 2000
- duration = 500
- beep(frequency, duration)
- print('Total time', total_time)
- try:
- from algorithm_development.metatrader import ZeroMQ_Connector
- ZeroMQ_Connector.DWX_ZeroMQ_Connector.deactivate_all()
- except ImportError:
- pass
- return wrapper
- def voice_call(msg, to_number, from_number):
- from twilio.rest import Client
- # credentials are read from the environment; hard-coding secrets in source is unsafe
- # (the variable names TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN are assumed to be set)
- account_sid = os.environ['TWILIO_ACCOUNT_SID']
- auth_token = os.environ['TWILIO_AUTH_TOKEN']
- client = Client(account_sid, auth_token)
- call = client.calls.create(
- twiml=f'<Response><Say>{msg}</Say></Response>',
- from_=from_number,
- to=to_number,
- )
- print(call.sid)
- def required_size_for_safe_rotation(base: Tuple[X, Y, Z], rotate_range_deg) -> Tuple[X, Y, Z]:
- if abs(rotate_range_deg) > 45:
- raise NotImplementedError('Rotation ranges above 45 degrees are not supported.')
- if abs(rotate_range_deg) > 0:
- x_length = base[2] * math.sin(rotate_range_deg / 180 * math.pi) + base[1] * math.cos(
- rotate_range_deg / 180 * math.pi)
- y_length = base[2] * math.cos(rotate_range_deg / 180 * math.pi) + base[1] * math.sin(
- rotate_range_deg / 180 * math.pi)
- result = (base[0],
- x_length,
- y_length,)
- else:
- result = base
- return result
- def round_to_closest_value(x, values, assume_sorted=False):
- if not assume_sorted:
- values = sorted(values)
- next_largest = bisect_left(values, x) # binary search
- if next_largest == 0:
- return values[0]
- if next_largest == len(values):
- return values[-1]
- next_smallest = next_largest - 1
- smaller = values[next_smallest]
- larger = values[next_largest]
- if abs(smaller - x) < abs(larger - x):
- return smaller
- else:
- return larger
- def binary_search(a, x, lo=0, hi=None):
- hi = hi if hi is not None else len(a) # hi defaults to len(a)
- pos = bisect_left(a, x, lo, hi) # find insertion position
- return pos if pos != hi and a[pos] == x else -1 # don't walk off the end
- def ceil_to_closest_value(x, values):
- values = sorted(values)
- next_largest = bisect_left(values, x) # binary search
- if next_largest < len(values):
- return values[next_largest]
- else:
- return values[-1] # if there is no larger value use the largest one
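- # Examples (not from the original source):
- # round_to_closest_value(5, [1, 4, 10]) == 4 (nearest value wins),
- # ceil_to_closest_value(5, [1, 4, 10]) == 10 (next value that is >= x).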
- def print_progress_bar(iteration, total, prefix='Progress:', suffix='', decimals=1, length=50, fill='█',
- print_eta=True):
- """
- Call in a loop to create terminal progress bar
- @params:
- iteration - Required : current iteration (Int)
- total - Required : total iterations (Int)
- prefix - Optional : prefix string (Str)
- suffix - Optional : suffix string (Str)
- decimals - Optional : positive number of decimals in percent complete (Int)
- length - Optional : character length of bar (Int)
- fill - Optional : bar fill character (Str)
- """
- percent = ("{0:" + str(4 + decimals) + "." + str(decimals) + "f}").format(100 * (iteration / float(total)))
- filled_length = int(length * iteration // total)
- bar = fill * filled_length + '-' * (length - filled_length)
- if getattr(print_progress_bar, 'last_printed_value', None) == (prefix, bar, percent, suffix):
- return
- print_progress_bar.last_printed_value = (prefix, bar, percent, suffix)
- print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end='')
- # Print New Line on Complete
- if iteration == total:
- print()
- def get_all_subclasses(klass):
- all_subclasses = []
- for subclass in klass.__subclasses__():
- all_subclasses.append(subclass)
- all_subclasses.extend(get_all_subclasses(subclass))
- return all_subclasses
- def my_mac_address():
- """
- https://stackoverflow.com/a/160821
- """
- import uuid
- mac = uuid.getnode()
- if (mac >> 40) % 2: # the multicast bit is set when uuid.getnode() falls back to a random value
- return None
- mac = uuid.UUID(int=mac).hex[-12:]
- return mac
- def latin1_json(data):
- return json.dumps(data, ensure_ascii=False).encode('latin-1')
- def l2_norm(v1, v2):
- if len(v1) != len(v2):
- raise ValueError('Both vectors must be of the same size')
- return math.sqrt(sum([(x1 - x2) * (x1 - x2) for x1, x2 in zip(v1, v2)]))
- def allow_additional_unused_keyword_arguments(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- allowed_kwargs = [param.name for param in inspect.signature(func).parameters.values()]
- allowed_kwargs = {a: kwargs[a] for a in kwargs if a in allowed_kwargs}
- return func(*args, **allowed_kwargs)
- return wrapper
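- # Example (not from the original source):
- # allow_additional_unused_keyword_arguments(lambda x: x)(x=1, unused=2) == 1;
- # keyword arguments that the wrapped function does not accept are silently dropped.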
- def copy_and_rename_method(func, new_name):
- funcdetails = [
- func.__code__,
- func.__globals__,
- func.__name__,
- func.__defaults__,
- func.__closure__
- ]
- old_name = func.__name__
- # copy
- # new_func = dill.loads(dill.dumps(func))
- new_func = FunctionType(*funcdetails)
- assert new_func is not func
- # rename
- new_func.__name__ = new_name
- assert func.__name__ == old_name # the original function keeps its name
- return new_func
- def rename(new_name):
- def decorator(f):
- f.__name__ = new_name
- return f
- return decorator
- class LogicError(Exception):
- pass
- def round_time(dt=None, precision=60):
- """Round a datetime object to any time lapse in seconds
- dt : datetime.datetime object, default now.
- roundTo : Closest number of seconds to round to, default 1 minute.
- Author: Thierry Husson 2012 - Use it as you want but don't blame me.
- """
- if dt is None:
- dt = datetime.datetime.now()
- if isinstance(precision, datetime.timedelta):
- precision = precision.total_seconds()
- seconds = (dt.replace(tzinfo=None) - dt.min).seconds
- rounding = (seconds + precision / 2) // precision * precision
- return dt + datetime.timedelta(seconds=rounding - seconds,
- microseconds=-dt.microsecond) # drop the sub-second part so the result lands on a whole second
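- # Example (not from the original source):
- # round_time(datetime.datetime(2020, 1, 1, 12, 34, 56), precision=3600)
- # returns datetime.datetime(2020, 1, 1, 13, 0), i.e. rounded to the nearest hour.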
- def chunks(lst, n):
- """Yield successive n-sized chunks from lst."""
- for i in range(0, len(lst), n):
- yield lst[i:i + n]
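- # Example (not from the original source): list(chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]].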
- def shorten_name(name):
- name = re.sub(r'\s+', r' ', str(name))
- name = name.replace(', ', ',')
- name = name.replace(' ', '_')
- return re.sub(r'([A-Za-z])[a-z]*_?', r'\1', str(name))
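- # Example (not from the original source): shorten_name('Random Forest Classifier') == 'RFC';
- # each word is reduced to its leading letter.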
- def array_analysis(a: numpy.ndarray):
- print(f' Shape: {a.shape}')
- mean = a.mean()
- print(f' Mean: {mean}')
- print(f' Std: {a.std()}')
- print(f' Min, Max: {a.min(), a.max()}')
- print(f' Mean absolute: {numpy.abs(a).mean()}')
- print(f' Mean square: {numpy.square(a).mean()}')
- print(f' Mean absolute difference from mean: {numpy.abs(a - mean).mean()}')
- print(f' Mean squared difference from mean: {numpy.square(a - mean).mean()}')
- nonzero = numpy.count_nonzero(a)
- print(f' Number of non-zeros: {nonzero}')
- print(f' Number of zeros: {numpy.prod(a.shape) - nonzero}')
- if 1 < a.shape[-1] <= 1000:
- # last dim is probably the number of classes
- print(f' Class counts: {numpy.count_nonzero(a, axis=tuple(range(len(a.shape) - 1)))}')
- def current_year_begin():
- return datetime.datetime(datetime.datetime.today().year, 1, 1).timestamp()
- def current_day_begin():
- return datetime.datetime.today().timestamp() // (3600 * 24) * (3600 * 24)
- def current_second_begin():
- return floor(datetime.datetime.today().timestamp())
- def running_workers(executor):
- # relies on ThreadPoolExecutor internals; counts worker threads that are still
- # alive, which is the presumed intent of this helper
- return sum(1 for t in executor._threads if t.is_alive())
- def queued_calls(executor):
- return len(executor._work_queue.queue)
- def retry_on_error(max_tries=3, delay=0.5, backoff=2, only_error_classes=Exception):
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- for i in range(max_tries):
- try:
- return func(*args, **kwargs)
- except only_error_classes as e:
- if i == max_tries - 1:
- raise
- logging.error(f'Re-try after error in {func.__name__}: {type(e).__name__}, {e}')
- time.sleep(delay * (backoff ** i))
- return wrapper
- return decorator
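- # Usage sketch (hypothetical function): retry a flaky call up to 3 times,
- # sleeping 0.5s and then 1s between attempts before the final failure propagates:
- # @retry_on_error(max_tries=3, delay=0.5, backoff=2, only_error_classes=(ConnectionError,))
- # def fetch_data(): ...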
- class EBC:
- SUBCLASSES_BY_NAME: Dict[str, Type['EBC']] = {}
- def __init_subclass__(cls, **kwargs):
- super().__init_subclass__(**kwargs)
- EBC.SUBCLASSES_BY_NAME[cls.__name__] = cls
- def __eq__(self, other):
- return isinstance(other, type(self)) and self.__dict__ == other.__dict__
- def __str__(self):
- return str(self.__dict__)
- def __repr__(self):
- return f'{type(self).__name__}(**' + str(self.__dict__) + ')'
- def to_json(self) -> Dict[str, Any]:
- result: Dict[str, Any] = {
- 'type': type(self).__name__,
- **self.__dict__,
- }
- for k in result:
- if isinstance(result[k], EBC):
- result[k] = result[k].to_json()
- elif isinstance(result[k], numpy.ndarray):
- result[k] = result[k].tolist()
- elif isinstance(result[k], list):
- result[k] = [r.to_json() if isinstance(r, EBC) else r
- for r in result[k]]
- return result
- @staticmethod
- def from_json(data: Dict[str, Any]):
- cls = EBC.SUBCLASSES_BY_NAME[data['type']]
- return class_from_json(cls, data)
- def class_from_json(cls, data: Dict[str, Any]):
- if isinstance(data, str):
- data = json.loads(data)
- # noinspection PyArgumentList
- try:
- return cls(**data)
- except TypeError as e:
- if "__init__() got an unexpected keyword argument 'type'" in str(e) or 'takes no arguments' in str(e):
- # probably this was from a to_json method
- if data['type'] != cls.__name__:
- t = data['type']
- logging.warning(f'Reconstructing a {cls.__name__} from a dict with type={t}')
- data = data.copy()
- del data['type']
- for k,v in data.items():
- if probably_serialized_from_ebc(v):
- data[k] = EBC.SUBCLASSES_BY_NAME[v['type']].from_json(v)
- elif isinstance(v, list):
- data[k] = [EBC.SUBCLASSES_BY_NAME[x['type']].from_json(x)
- if probably_serialized_from_ebc(x)
- else x
- for x in v]
- return allow_additional_unused_keyword_arguments(cls)(**data)
- else:
- raise
- def probably_serialized_from_ebc(data):
- return isinstance(data, dict) and 'type' in data and data['type'] in EBC.SUBCLASSES_BY_NAME
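- # Usage sketch (hypothetical subclass): EBC subclasses round-trip through JSON dicts.
- # class Point(EBC):
- #     def __init__(self, x, y):
- #         self.x, self.y = x, y
- # EBC.from_json(Point(1, 2).to_json()) reconstructs an equal Point via SUBCLASSES_BY_NAME.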
- class EBE(Enum):
- def __int__(self):
- return self.value
- def __str__(self):
- return self.name
- def __repr__(self):
- return self.name
- @classmethod
- def from_name(cls, variable_name):
- return cls[variable_name]
- class Bunch(dict, EBC):
- def __init__(self, **kwargs):
- dict.__init__(self, kwargs)
- self.__dict__.update(kwargs)
- def add_method(self, m):
- setattr(self, m.__name__, functools.partial(m, self))
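- # Example (not from the original source): b = Bunch(a=1, b=2); then b.a == b['a'] == 1,
- # i.e. entries are accessible both as attributes and as dict keys.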
- def floor_to_multiple_of(x, multiple_of):
- return math.floor(x / multiple_of) * multiple_of
- def round_to_multiple_of(x, multiple_of):
- return round(x / multiple_of) * multiple_of
- def ceil_to_multiple_of(x, multiple_of):
- return math.ceil(x / multiple_of) * multiple_of
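- # Examples (not from the original source):
- # floor_to_multiple_of(7, 3) == 6, round_to_multiple_of(7, 3) == 6, ceil_to_multiple_of(7, 3) == 9.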
|