123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356 |
- import datetime
- import json
- import os
- import random
- import re
- import sys
- import time
- from json import JSONDecodeError
- from logging import INFO
- from threading import Thread
- from typing import Dict, Any
- import bottle
- # noinspection PyUnresolvedReferences
- from bottle.ext.websocket import GeventWebSocketServer
- # noinspection PyUnresolvedReferences
- from bottle.ext.websocket import websocket
- from gevent import threading
- from gevent.queue import Queue, Empty
- from gevent.threading import Lock
- from geventwebsocket import WebSocketError
- from geventwebsocket.websocket import WebSocket
- import connection
- import model
- import server_controller
- from application import ROOT_URL, COPYRIGHT_INFRINGEMENT_PROBABILITY, DB_NAME, logger
- from connection import HttpError
- from debug import debug
- from lib.print_exc_plus import print_exc_plus
- from lib.threading_timer_decorator import exit_after
- from routes import valid_post_routes, upload_filtered
- from util import round_to_n, rename, profile_wall_time_instead_if_profiling
- FRONTEND_RELATIVE_PATH = '../frontend'
- profile_wall_time_instead_if_profiling()
- request_lock = Lock() # locked until the response to the request is computed
- db_commit_threads = Queue()
- if debug:
- TIMEOUT = 600
- else:
- TIMEOUT = 10
- assert all(getattr(server_controller, route) for route in valid_post_routes)
- def reset_global_variables():
- model.current_connection = None
- model.current_cursor = None
- model.current_db_name = None
- model.current_user_id = None
- del connection.push_message_queue[:]
- bottle.response.status = 500
- @exit_after(TIMEOUT)
- def call_controller_method_with_timeout(method, json_request: Dict[str, Any]):
- return method(json_request)
- def _process(path, json_request):
- start = time.clock()
- path = path.strip().lower()
- bottle.response.content_type = 'application/json; charset=latin-1'
- reset_global_variables()
- original_request = None
- # noinspection PyBroadException
- try:
- json_request = json_request()
- original_request = json_request
- logger.log(path, INFO, message_type='handling_http_request', data=json.dumps({
- 'request': json_request,
- 'start': start,
- }))
- if json_request is None:
- bottle.response.status = 400
- resp = connection.BadRequest('Only json allowed.')
- elif path not in valid_post_routes:
- print('Processing time:', time.clock() - start)
- resp = connection.NotFound('URL not available')
- else:
- model.connect(DB_NAME, create_if_not_exists=True)
- method_to_call = getattr(server_controller, path)
- try:
- resp = call_controller_method_with_timeout(method_to_call, json_request)
- raise connection.Success(resp)
- except HttpError as e:
- bottle.response.status = e.code
- resp = e
- if not isinstance(resp.body, dict):
- raise TypeError('The response body should always be a dict')
- if resp.code // 100 == 2 and path in upload_filtered and random.random() < COPYRIGHT_INFRINGEMENT_PROBABILITY:
- resp = connection.UnavailableForLegalReasons('An upload filter detected a copyright infringement. '
- 'If you think this is an error, please try again.')
- bottle.response.status = resp.code
- if model.current_connection is not None:
- if bottle.response.status_code == 200:
- thread = Thread(target=finish_request, args=[], kwargs={'success': True}, daemon=False)
- else:
- thread = Thread(target=finish_request, args=[], kwargs={'success': False}, daemon=False)
- db_commit_threads.put(thread)
- thread.start()
- print('route=' + path, 't=' + str(round_to_n(time.clock() - start, 4)) + 's,',
- 'db=' + str(model.current_db_name))
- logger.log(path, INFO, message_type='http_request_finished', data=json.dumps({
- 'request': json_request,
- 'response': resp.body,
- 'status': resp.code,
- 'start': start,
- 'end': time.clock(),
- }))
- return resp.body
- except JSONDecodeError:
- return handle_error('Unable to decode JSON', path, start, original_request)
- except NotImplementedError:
- return handle_error('This feature has not been fully implemented yet.', path, start, original_request)
- except KeyboardInterrupt:
- if time.clock() - start > TIMEOUT:
- return handle_error('Processing timeout', path, start, original_request)
- else:
- raise
- except Exception:
- return handle_error('Unknown error', path, start, original_request)
- def finish_request(success):
- if success:
- model.current_connection.commit()
- connection.push_messages_in_queue()
- else:
- model.current_connection.rollback()
- if __name__ == '__main__':
- print('sqlite3.version', model.db.version)
- if debug:
- print('Running server in debug mode...')
- print('Preparing backend API...')
- @bottle.route('/json/<path>', method='POST')
- def process(path):
- with request_lock:
- wait_for_db_commit_threads()
- return _process(path, lambda: bottle.request.json)
- def wait_for_db_commit_threads():
- while len(db_commit_threads) > 0:
- try:
- t = db_commit_threads.get()
- except Empty:
- break
- t.join()
- print('Preparing index page...')
- @bottle.route('/', method='GET')
- def index():
- if ROOT_URL != '/':
- bottle.redirect(ROOT_URL)
- def handle_error(message, path, start, request, status=500):
- bottle.response.status = status
- print_exc_plus()
- if model.current_connection is not None:
- model.current_connection.rollback()
- print('route=' + str(path), 't=' + str(round_to_n(time.clock() - start, 4)) + 's,',
- 'db=' + str(model.current_db_name))
- logger.exception(path, message_type='http_request', data=json.dumps({
- 'status': status,
- 'start': start,
- 'end': time.clock(),
- 'exception': str(sys.exc_info()),
- 'request': request,
- }))
- return connection.InternalServerError(message).body
- print('Preparing websocket connections...')
- @bottle.get('/websocket', apply=[websocket])
- def websocket(ws: WebSocket):
- print('websocket connection', *ws.handler.client_address, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
- while True:
- start = time.clock()
- path = None
- request_token = None
- outer_json = None
- # noinspection PyBroadException
- try:
- if ws.closed:
- connection.ws_cleanup(ws)
- break
- try:
- msg = ws.read_message()
- except ConnectionResetError:
- msg = None
- except WebSocketError as e:
- if e.args[0] == 'Unexpected EOF while decoding header':
- msg = None
- else:
- raise
- if msg is not None: # received some message
- with request_lock:
- wait_for_db_commit_threads()
- msg = bytes(msg)
- outer_json = None
- outer_json = bottle.json_loads(msg)
- path = outer_json['route']
- inner_json = outer_json['body']
- request_token = outer_json['request_token']
- inner_result_json = _process(path, lambda: inner_json)
- if 'error' in inner_result_json:
- status_code = int(inner_result_json['error'][:3])
- else:
- status_code = 200
- if model.current_user_id is not None and status_code == 200:
- # if there is a user_id involved, associate it with this websocket
- user_id = (model.current_db_name, model.current_user_id)
- if user_id in connection.websockets_for_user:
- if ws not in connection.websockets_for_user[user_id]:
- connection.websockets_for_user[user_id].append(ws)
- else:
- connection.websockets_for_user[user_id] = [ws]
- if ws in connection.users_for_websocket:
- if user_id not in connection.users_for_websocket[ws]:
- connection.users_for_websocket[ws].append(user_id)
- else:
- connection.users_for_websocket[ws] = [user_id]
- outer_result_json = {
- 'body': inner_result_json,
- 'http_status_code': status_code,
- 'request_token': request_token
- }
- outer_result_json = json.dumps(outer_result_json)
- if ws.closed:
- connection.ws_cleanup(ws)
- break
- ws.send(outer_result_json)
- print('websocket message',
- *ws.handler.client_address,
- datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
- status_code,
- len(outer_result_json))
- else:
- connection.ws_cleanup(ws)
- break
- except JSONDecodeError:
- inner_result_json = handle_error('Unable to decode outer JSON', path, start, outer_json)
- status_code = 403
- inner_result_json['http_status_code'] = status_code
- if request_token is not None:
- inner_result_json['request_token'] = request_token
- inner_result_json = json.dumps(inner_result_json)
- if ws.closed:
- connection.ws_cleanup(ws)
- break
- ws.send(inner_result_json)
- print('websocket message',
- *ws.handler.client_address,
- datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
- status_code,
- len(inner_result_json))
- except Exception:
- inner_result_json = handle_error('Unknown error', path, start, outer_json)
- status_code = 500
- inner_result_json['http_status_code'] = status_code
- if request_token is not None:
- inner_result_json['request_token'] = request_token
- inner_result_json = json.dumps(inner_result_json)
- if ws.closed:
- connection.ws_cleanup(ws)
- break
- ws.send(inner_result_json)
- print('websocket message',
- *ws.handler.client_address,
- datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
- status_code,
- len(inner_result_json))
- def _serve_static_directory(route, root, download=False):
- method_name = ''.join(c for c in root if re.match(r'[A-Za-z]]', c))
- assert method_name not in globals()
- @bottle.route(route, method=['GET', 'OPTIONS'])
- @rename(''.join(c for c in root if re.match(r'[A-Za-z]]', c)))
- def serve_static_file(filename):
- # start = time.clock()
- # logger.log(filename, INFO, message_type='handling_http_request', data=json.dumps({
- # 'start': start,
- # }))
- # try:
- if filename == 'api.json':
- return {'endpoint': bottle.request.urlparts[0] + '://' + bottle.request.urlparts[1] + '/json/'}
- if download:
- default_name = 'ytm-' + filename
- return bottle.static_file(filename, root=root, download=default_name)
- else:
- return bottle.static_file(filename, root=root, download=False)
- # finally:
- # logger.log(filename, INFO, message_type='http_request_finished', data=json.dumps({
- # 'status': bottle.response.status_code,
- # 'start': start,
- # 'end': time.clock(),
- # }))
- # frontend
- print('Preparing frontend directories...')
- for subdir, dirs, files in os.walk(FRONTEND_RELATIVE_PATH):
- # subdir now has the form ../frontend/config
- _serve_static_directory(
- route=subdir.replace('\\', '/').replace(FRONTEND_RELATIVE_PATH, '') + '/<filename>',
- root=subdir
- )
- # app
- print('Preparing app for download...')
- _serve_static_directory(
- route='/app/<filename>',
- root='../android/app/release',
- download=True,
- )
- logger.log('Server start', INFO, 'server_start', json.dumps({
- 'host': '0.0.0.0',
- 'port': connection.PORT,
- 'debug': debug,
- }))
- # commit regularly
- log_commit_time = logger.commit()
- log_commit_delay = 15
- print(f'Committing logfile transaction took {log_commit_time}s, '
- f'scheduling to run every {log_commit_delay}s')
- threading.Timer(log_commit_delay, logger.commit).start()
- print('Running server...')
- bottle.run(host='0.0.0.0', port=connection.PORT, debug=debug, server=GeventWebSocketServer)
- logger.commit()
- model.cleanup()
|