run_server.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. import datetime
  2. import errno
  3. import json
  4. import os
  5. import random
  6. import re
  7. import sys
  8. import time
  9. from json import JSONDecodeError
  10. from logging import INFO
  11. from threading import Thread
  12. from typing import Dict, Any
  13. import bottle
  14. # noinspection PyUnresolvedReferences
  15. from bottle.ext.websocket import GeventWebSocketServer
  16. # noinspection PyUnresolvedReferences
  17. from bottle.ext.websocket import websocket
  18. from gevent import threading
  19. from gevent.queue import Queue, Empty
  20. from gevent.threading import Lock
  21. from geventwebsocket import WebSocketError
  22. from geventwebsocket.websocket import WebSocket
  23. import connection
  24. import model
  25. import server_controller
  26. from connection import HttpError
  27. from debug import debug
  28. from game import ROOT_URL, COPYRIGHT_INFRINGEMENT_PROBABILITY, DB_NAME, logger
  29. from lib.print_exc_plus import print_exc_plus
  30. from lib.threading_timer_decorator import exit_after
  31. from routes import valid_post_routes, upload_filtered
  32. from util import round_to_n, rename, profile_wall_time_instead_if_profiling
  33. FRONTEND_RELATIVE_PATH = './frontend'
  34. profile_wall_time_instead_if_profiling()
  35. request_lock = Lock() # locked until the response to the request is computed
  36. db_commit_threads = Queue()
  37. if debug:
  38. TIMEOUT = 600
  39. else:
  40. TIMEOUT = 10
  41. assert all(getattr(server_controller, route) for route in valid_post_routes)
  42. def reset_global_variables():
  43. model.current_connection = None
  44. model.current_cursor = None
  45. model.current_db_name = None
  46. model.current_user_id = None
  47. del connection.push_message_queue[:]
  48. bottle.response.status = 500
  49. @exit_after(TIMEOUT)
  50. def call_controller_method_with_timeout(method, json_request: Dict[str, Any]):
  51. return method(json_request)
  52. def _process(path, json_request):
  53. start = time.clock()
  54. path = path.strip().lower()
  55. bottle.response.content_type = 'application/json; charset=latin-1'
  56. reset_global_variables()
  57. original_request = None
  58. # noinspection PyBroadException
  59. try:
  60. json_request = json_request()
  61. original_request = json_request
  62. logger.log(path, INFO, message_type='handling_http_request', data=json.dumps({
  63. 'request': json_request,
  64. 'start': start,
  65. }))
  66. if json_request is None:
  67. bottle.response.status = 400
  68. resp = connection.BadRequest('Only json allowed.')
  69. elif path not in valid_post_routes and not debug:
  70. print('Processing time:', time.clock() - start)
  71. resp = connection.NotFound('URL not available')
  72. else:
  73. model.connect(DB_NAME, create_if_not_exists=True)
  74. method_to_call = getattr(server_controller, path)
  75. try:
  76. server_controller._before_request(json_request)
  77. resp = call_controller_method_with_timeout(method_to_call, json_request)
  78. if isinstance(resp, HttpError):
  79. raise resp
  80. raise connection.Success(resp)
  81. except HttpError as e:
  82. bottle.response.status = e.code
  83. resp = e
  84. if not isinstance(resp.body, dict):
  85. raise TypeError('The response body should always be a dict')
  86. if resp.code // 100 == 2 and path in upload_filtered and random.random() < COPYRIGHT_INFRINGEMENT_PROBABILITY:
  87. resp = connection.UnavailableForLegalReasons('An upload filter detected a copyright infringement. '
  88. 'If you think this is an error, please try again.')
  89. bottle.response.status = resp.code
  90. if model.current_connection is not None:
  91. if bottle.response.status_code == 200:
  92. thread = Thread(target=finish_request, args=[], kwargs={'success': True}, daemon=False)
  93. else:
  94. thread = Thread(target=finish_request, args=[], kwargs={'success': False}, daemon=False)
  95. db_commit_threads.put(thread)
  96. thread.start()
  97. print('route=' + path, 't=' + str(round_to_n(time.clock() - start, 4)) + 's,',
  98. 'db=' + str(model.current_db_name))
  99. logger.log(path, INFO, message_type='http_request_finished', data=json.dumps({
  100. 'request': json_request,
  101. 'response': resp.body,
  102. 'status': resp.code,
  103. 'start': start,
  104. 'end': time.clock(),
  105. }))
  106. return resp.body
  107. except JSONDecodeError:
  108. return handle_error('Unable to decode JSON', path, start, original_request)
  109. except NotImplementedError:
  110. return handle_error('This feature has not been fully implemented yet.', path, start, original_request)
  111. except KeyboardInterrupt:
  112. if time.clock() - start > TIMEOUT:
  113. return handle_error('Processing timeout', path, start, original_request)
  114. else:
  115. raise
  116. except Exception:
  117. return handle_error('Unknown error', path, start, original_request)
  118. def finish_request(success):
  119. if success:
  120. model.current_connection.commit()
  121. connection.push_messages_in_queue()
  122. else:
  123. model.current_connection.rollback()
# Script entry point: report the database module version and the run mode.
# NOTE(review): indentation was lost in this copy of the file, so how much of the
# code that follows belongs to this guard cannot be determined here - confirm
# against the original source.
if __name__ == '__main__':
    print('sqlite3.version', model.db.version)
    if debug:
        print('Running server in debug mode...')
    print('Preparing backend API...')
  129. @bottle.route('/json/<path>', method='POST')
  130. def process(path):
  131. with request_lock:
  132. wait_for_db_commit_threads()
  133. return _process(path, lambda: bottle.request.json)
  134. def wait_for_db_commit_threads():
  135. while len(db_commit_threads) > 0:
  136. try:
  137. t = db_commit_threads.get()
  138. except Empty:
  139. break
  140. t.join()
  141. print('Preparing index page...')
  142. @bottle.route('/', method='GET')
  143. def index():
  144. if ROOT_URL != '/':
  145. bottle.redirect(ROOT_URL)
  146. def handle_error(message, path, start, request, status=500):
  147. bottle.response.status = status
  148. print_exc_plus()
  149. if model.current_connection is not None:
  150. model.current_connection.rollback()
  151. print('route=' + str(path), 't=' + str(round_to_n(time.clock() - start, 4)) + 's,',
  152. 'db=' + str(model.current_db_name))
  153. logger.exception(path, message_type='http_request', data=json.dumps({
  154. 'status': status,
  155. 'start': start,
  156. 'end': time.clock(),
  157. 'exception': str(sys.exc_info()),
  158. 'request': request,
  159. }))
  160. return connection.InternalServerError(message).body
  161. print('Preparing websocket connections...')
  162. @bottle.get('/websocket', apply=[websocket])
  163. def websocket(ws: WebSocket):
  164. print('websocket connection', *ws.handler.client_address, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  165. while True:
  166. start = time.clock()
  167. path = None
  168. request_token = None
  169. outer_json = None
  170. # noinspection PyBroadException
  171. try:
  172. if ws.closed:
  173. connection.ws_cleanup(ws)
  174. break
  175. try:
  176. msg = ws.read_message()
  177. except ConnectionResetError:
  178. msg = None
  179. except WebSocketError as e:
  180. if e.args[0] == 'Unexpected EOF while decoding header':
  181. msg = None
  182. else:
  183. raise
  184. if msg is not None: # received some message
  185. with request_lock:
  186. wait_for_db_commit_threads()
  187. msg = bytes(msg)
  188. outer_json = None
  189. outer_json = bottle.json_loads(msg)
  190. path = outer_json['route']
  191. inner_json = outer_json['body']
  192. request_token = outer_json['request_token']
  193. inner_result_json = _process(path, lambda: inner_json)
  194. if 'error' in inner_result_json:
  195. status_code = int(inner_result_json['error'][:3])
  196. else:
  197. status_code = 200
  198. if model.current_user_id is not None and status_code == 200:
  199. # if there is a user_id involved, associate it with this websocket
  200. user_id = (model.current_db_name, model.current_user_id)
  201. if user_id in connection.websockets_for_user:
  202. if ws not in connection.websockets_for_user[user_id]:
  203. connection.websockets_for_user[user_id].append(ws)
  204. else:
  205. connection.websockets_for_user[user_id] = [ws]
  206. if ws in connection.users_for_websocket:
  207. if user_id not in connection.users_for_websocket[ws]:
  208. connection.users_for_websocket[ws].append(user_id)
  209. else:
  210. connection.users_for_websocket[ws] = [user_id]
  211. outer_result_json = {
  212. 'body': inner_result_json,
  213. 'http_status_code': status_code,
  214. 'request_token': request_token
  215. }
  216. outer_result_json = json.dumps(outer_result_json)
  217. if ws.closed:
  218. connection.ws_cleanup(ws)
  219. break
  220. ws.send(outer_result_json)
  221. print('websocket message',
  222. *ws.handler.client_address,
  223. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  224. status_code,
  225. len(outer_result_json))
  226. else:
  227. connection.ws_cleanup(ws)
  228. break
  229. except JSONDecodeError:
  230. inner_result_json = handle_error('Unable to decode outer JSON', path, start, outer_json)
  231. status_code = 403
  232. inner_result_json['http_status_code'] = status_code
  233. if request_token is not None:
  234. inner_result_json['request_token'] = request_token
  235. inner_result_json = json.dumps(inner_result_json)
  236. if ws.closed:
  237. connection.ws_cleanup(ws)
  238. break
  239. ws.send(inner_result_json)
  240. print('websocket message',
  241. *ws.handler.client_address,
  242. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  243. status_code,
  244. len(inner_result_json))
  245. except Exception:
  246. inner_result_json = handle_error('Unknown error', path, start, outer_json)
  247. status_code = 500
  248. inner_result_json['http_status_code'] = status_code
  249. if request_token is not None:
  250. inner_result_json['request_token'] = request_token
  251. inner_result_json = json.dumps(inner_result_json)
  252. if ws.closed:
  253. connection.ws_cleanup(ws)
  254. break
  255. ws.send(inner_result_json)
  256. print('websocket message',
  257. *ws.handler.client_address,
  258. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  259. status_code,
  260. len(inner_result_json))
  261. def _serve_static_directory(route, root, download=False):
  262. method_name = ''.join(c for c in root if re.match(r'[A-Za-z]]', c))
  263. assert method_name not in globals()
  264. @bottle.route(route, method=['GET', 'OPTIONS'])
  265. @rename(''.join(c for c in root if re.match(r'[A-Za-z]]', c)))
  266. def serve_static_file(filename):
  267. # start = time.clock()
  268. # logger.log(filename, INFO, message_type='handling_http_request', data=json.dumps({
  269. # 'start': start,
  270. # }))
  271. # try:
  272. if filename == 'api.json':
  273. return {'endpoint': bottle.request.urlparts[0] + '://' + bottle.request.urlparts[1] + '/json/'}
  274. if download:
  275. default_name = 'ytm-' + filename
  276. return bottle.static_file(filename, root=root, download=default_name)
  277. else:
  278. return bottle.static_file(filename, root=root, download=False)
  279. # finally:
  280. # logger.log(filename, INFO, message_type='http_request_finished', data=json.dumps({
  281. # 'status': bottle.response.status_code,
  282. # 'start': start,
  283. # 'end': time.clock(),
  284. # }))
  285. # frontend
  286. print('Preparing frontend directories...')
  287. if len(os.listdir(FRONTEND_RELATIVE_PATH)) == 0:
  288. raise FileNotFoundError(errno.ENOENT, 'Frontend directory is empty:', FRONTEND_RELATIVE_PATH)
  289. for subdir, dirs, files in os.walk(FRONTEND_RELATIVE_PATH):
  290. # subdir now has the form ../frontend/config
  291. _serve_static_directory(
  292. route=subdir.replace('\\', '/').replace(FRONTEND_RELATIVE_PATH, '') + '/<filename>',
  293. root=subdir
  294. )
  295. # app
  296. print('Preparing app for download...')
  297. _serve_static_directory(
  298. route='/app/<filename>',
  299. root='../android/app/release',
  300. download=True,
  301. )
  302. logger.log('Server start', INFO, 'server_start', json.dumps({
  303. 'host': '0.0.0.0',
  304. 'port': connection.PORT,
  305. 'debug': debug,
  306. }))
  307. # commit regularly
  308. log_commit_time = logger.commit()
  309. log_commit_delay = 15
  310. print(f'Committing logfile transaction took {log_commit_time}s, '
  311. f'scheduling to run every {log_commit_delay}s')
  312. threading.Timer(log_commit_delay, logger.commit).start()
  313. print('Running server...')
  314. bottle.run(host='0.0.0.0', port=connection.PORT, debug=debug, server=GeventWebSocketServer)
  315. logger.commit()
  316. model.cleanup()