123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- import faulthandler
- import functools
- import inspect
- import json
- import os
- import random
- import time
- from bisect import bisect_left
- from datetime import datetime, timedelta
- from math import floor, log10, sqrt, nan, inf
- import scipy.stats
- import tabulate
- from numpy.random.mtrand import binomial
- from lib import stack_tracer
- from lib.print_exc_plus import print_exc_plus
- chars = [str(d) for d in range(1, 10)]
- digits = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- ps = [1. for _ in chars]
- # the first number is absolute and the second relative
- letter_dist = [("E", 21912, 12.02), ("T", 16587, 9.1), ("A", 14810, 8.12), ("O", 14003, 7.68), ("I", 13318, 7.31),
- ("N", 12666, 6.95), ("S", 11450, 6.28), ("R", 10977, 6.02), ("H", 10795, 5.92), ("D", 7874, 4.32),
- ("L", 7253, 3.98), ("U", 5246, 2.88), ("C", 4943, 2.71), ("M", 4761, 2.61), ("F", 4200, 2.3),
- ("Y", 3853, 2.11), ("W", 3819, 2.09), ("G", 3693, 2.03), ("P", 3316, 1.82), ("B", 2715, 1.49),
- ("V", 2019, 1.11), ("K", 1257, 0.69), ("X", 315, 0.17), ("Q", 205, 0.11), ("J", 188, 0.1),
- ("Z", 128, 0.07), ]
- sp = sum(ps)
- for row in letter_dist:
- chars.append(row[0])
- ps.append(float(row[2]))
- ps = [p / sum(ps) for p in ps]
- def choice(sequence, probabilities):
- # if sum(probabilities) != 1:
- # raise AssertionError('Probabilities must sum to 1')
- r = random.random()
- for idx, c in enumerate(sequence):
- r -= probabilities[idx]
- if r < 0:
- return c
- raise AssertionError('Probabilities must sum to 1')
- def multiple_choice(sequence, count):
- results = []
- num_remaining = len(sequence)
- for _ in range(count):
- idx = random.randrange(num_remaining)
- results.append(sequence[idx])
- del sequence[idx]
- num_remaining -= 1
- return results
- try:
- import winsound as win_sound
- def beep(*args, **kwargs):
- win_sound.Beep(*args, **kwargs)
- except ImportError:
- win_sound = None
- def beep(*_args, **_kwargs):
- pass
- def main_wrapper(f):
- @functools.wraps(f)
- def wrapper(*args, **kwargs):
- start = time.perf_counter()
- # import lib.stack_tracer
- import __main__
- # does not help much
- # monitoring_thread = hanging_threads.start_monitoring(seconds_frozen=180, test_interval=1000)
- os.makedirs('logs', exist_ok=True)
- stack_tracer.trace_start('logs/' + os.path.split(__main__.__file__)[-1] + '.html', interval=5)
- faulthandler.enable()
- profile_wall_time_instead_if_profiling()
- # noinspection PyBroadException
- try:
- f(*args, **kwargs)
- except Exception:
- print_exc_plus()
- exit(-1)
- finally:
- total_time = time.perf_counter() - start
- frequency = 2000
- duration = 500
- beep(frequency, duration)
- print('Total time', total_time)
- return wrapper
- def random_chars(count):
- return ''.join(choice(chars, probabilities=ps) for _ in range(count))
- def str2bool(v):
- v = str(v).strip().lower()
- if v in ["yes", 'y' "true", "t", "1"]:
- return True
- if v in ["no", 'n' "false", "f", "0", '', 'null', 'none']:
- return False
- raise ValueError('Can not convert `' + v + '` to bool')
- def my_tabulate(data, **params):
- if data == [] and 'headers' in params:
- data = [(None for _ in params['headers'])]
- tabulate.MIN_PADDING = 0
- return tabulate.tabulate(data, **params)
- def yn_dialog(msg):
- while True:
- result = input(msg + ' [y/n]: ')
- if result == 'y':
- return True
- if result == 'n':
- return False
- print('Type in \'y\' or \'n\'!')
- def round_to_closest_value(x, values):
- values = sorted(values)
- next_largest = bisect_left(values, x) # binary search
- if next_largest == 0:
- return values[0]
- if next_largest == len(values):
- return values[-1]
- next_smallest = next_largest - 1
- smaller = values[next_smallest]
- larger = values[next_largest]
- if abs(smaller - x) < abs(larger - x):
- return smaller
- else:
- return larger
- def binary_search(a, x, lo=0, hi=None):
- hi = hi if hi is not None else len(a) # hi defaults to len(a)
- pos = bisect_left(a, x, lo, hi) # find insertion position
- return pos if pos != hi and a[pos] == x else -1 # don't walk off the end
- def ceil_to_closest_value(x, values):
- values = sorted(values)
- next_largest = bisect_left(values, x) # binary search
- if next_largest < len(values):
- return values[next_largest]
- else:
- return values[-1] # if there is no larger value use the largest one
- def upset_binomial(mu, p, factor):
- if factor > 1:
- raise NotImplementedError()
- return (binomial(mu / p, p) - mu) * factor + mu
- def multinomial(n, bins):
- if bins == 0:
- if n > 0:
- raise ValueError('Cannot distribute to 0 bins.')
- return []
- remaining = n
- results = []
- for i in range(bins - 1):
- x = binomial(remaining, 1 / (bins - i))
- results.append(x)
- remaining -= x
- results.append(remaining)
- return results
- def round_to_n(x, n):
- return round(x, -int(floor(log10(x))) + (n - 1))
- def get_all_subclasses(klass):
- all_subclasses = []
- for subclass in klass.__subclasses__():
- all_subclasses.append(subclass)
- all_subclasses.extend(get_all_subclasses(subclass))
- return all_subclasses
- def latin1_json(data):
- return json.dumps(data, ensure_ascii=False).encode('latin-1')
- def l2_norm(v1, v2):
- if len(v1) != len(v2):
- raise ValueError('Both vectors must be of the same size')
- return sqrt(sum([(x1 - x2) * (x1 - x2) for x1, x2 in zip(v1, v2)]))
- def allow_additional_unused_keyword_arguments(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- allowed_kwargs = [param.name for param in inspect.signature(func).parameters.values()]
- allowed_kwargs = {a: kwargs[a] for a in kwargs if a in allowed_kwargs}
- return func(*args, **allowed_kwargs)
- return wrapper
- def rename(new_name):
- def decorator(f):
- f.__name__ = new_name
- return f
- return decorator
- def mean_confidence_interval_size(data, confidence=0.95):
- if len(data) == 0:
- return nan
- if len(data) == 1:
- return inf
- if scipy.stats.sem(data) == 0:
- return 0
- return len(data) / sum(data) - scipy.stats.t.interval(confidence, len(data) - 1,
- loc=len(data) / sum(data),
- scale=scipy.stats.sem(data))[0]
- class LogicError(Exception):
- pass
- def round_time(dt=None, precision=60):
- """Round a datetime object to any time lapse in seconds
- dt : datetime.datetime object, default now.
- roundTo : Closest number of seconds to round to, default 1 minute.
- Author: Thierry Husson 2012 - Use it as you want but don't blame me.
- """
- if dt is None:
- dt = datetime.now()
- if isinstance(precision, timedelta):
- precision = precision.total_seconds()
- seconds = (dt.replace(tzinfo=None) - dt.min).seconds
- rounding = (seconds + precision / 2) // precision * precision
- return dt + timedelta(seconds=rounding - seconds,
- microseconds=dt.microsecond)
- def profile_wall_time_instead_if_profiling():
- try:
- import yappi
- except ModuleNotFoundError:
- return
- currently_profiling = len(yappi.get_func_stats())
- if currently_profiling and yappi.get_clock_type() != 'wall':
- print('Changing yappi clock type to wall and restarting yappi.')
- yappi.stop()
- yappi.clear_stats()
- yappi.set_clock_type("wall")
- yappi.start()
- def dummy_computation(_data):
- return
- def current_year_begin():
- return datetime(datetime.today().year, 1, 1).timestamp()
- def current_day_begin():
- return datetime.today().timestamp() // (3600 * 24) * (3600 * 24)
- def current_second_begin():
- return floor(datetime.today().timestamp())
|