123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- import math
- import os
- from typing import List, Union
- import numpy
- import pandas
- from pandas import DataFrame
- from tool_lib.tuned_pandasql import TunedPandaSQL
- from tool_lib.util import my_tabulate, EBC, EBE
class Category(EBE):
    """Coarse work categories that a recorded task name can be mapped to.

    Every member's value is its own name as a string. ``EBE`` is imported
    from ``tool_lib.util`` (presumably an Enum base class -- confirm there).
    """
    UNKNOWN = 'UNKNOWN'
    OPERATIVE = 'OPERATIVE'
    MEETING = 'MEETING'
    PLANNING = 'PLANNING'
    STRATEGY = 'STRATEGY'
    CONTROLLING = 'CONTROLLING'
    AUTOMATION = 'AUTOMATION'
    WASTED_TIME = 'WASTED_TIME'
    NOT_AVAILABLE = 'NOT_AVAILABLE'

    @classmethod
    def determine_category(cls, task: Union[str, float]) -> 'Category':
        """Classify a task name by case-insensitive keyword matching.

        The checks run top to bottom and the first hit wins, so their order
        is significant (e.g. 'planungsassistent' must be tested before the
        generic 'plan').

        :param task: task name; non-strings are converted with ``str`` first,
            so a float NaN arrives here as the text 'nan'.
        :return: the matching member, ``NOT_AVAILABLE`` for 'nan' / 'root',
            or ``UNKNOWN`` when no keyword matches.
        """
        task = str(task)
        if task in ('nan', 'root'):
            return cls.NOT_AVAILABLE

        lowered = task.lower()

        def mentions(*keywords):
            # True when at least one keyword occurs in the lower-cased task.
            return any(keyword in lowered for keyword in keywords)

        if mentions('meeting', 'eckard', 'eckhard', 'unternehmensberatung',
                    'wirschaftsbüro', 'wirtschaftsbüro'):
            return cls.MEETING
        if mentions('controlling'):
            return cls.CONTROLLING
        if mentions('strateg'):
            return cls.STRATEGY
        if mentions('arbeitserfassungstool', 'worktime table', 'automat',
                    'template', 'rechnung'):
            return cls.AUTOMATION
        if mentions('operativ', 'website'):
            return cls.OPERATIVE
        if mentions('bunq'):
            return cls.WASTED_TIME
        # The exact-match literal (including its trailing space) is compared
        # against the unmodified task, not the lower-cased one.
        if task == 'root/Master-Arbeitsstunden ' or mentions(
                'planungsassistent', 'telefonat', 'umfrage', 'code cleaning',
                'economy_calendar', 'economy calendar',
                'tradingbot', 'trading bot',
                'optiwae', 'twilio', 'linkguide'):
            return cls.OPERATIVE
        if mentions('plan'):
            return cls.PLANNING
        if mentions('tools'):
            return cls.AUTOMATION
        return cls.UNKNOWN
# Task names that could not be classified; filled as a side effect of
# category_from_row().
unknown_categories = set()


def category_from_row(row):
    """Return the category value for a row's 'Task'.

    Tasks that classify as ``Category.UNKNOWN`` are additionally recorded in
    the module-level ``unknown_categories`` set.
    """
    task = row['Task']
    category_value = Category.determine_category(task).value
    if category_value == Category.UNKNOWN.value:
        unknown_categories.add(task)
    return category_value
def load_from_disk():
    """Load every recorded-time workbook next to this module into one frame.

    Reads all ``.xlsx`` files from the ``time_recorded_tables`` directory
    beside this file, skipping names that start with ``~$`` and names that
    contain 'example'. The 'Datum' column is parsed to datetimes, a
    'Kalenderwoche' column is derived from it with ``strftime('%V')``, and
    the rows are sorted by 'Datum'.

    :return: the concatenated, date-sorted DataFrame.
    """
    tables_dir = os.path.join(os.path.dirname(__file__), 'time_recorded_tables')

    def is_wanted(file_name):
        # Keep real workbooks only: right extension, no '~$' prefix, no example.
        return (
            file_name.endswith('.xlsx')
            and not file_name.startswith('~$')
            and 'example' not in file_name
        )

    frames = [
        pandas.read_excel(os.path.join(tables_dir, file_name))
        for file_name in os.listdir(tables_dir)
        if is_wanted(file_name)
    ]
    combined = pandas.concat(frames)
    combined['Datum'] = pandas.to_datetime(combined['Datum'])
    combined['Kalenderwoche'] = combined['Datum'].dt.strftime('%V')
    combined.sort_values(by=['Datum'], inplace=True)
    return combined
def get_date(row):
    """Return the row's 'Datum' formatted as DD.MM.YYYY, or 'NULL' if it is null."""
    date = row['Datum']
    return 'NULL' if pandas.isnull(date) else date.strftime('%d.%m.%Y')
def get_name(row):
    """Return the row's 'Name' entry unchanged."""
    return row['Name']
def get_year(row):
    """Return the four-digit year of the row's 'Datum', or 'NULL' if it is null."""
    date = row['Datum']
    return 'NULL' if pandas.isnull(date) else date.strftime('%Y')
def get_month(row):
    """Return the row's 'Datum' formatted as MM.YYYY, or 'NULL' if it is null."""
    date = row['Datum']
    return 'NULL' if pandas.isnull(date) else date.strftime('%m.%Y')
def get_task(row):
    """Return the row's 'Task' entry unchanged."""
    return row['Task']
def get_work_time(row):
    """Return the row's 'Arbeitszeit' entry unchanged."""
    return row['Arbeitszeit']
def get_work_time_minutes(row):
    """Return the row's 'Arbeitszeit in Min' entry unchanged."""
    return row['Arbeitszeit in Min']
def get_income(row):
    """Return the row's 'Einkommen' entry unchanged."""
    return row['Einkommen']
def get_overtime(row):
    """Return the row's 'Überstunden' entry unchanged."""
    return row['Überstunden']
def raw_df():
    """Build the flat reporting table from the workbooks on disk.

    Each record returned by ``load_from_disk()`` is mapped to one output row
    by applying a getter per output column: the plain columns, the derived
    category, the task prefixes of length 1..3, and the combined
    ``<a>_by_<b>`` columns whose values are the component values joined
    with ', '.

    :return: DataFrame with one row per input record.
    """
    max_task_parts = 4  # exclusive upper bound: prefixes 1 .. 3 are produced

    def get_calendar_week(row):
        return row['Kalenderwoche']

    def prefix_getter(prefix_length):
        # Factory so each getter keeps its own prefix length.
        return lambda row: get_task_prefix(row, prefix_length)

    def combined_getter(parts):
        # Joins the values of several already-registered columns; the lookup
        # into `columns` happens at call time.
        def combined(*args, **kwargs):
            return ', '.join(str(columns[part](*args, **kwargs)) for part in parts)
        return combined

    columns = {
        'Name': get_name,
        'Datum': get_date,
        'Monat': get_month,
        'Jahr': get_year,
        'Task': get_task,
        'Kalenderwoche': get_calendar_week,
        'Arbeitszeit': get_work_time,
        'Arbeitszeit in Minuten': get_work_time_minutes,
        'Einkommen': get_income,
        'Überstunden': get_overtime,
        'Kategorie': category_from_row,
    }
    for prefix_length in range(1, max_task_parts):
        columns[f'Task Präfix {prefix_length}'] = prefix_getter(prefix_length)

    combinations: List[tuple] = [
        ('Name', 'Jahr'),
        # ('Name', 'Task'),
        # ('Name', 'Task Präfix 1'),
        ('Name', 'Task Präfix 2'),
        # ('Name', 'Task Präfix 3'),
    ]
    for combination in combinations:
        columns['_by_'.join(combination)] = combined_getter(combination)

    records = load_from_disk().to_dict('records')
    column_names = list(columns)
    table = [
        [columns[name](record) for name in column_names]
        for record in records
    ]
    return DataFrame(table, columns=column_names)
def get_task_prefix(row, prefix_length):
    """Return the first ``prefix_length`` '/'-separated parts of the row's 'Task'.

    The task is converted with ``str`` first, so non-string values are
    handled as their textual form.
    """
    parts = str(row['Task']).split('/')
    return "/".join(parts[:prefix_length])
def preprocess_for_sql(x):
    """Render a Python/numpy scalar as a SQL literal.

    :param x: ``None``, a string, an int/float, or any other numpy scalar.
    :return: ``'NULL'`` for ``None``, a single-quoted literal for strings,
        and ``str(x)`` for numbers and other scalars (a float NaN therefore
        yields the text ``'nan'``, which is not valid SQL and must be
        filtered by the caller).
    :raises NotImplementedError: for any other type.
    """
    if x is None:
        return 'NULL'
    if isinstance(x, str):
        # SQL escapes a single quote inside a string literal by doubling it;
        # a backslash is not an escape character there, so the previous
        # "\\'" form produced broken statements for values containing "'".
        escaped = x.replace("'", "''")
        return f"'{escaped}'"
    if isinstance(x, (int, float)) or numpy.isscalar(x):
        return str(x)
    raise NotImplementedError(type(x))
def first_column_values(values):
    """Return the entries of the first column of ``values`` as a list."""
    first_column = values.columns[0]
    return list(values[first_column])
def compute_rows(max_distinct_values=120):
    """Aggregate the recorded metrics per distinct value of every column.

    For each non-metric column of ``raw_df()`` that has at most
    ``max_distinct_values`` distinct values, and for each of those values,
    the four metric columns are summed over the matching rows.

    :param max_distinct_values: columns with more distinct values than this
        are not broken down; a notice is printed for them instead.
    :return: ``(rows, headers)`` where every row is
        ``[column, sql_literal, sum Arbeitszeit, sum Arbeitszeit in Minuten,
        sum Einkommen, sum Überstunden]``. ``rows`` is empty when there is
        no data; the tuple shape is the same in that case so callers can
        always unpack it.
    """
    headers = ['Key', 'Value', 'Arbeitszeit', 'Arbeitszeit in Minuten', 'Einkommen', 'Überstunden']
    metric_columns = headers[2:]

    df = raw_df()
    result = []
    if df.shape[0] == 0:
        # Same (rows, headers) shape as the regular return value.
        return result, headers

    relevant_columns = [column for column in df.columns
                        if not column.startswith('_')]

    # Only query columns that are actually present in the frame handed over.
    distinct_column_values = TunedPandaSQL().multiquery(
        {column: f'SELECT DISTINCT "{column}" FROM df' for column in relevant_columns},
        {'df': df[relevant_columns]}
    )

    queries = {}
    for column in relevant_columns:
        if column in metric_columns:
            # Metric columns are summed, never used as grouping keys.
            continue
        distinct_values = distinct_column_values[column]
        if len(distinct_values) > max_distinct_values:
            continue
        for column_value in first_column_values(distinct_values):
            v = preprocess_for_sql(column_value)
            if v == 'nan':
                # A float NaN has no usable SQL literal.
                continue
            queries[(column, v)] = (
                f'SELECT Arbeitszeit, "Arbeitszeit in Minuten", Einkommen, Überstunden '
                f'FROM df WHERE "{column}" IS {v}'
            )
    resultss = TunedPandaSQL().multiquery(queries, {'df': df[relevant_columns]})

    for column in relevant_columns:
        values: DataFrame = distinct_column_values[column]
        if len(values) > max_distinct_values:
            print(f'{len(values)} distinct values in column {column}')

    for (key_column, v), results in resultss.items():
        result.append([
            key_column,
            v,
            results['Arbeitszeit'].sum(),
            results['Arbeitszeit in Minuten'].sum(),
            results['Einkommen'].sum(),
            results['Überstunden'].sum(),
        ])
    return result, headers
# Script section: compute the aggregates, print them sorted by (Key, Value),
# then list every task name that could not be categorised.
rows, columns = compute_rows()
rows = sorted(rows, key=lambda entry: (entry[0], entry[1]))
print(my_tabulate(data=rows, headers=columns))
print()
print('Unspecified categories:')
for unknown_task in unknown_categories:
    print(f' - {unknown_task!r}')
|