import math import os from typing import List, Union import numpy import pandas from pandas import DataFrame from tool_lib.tuned_pandasql import TunedPandaSQL from tool_lib.util import my_tabulate, EBC, EBE class Category(EBE): UNKNOWN = 'UNKNOWN' OPERATIVE = 'OPERATIVE' MEETING = 'MEETING' PLANNING = 'PLANNING' STRATEGY = 'STRATEGY' CONTROLLING = 'CONTROLLING' AUTOMATION = 'AUTOMATION' WASTED_TIME = 'WASTED_TIME' NOT_AVAILABLE = 'NOT_AVAILABLE' @classmethod def determine_category(cls, task: Union[str, float]) -> 'Category': task = str(task) if task == 'nan': return cls.NOT_AVAILABLE elif task == 'root': return cls.NOT_AVAILABLE elif 'meeting' in task.lower(): return cls.MEETING elif 'eckard' in task.lower() or 'eckhard' in task.lower(): return cls.MEETING elif 'unternehmensberatung' in task.lower(): return cls.MEETING elif 'Notar' in task: return cls.MEETING elif 'wirschaftsbüro' in task.lower() or 'wirtschaftsbüro' in task.lower(): return cls.MEETING elif 'controlling' in task.lower(): return cls.CONTROLLING elif 'strateg' in task.lower(): return cls.STRATEGY elif 'arbeitserfassungstool' in task.lower(): return cls.AUTOMATION elif 'worktime table' in task.lower(): return cls.AUTOMATION elif 'automat' in task.lower(): return cls.AUTOMATION elif 'template' in task.lower(): return cls.AUTOMATION elif 'rechnung' in task.lower(): return cls.AUTOMATION elif 'operativ' in task.lower(): return cls.OPERATIVE elif 'website' in task.lower(): return cls.OPERATIVE elif 'bunq' in task.lower(): return cls.WASTED_TIME elif task == 'root/Master-Arbeitsstunden ': return cls.OPERATIVE elif 'planungsassistent' in task.lower(): return cls.OPERATIVE elif 'telefonat' in task.lower(): return cls.OPERATIVE elif 'umfrage' in task.lower(): return cls.OPERATIVE elif 'code cleaning' in task.lower(): return cls.OPERATIVE elif 'economy_calendar' in task.lower() or 'economy calendar' in task.lower(): return cls.OPERATIVE elif 'tradingbot' in task.lower() or 'trading bot' in task.lower(): return cls.OPERATIVE elif 'optiwae' in task.lower(): return cls.OPERATIVE elif 'twilio' in task.lower(): return cls.OPERATIVE elif 'linkguide' in task.lower(): return cls.OPERATIVE elif 'plan' in task.lower(): return cls.PLANNING elif 'tools' in task.lower(): return cls.AUTOMATION else: return cls.UNKNOWN unknown_categories = set() def category_from_row(row): result = Category.determine_category(row['Task']).value if result == Category.UNKNOWN.value: unknown_categories.add(row['Task']) return result def load_from_disk(): dataframes = [] this_dir = os.path.dirname(__file__) for xlsx_path in os.listdir(os.path.join(this_dir, 'time_recorded_tables')): if not xlsx_path.endswith('.xlsx'): continue if xlsx_path.startswith('~$'): continue if 'example' in xlsx_path: continue dataframes.append(pandas.read_excel(os.path.join(this_dir, 'time_recorded_tables', xlsx_path))) result = pandas.concat(dataframes) result['Datum'] = pandas.to_datetime(result['Datum']) result['Kalenderwoche'] = result['Datum'].dt.strftime('%V') result.sort_values(by=['Datum'], inplace=True) return result def get_date(row): if pandas.isnull(row['Datum']): return 'NULL' return row['Datum'].strftime('%d.%m.%Y') def get_name(row): return row['Name'] def get_year(row): if pandas.isnull(row['Datum']): return 'NULL' return row['Datum'].strftime('%Y') def get_month(row): if pandas.isnull(row['Datum']): return 'NULL' return row['Datum'].strftime('%m.%Y') def get_task(row): return row['Task'] def get_work_time(row): return row['Arbeitszeit'] def get_work_time_minutes(row): return row['Arbeitszeit in Min'] def get_income(row): return row['Einkommen'] def get_overtime(row): return row['Überstunden'] def raw_df(): MAX_TASK_PARTS = 4 def get_calendar_week(row): return row['Kalenderwoche'] columns = { 'Name': get_name, 'Datum': get_date, 'Monat': get_month, 'Jahr': get_year, 'Task': get_task, 'Kalenderwoche': get_calendar_week, 'Arbeitszeit': get_work_time, 'Arbeitszeit in Minuten': get_work_time_minutes, 'Einkommen': get_income, 'Überstunden': get_overtime, 'Kategorie': category_from_row, **{ f'Task Präfix {i}': lambda row, i=i: get_task_prefix(row, i) for i in range(1, MAX_TASK_PARTS) } } combinations: List[tuple] = [ ('Name', 'Jahr'), # ('Name', 'Task'), # ('Name', 'Task Präfix 1'), ('Name', 'Task Präfix 2'), # ('Name', 'Task Präfix 3'), ] for combination in combinations: new_column_name = '_by_'.join(combination) columns[new_column_name] = lambda *args, combination=combination, **kwargs: ', '.join(str(columns[column](*args, **kwargs)) for column in combination) df = load_from_disk() table = [ [columns[column](row) for column in columns] for row in df.to_dict('records') ] return DataFrame(table, columns=list(columns)) def get_task_prefix(row, prefix_length): return "/".join(str(row['Task']).split('/')[:prefix_length]) def preprocess_for_sql(x): if isinstance(x, int): return str(x) elif isinstance(x, str): x = x.replace("'", r"\'") return f"'{x}'" elif isinstance(x, float): return str(x) elif numpy.isscalar(x): return str(x) elif x is None: return 'NULL' else: raise NotImplementedError(type(x)) def first_column_values(values): return list(values[values.columns[0]]) def compute_rows(max_distinct_values=120, ): df = raw_df() result = [] if df.shape[0] == 0: return result relevant_columns = [column for column in df.columns if not column.startswith('_')] distinct_column_values = TunedPandaSQL().multiquery( {column: f'SELECT DISTINCT "{column}" FROM df' for column in df.columns}, {'df': df[relevant_columns]} ) resultss = TunedPandaSQL().multiquery( { (column, v): f'SELECT Arbeitszeit, "Arbeitszeit in Minuten", Einkommen, Überstunden ' f'FROM df WHERE "{column}" IS {v}' for column in relevant_columns for column_value in first_column_values(distinct_column_values[column]) if len(distinct_column_values[column]) <= max_distinct_values for v in [preprocess_for_sql(column_value)] if v != 'nan' }, {'df': df[relevant_columns]} ) for column in relevant_columns: values: DataFrame = distinct_column_values[column] if len(values) > max_distinct_values: print(f'{len(values)} distinct values in column {column}') for (column, v), results in resultss.items(): if column in ['Arbeitszeit', 'Arbeitszeit in Minuten', 'Einkommen', 'Überstunden']: continue new_row = [ column, v, results['Arbeitszeit'].sum(), results['Arbeitszeit in Minuten'].sum(), results['Einkommen'].sum(), results['Überstunden'].sum(), ] result.append(new_row) return result, ['Key', 'Value', 'Arbeitszeit', 'Arbeitszeit in Minuten', 'Einkommen', 'Überstunden'] rows, columns = compute_rows() rows = sorted(rows, key=lambda row: (row[0], row[1])) print(my_tabulate(data=rows, headers=columns)) print() print('Unspecified categories:') for c in unknown_categories: print(' -', repr(c))