"""HTTP and websocket entry point of the backend server.

Importing this module defines the request-processing helpers; running it as a
script additionally registers the bottle routes (JSON API, websocket, static
frontend files, app download) and starts the server.
"""
import datetime
import json
import os
import random
import re
import sys
import time
from json import JSONDecodeError
from logging import INFO
from threading import Thread
from typing import Dict, Any

import bottle
# noinspection PyUnresolvedReferences
from bottle.ext.websocket import GeventWebSocketServer
# noinspection PyUnresolvedReferences
from bottle.ext.websocket import websocket
from gevent import threading
from gevent.queue import Queue, Empty
from gevent.threading import Lock
from geventwebsocket import WebSocketError
from geventwebsocket.websocket import WebSocket

import connection
import model
import server_controller
from application import ROOT_URL, COPYRIGHT_INFRINGEMENT_PROBABILITY, DB_NAME, logger
from connection import HttpError
from debug import debug
from lib.print_exc_plus import print_exc_plus
from lib.threading_timer_decorator import exit_after
from routes import valid_post_routes, upload_filtered
from util import round_to_n, rename, profile_wall_time_instead_if_profiling

FRONTEND_RELATIVE_PATH = '../frontend'

profile_wall_time_instead_if_profiling()

request_lock = Lock()  # locked until the response to the request is computed
db_commit_threads = Queue()  # commit/rollback threads started by _process that may still be running

# maximum time (in seconds) a controller method may run before it is interrupted
if debug:
    TIMEOUT = 600
else:
    TIMEOUT = 10

# fail fast at startup: getattr raises AttributeError for a route without a controller function
assert all(getattr(server_controller, route) for route in valid_post_routes)


def reset_global_variables():
    """Reset the per-request global state before a new request is handled.

    Clears the database globals on ``model``, empties the push message queue
    in place and presets the response status to 500, which stays in effect
    until the request handler sets another status.
    """
    model.current_connection = None
    model.current_cursor = None
    model.current_db_name = None
    model.current_user_id = None
    del connection.push_message_queue[:]
    bottle.response.status = 500


@exit_after(TIMEOUT)
def call_controller_method_with_timeout(method, json_request: Dict[str, Any]):
    """Call ``method(json_request)`` under the ``exit_after(TIMEOUT)`` guard and return its result."""
    return method(json_request)


def _process(path, json_request):
    """Handle one API request and return the response body as a dict.

    :param path: name of the requested route; it is stripped and lower-cased
        and must be contained in ``valid_post_routes``.
    :param json_request: zero-argument callable returning the decoded JSON
        request body. It is only called inside the ``try`` block so that
        decoding errors are reported through ``handle_error``.
    :return: the body of the response object, or the error body produced by
        ``handle_error`` if processing failed.
    :raises KeyboardInterrupt: if an interrupt arrives before ``TIMEOUT``
        seconds have elapsed (it is then not treated as a processing timeout).
    """
    # time.clock() was removed in Python 3.8; perf_counter() is its documented replacement
    start = time.perf_counter()
    path = path.strip().lower()
    bottle.response.content_type = 'application/json; charset=latin-1'
    reset_global_variables()
    original_request = None
    # noinspection PyBroadException
    try:
        json_request = json_request()
        original_request = json_request
        logger.log(path, INFO, message_type='handling_http_request', data=json.dumps({
            'request': json_request,
            'start': start,
        }))
        if json_request is None:
            bottle.response.status = 400
            resp = connection.BadRequest('Only json allowed.')
        elif path not in valid_post_routes:
            print('Processing time:', time.perf_counter() - start)
            resp = connection.NotFound('URL not available')
        else:
            model.connect(DB_NAME, create_if_not_exists=True)
            method_to_call = getattr(server_controller, path)
            try:
                resp = call_controller_method_with_timeout(method_to_call, json_request)
                # a plain return value is wrapped into a Success response, which is
                # raised so that it takes the same path as responses raised by the controller
                raise connection.Success(resp)
            except HttpError as e:
                bottle.response.status = e.code
                resp = e

        if not isinstance(resp.body, dict):
            raise TypeError('The response body should always be a dict')

        if resp.code // 100 == 2 and path in upload_filtered and random.random() < COPYRIGHT_INFRINGEMENT_PROBABILITY:
            resp = connection.UnavailableForLegalReasons('An upload filter detected a copyright infringement. '
                                                         'If you think this is an error, please try again.')

        bottle.response.status = resp.code
        if model.current_connection is not None:
            # commit (status 200) or roll back (anything else) in a separate thread;
            # the next request waits for this thread via db_commit_threads
            thread = Thread(target=finish_request,
                            kwargs={'success': bottle.response.status_code == 200},
                            daemon=False)
            db_commit_threads.put(thread)
            thread.start()

        print('route=' + path,
              't=' + str(round_to_n(time.perf_counter() - start, 4)) + 's,',
              'db=' + str(model.current_db_name))
        logger.log(path, INFO, message_type='http_request_finished', data=json.dumps({
            'request': json_request,
            'response': resp.body,
            'status': resp.code,
            'start': start,
            'end': time.perf_counter(),
        }))
        return resp.body
    except JSONDecodeError:
        return handle_error('Unable to decode JSON', path, start, original_request)
    except NotImplementedError:
        return handle_error('This feature has not been fully implemented yet.', path, start, original_request)
    except KeyboardInterrupt:
        # presumably raised by the exit_after guard on timeout; an interrupt that
        # arrives earlier is passed on unchanged
        if time.perf_counter() - start > TIMEOUT:
            return handle_error('Processing timeout', path, start, original_request)
        raise
    except Exception:
        return handle_error('Unknown error', path, start, original_request)


def finish_request(success):
    """Finish the database transaction of the current request.

    :param success: if true, commit the current connection and send the queued
        push messages; otherwise roll the current connection back.
    """
    if success:
        model.current_connection.commit()
        connection.push_messages_in_queue()
    else:
        model.current_connection.rollback()


# Defined at module level (not under the __main__ guard) because _process depends on it.
def handle_error(message, path, start, request, status=500):
    """Report the exception currently being handled and build an error body.

    Must be called from within an ``except`` block. Sets the response status,
    prints the traceback, rolls back the current database connection (if any)
    and logs the failure.

    :param message: error message for the client.
    :param path: route of the failed request (may be ``None``).
    :param start: ``time.perf_counter()`` value taken when processing started.
    :param request: the decoded request, or ``None`` if it could not be decoded.
    :param status: HTTP status to set on the response and to log.
    :return: the body of ``connection.InternalServerError(message)``.
    """
    bottle.response.status = status
    print_exc_plus()
    if model.current_connection is not None:
        model.current_connection.rollback()
    print('route=' + str(path),
          't=' + str(round_to_n(time.perf_counter() - start, 4)) + 's,',
          'db=' + str(model.current_db_name))
    logger.exception(path, message_type='http_request', data=json.dumps({
        'status': status,
        'start': start,
        'end': time.perf_counter(),
        'exception': str(sys.exc_info()),
        'request': request,
    }))
    return connection.InternalServerError(message).body


if __name__ == '__main__':
    print('sqlite3.version', model.db.version)
    if debug:
        print('Running server in debug mode...')

    print('Preparing backend API...')


    # the <path> wildcard supplies the `path` argument of the handler
    @bottle.route('/json/<path>', method='POST')
    def process(path):
        """Handle a POST request to the JSON API route ``path``."""
        with request_lock:
            wait_for_db_commit_threads()
            return _process(path, lambda: bottle.request.json)


    def wait_for_db_commit_threads():
        """Join all commit/rollback threads that were queued by earlier requests."""
        while True:
            try:
                # non-blocking, so that an empty queue ends the loop instead of blocking
                pending_thread = db_commit_threads.get_nowait()
            except Empty:
                break
            pending_thread.join()


    print('Preparing index page...')


    @bottle.route('/', method='GET')
    def index():
        """Redirect requests for ``/`` to ``ROOT_URL`` unless that is ``/`` itself."""
        if ROOT_URL != '/':
            bottle.redirect(ROOT_URL)


    print('Preparing websocket connections...')


    def _send_websocket_error(ws, message, status_code, path, start, outer_json, request_token):
        """Report the current exception and send an error reply over ``ws``.

        Must be called from within an ``except`` block.

        :return: ``False`` if the socket was already closed (it has then been
            cleaned up and nothing was sent), ``True`` otherwise.
        """
        error_json = handle_error(message, path, start, outer_json)
        error_json['http_status_code'] = status_code
        if request_token is not None:
            error_json['request_token'] = request_token
        error_json = json.dumps(error_json)
        if ws.closed:
            connection.ws_cleanup(ws)
            return False
        ws.send(error_json)
        print('websocket message',
              *ws.handler.client_address,
              datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
              status_code,
              len(error_json))
        return True


    # NOTE: this definition rebinds the name `websocket`; the decorator arguments
    # below are evaluated first and therefore still refer to the imported plugin.
    @bottle.get('/websocket', apply=[websocket])
    def websocket(ws: WebSocket):
        """Serve one websocket connection until it is closed.

        Every message is expected to be a JSON object with the keys ``route``,
        ``body`` and ``request_token``. The body is processed like a request to
        the JSON API and the result is sent back together with the status code
        and the request token.
        """
        print('websocket connection',
              *ws.handler.client_address,
              datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        while True:
            start = time.perf_counter()
            path = None
            request_token = None
            outer_json = None

            # noinspection PyBroadException
            try:
                if ws.closed:
                    connection.ws_cleanup(ws)
                    break
                try:
                    msg = ws.read_message()
                except ConnectionResetError:
                    msg = None
                except WebSocketError as e:
                    if e.args[0] == 'Unexpected EOF while decoding header':
                        msg = None
                    else:
                        raise

                if msg is None:
                    # nothing was received: the connection is treated as finished
                    connection.ws_cleanup(ws)
                    break

                # received some message
                with request_lock:
                    wait_for_db_commit_threads()
                    msg = bytes(msg)
                    outer_json = bottle.json_loads(msg)
                    path = outer_json['route']
                    inner_json = outer_json['body']
                    request_token = outer_json['request_token']
                    inner_result_json = _process(path, lambda: inner_json)

                    if 'error' in inner_result_json:
                        # the error text is expected to start with the three-digit status code
                        status_code = int(inner_result_json['error'][:3])
                    else:
                        status_code = 200

                    if model.current_user_id is not None and status_code == 200:
                        # if there is a user_id involved, associate it with this websocket
                        user_id = (model.current_db_name, model.current_user_id)
                        sockets_of_user = connection.websockets_for_user.setdefault(user_id, [])
                        if ws not in sockets_of_user:
                            sockets_of_user.append(ws)
                        users_of_socket = connection.users_for_websocket.setdefault(ws, [])
                        if user_id not in users_of_socket:
                            users_of_socket.append(user_id)

                # the response is computed; the lock is not needed for sending it
                outer_result_json = json.dumps({
                    'body': inner_result_json,
                    'http_status_code': status_code,
                    'request_token': request_token
                })
                if ws.closed:
                    connection.ws_cleanup(ws)
                    break
                ws.send(outer_result_json)
                print('websocket message',
                      *ws.handler.client_address,
                      datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                      status_code,
                      len(outer_result_json))
            except JSONDecodeError:
                if not _send_websocket_error(ws, 'Unable to decode outer JSON', 403,
                                             path, start, outer_json, request_token):
                    break
            except Exception:
                if not _send_websocket_error(ws, 'Unknown error', 500,
                                             path, start, outer_json, request_token):
                    break


    def _serve_static_directory(route, root, download=False):
        """Register a GET/OPTIONS route that serves the files in ``root``.

        :param route: bottle route; it must contain a ``<filename>`` wildcard,
            which supplies the argument of the handler.
        :param root: directory the files are served from.
        :param download: if true, files are offered as downloads named
            ``'ytm-' + filename``.
        """
        # handler name derived from the letters of the directory path
        method_name = ''.join(c for c in root if re.match(r'[A-Za-z]', c))
        assert method_name not in globals()

        @bottle.route(route, method=['GET', 'OPTIONS'])
        @rename(method_name)
        def serve_static_file(filename):
            # api.json is generated: it tells the frontend where the JSON API is located
            if filename == 'api.json':
                return {'endpoint': bottle.request.urlparts[0] + '://' + bottle.request.urlparts[1] + '/json/'}
            if download:
                default_name = 'ytm-' + filename
                return bottle.static_file(filename, root=root, download=default_name)
            else:
                return bottle.static_file(filename, root=root, download=False)


    # frontend
    print('Preparing frontend directories...')
    for subdir, dirs, files in os.walk(FRONTEND_RELATIVE_PATH):
        # subdir now has the form ../frontend/config
        _serve_static_directory(
            route=subdir.replace('\\', '/').replace(FRONTEND_RELATIVE_PATH, '') + '/<filename>',
            root=subdir
        )

    # app
    print('Preparing app for download...')
    _serve_static_directory(
        route='/app/<filename>',
        root='../android/app/release',
        download=True,
    )

    logger.log('Server start', INFO, 'server_start', json.dumps({
        'host': '0.0.0.0',
        'port': connection.PORT,
        'debug': debug,
    }))

    # commit regularly
    log_commit_time = logger.commit()
    log_commit_delay = 15
    print(f'Committing logfile transaction took {log_commit_time}s, '
          f'scheduling to run every {log_commit_delay}s')


    def _schedule_log_commit():
        """Arm a timer that commits the log after ``log_commit_delay`` seconds."""
        timer = threading.Timer(log_commit_delay, _commit_log_and_reschedule)
        timer.daemon = True  # a pending timer must not keep the process alive on shutdown
        timer.start()


    def _commit_log_and_reschedule():
        """Commit the log and arm the next timer (a Timer only fires once)."""
        try:
            logger.commit()
        finally:
            _schedule_log_commit()


    _schedule_log_commit()

    print('Running server...')
    bottle.run(host='0.0.0.0', port=connection.PORT, debug=debug, server=GeventWebSocketServer)
    logger.commit()
    model.cleanup()