date_time.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. import cachetools
  2. import datetime
  3. import math
  4. from typing import List, Dict, Any, Type, Optional
  5. from lib.util import EBC
  6. def get_timestamp(start_day, input_format) -> int:
  7. if isinstance(start_day, int):
  8. return start_day
  9. elif isinstance(start_day, str):
  10. return int(datetime.datetime.timestamp(datetime.datetime.strptime(start_day, input_format)))
  11. else:
  12. return int(datetime.datetime.timestamp(start_day))
  13. class Day(EBC):
  14. def __init__(self, dt: datetime.date, day_id: int):
  15. self.day_id = day_id
  16. self.dt = dt
  17. self._ymd = self.dt.strftime('%Y-%m-%d')
  18. @classmethod
  19. def field_types(cls) -> Dict[str, Type]:
  20. result = super().field_types()
  21. result['dt'] = SerializableDateTime
  22. return result
  23. def timestamp(self):
  24. dt = self.dt
  25. if isinstance(dt, datetime.date):
  26. dt = datetime.datetime(dt.year, dt.month, dt.day)
  27. return dt.timestamp()
  28. @staticmethod
  29. def from_ts(ts: int, day_id: int):
  30. return Day(
  31. day_id=day_id,
  32. dt=datetime.date.fromtimestamp(ts)
  33. )
  34. def day_idx(self):
  35. return self.day_id - 1
  36. def datetime_from_time_string(self, time_string: str):
  37. return self.datetime_from_date_and_time_string(self.dt, time_string)
  38. @staticmethod
  39. def datetime_from_date_and_time_string(date: datetime.date, time_string: str):
  40. if len(time_string) == 5:
  41. time_string += ':00'
  42. return datetime.datetime.strptime(f'{date.strftime("%Y-%m-%d")} {time_string}', '%Y-%m-%d %H:%M:%S')
  43. def ymd(self) -> str:
  44. return self._ymd
  45. def weekday(self):
  46. return self.dt.weekday()
  47. def weekday_as_german_string(self):
  48. return {
  49. 'Monday': 'Montag',
  50. 'Tuesday': 'Dienstag',
  51. 'Wednesday': 'Mittwoch',
  52. 'Thursday': 'Donnerstag',
  53. 'Friday': 'Freitag',
  54. 'Saturday': 'Samstag',
  55. 'Sunday': 'Sonntag',
  56. }[self.dt.strftime('%A')]
  57. def __eq__(self, other):
  58. if not isinstance(other, type(self)):
  59. raise ValueError
  60. return self.day_id == other.day_id and self._ymd == other._ymd
  61. @cachetools.cached(cachetools.LRUCache(maxsize=128))
  62. def time_interval_as_days(start_day, day_count, inputformat='%Y-%m-%d') -> List[Day]:
  63. '''
  64. :param start_day: date
  65. :param day_count:
  66. :return: list of dates as string
  67. '''
  68. start_day = get_timestamp(start_day, inputformat)
  69. results = [
  70. Day.from_ts(start_day + 86400 * day_idx, day_id=day_idx + 1)
  71. for day_idx in range(day_count)
  72. ]
  73. return results
  74. def weekday_name(index: int, lan='english'):
  75. """
  76. :param index: 0..6
  77. :return: The weekday as name
  78. """
  79. if lan == 'english':
  80. weekdays = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
  81. else:
  82. weekdays = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag', 'Freitag', 'Samstag', 'Sonntag']
  83. return weekdays[index]
  84. def get_calendarweek_from_datetime(datetime_obj):
  85. return datetime_obj.strftime('%V')
  86. def timestamp_from_time_of_day_string(time_string):
  87. ts = datetime.datetime.timestamp(datetime.datetime.strptime('1970-' + time_string + ' +0000', '%Y-%H:%M %z'))
  88. assert 0 <= ts <= 24 * 60 * 60
  89. return ts
  90. class SerializableDateTime(datetime.datetime, EBC):
  91. @classmethod
  92. def field_types(cls) -> Dict[str, Type]:
  93. return {
  94. 'year': int,
  95. 'month': int,
  96. 'day': int,
  97. 'hour': int,
  98. 'minute': int,
  99. 'second': int,
  100. 'microsecond': int,
  101. 'tzinfo': type(None),
  102. }
  103. def to_json(self) -> Dict[str, Any]:
  104. return {
  105. 'type': type(self).__name__,
  106. 'dt_str': self.strftime('%Y-%m-%d %H:%M:%S.%f'),
  107. }
  108. def __hash__(self):
  109. return super().__hash__()
  110. @staticmethod
  111. def from_datetime(dt):
  112. return SerializableDateTime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
  113. @staticmethod
  114. def from_date(dt):
  115. if isinstance(dt, datetime.datetime):
  116. return SerializableDateTime.from_datetime(dt)
  117. return SerializableDateTime(dt.year, dt.month, dt.day)
  118. @staticmethod
  119. def from_json(data: Dict[str, Any]):
  120. cls: Type[SerializableDateTime] = EBC.SUBCLASSES_BY_NAME[data['type']]
  121. if 'dt_str' in data:
  122. dt = datetime.datetime.strptime(data['dt_str'], '%Y-%m-%d %H:%M:%S.%f')
  123. data['year'] = dt.year
  124. data['month'] = dt.month
  125. data['day'] = dt.day
  126. data['hour'] = dt.hour
  127. data['minute'] = dt.minute
  128. data['second'] = dt.second
  129. data['microsecond'] = dt.microsecond
  130. return cls(data['year'], data['month'], data['day'], data['hour'], data['minute'], data['second'], data['microsecond'], data.get('tzinfo', None))
  131. def __eq__(self, other):
  132. if isinstance(other, datetime.date) and not isinstance(other, datetime.datetime):
  133. return self.date() == other
  134. return (
  135. self.year == other.year
  136. and self.month == other.month
  137. and self.day == other.day
  138. and self.hour == other.hour
  139. and self.minute == other.minute
  140. and self.second == other.second
  141. and self.microsecond == other.microsecond
  142. and self.tzinfo == other.tzinfo
  143. )
  144. def __ne__(self, other):
  145. return not self.__eq__(other)
  146. class SerializableTimeDelta(datetime.timedelta, EBC):
  147. max: 'SerializableTimeDelta'
  148. @classmethod
  149. def field_types(cls) -> Dict[str, Type]:
  150. return {
  151. 'seconds': float,
  152. 'days': Optional[int],
  153. 'microseconds': Optional[int],
  154. }
  155. def to_json(self) -> Dict[str, Any]:
  156. return {
  157. 'type': type(self).__name__,
  158. 'seconds': self.seconds,
  159. 'days': self.days,
  160. 'microseconds': self.microseconds,
  161. }
  162. @classmethod
  163. def from_total_seconds(cls, total_seconds) -> 'SerializableTimeDelta':
  164. try:
  165. return SerializableTimeDelta(seconds=total_seconds)
  166. except OverflowError as e:
  167. if 'must have magnitude <=' in str(e):
  168. return cls.max
  169. def positive_infinite(self):
  170. return self.days == self.max.days and self.seconds == self.max.seconds and self.microseconds == self.max.microseconds
  171. def total_seconds(self) -> float:
  172. if self.positive_infinite():
  173. return math.inf
  174. return super().total_seconds()
  175. @classmethod
  176. def from_timedelta(cls, timedelta: datetime.timedelta):
  177. return cls(timedelta.days, timedelta.seconds, timedelta.microseconds)
  178. @staticmethod
  179. def from_json(data: Dict[str, Any]):
  180. cls: Type[SerializableTimeDelta] = EBC.SUBCLASSES_BY_NAME[data['type']]
  181. return cls(data.get('days', 0.), data['seconds'], data.get('microseconds', 0.))
  182. def __eq__(self, other):
  183. return self.total_seconds() == other.total_seconds()
  184. SerializableTimeDelta.max = SerializableTimeDelta.from_timedelta(datetime.timedelta.max)
  185. def minutes_timestamp_from_time_of_day_string(time_of_day_string):
  186. timestamp = timestamp_from_time_of_day_string(time_of_day_string)
  187. return minutes_timestamp_from_seconds(timestamp)
  188. def minutes_timestamp_from_seconds(timestamp):
  189. return timestamp / 60