"""Scripted request sequence used to exercise a running game server.

The requests are sent either over HTTP (``http_request``) or over a websocket
(``websocket_request``); ``default_request_method`` selects which one
``run_tests`` uses.
"""
import json
import random
import re
import sys
import time
from datetime import datetime
from math import inf
from time import perf_counter
from typing import Dict, Callable
from uuid import uuid4

import requests

import connection
import model
import test.do_some_requests.current_websocket
from debug import debug
from game import random_ownable_name, DB_NAME, CURRENCY_NAME
from test import failed_requests
from util import round_to_n

MRO_AMOUNT = 1e9

DEFAULT_PW = 'pw'

PORT = connection.PORT
HOST = 'http://127.0.0.1' + ':' + str(PORT)
# HOST = 'http://koljastrohm-games.com' + ':' + str(PORT)
JSON_HEADERS = {'Content-type': 'application/json'}
EXIT_ON_FAILED_REQUEST = True

# Websocket answers that arrived while waiting for a different request token,
# keyed by their own request token.
response_collection: Dict[str, Dict] = {}

default_request_method: Callable[[str, Dict], Dict]


def receive_answer(token):
    """Wait until the server sends an answer that carries ``token``.

    Answers received in the meantime that carry a different request token are
    stored in ``response_collection`` for later use; answers without a token
    are only printed.

    :param token: the ``request_token`` of the request whose answer is wanted
    :return: the decoded JSON answer belonging to ``token``
    """
    # The answer may already have arrived while waiting for another request.
    if token in response_collection:
        return response_collection.pop(token)

    json_content = {}
    while 'request_token' not in json_content or json_content['request_token'] != token:
        # Whatever was read in the previous iteration was not ours: keep it
        # for whoever asks for that token later.
        if 'request_token' in json_content:
            response_collection[json_content['request_token']] = json_content

        received = test.do_some_requests.current_websocket.current_websocket.recv_data_frame()[1].data
        content = received.decode('utf-8')
        # Purely cosmetic: start every JSON object on a new line in the log.
        formatted_content = re.sub(r'{([^}]*?):(.*?)}', r'\n{\g<1>:\g<2>}', content)
        print('Received through websocket: ' + formatted_content)

        json_content = json.loads(content)

    return json_content


def websocket_request(route: str, data: Dict) -> Dict:
    """Send one request over the shared websocket and return the answer body.

    The websocket is connected on first use. A 451 answer causes the same
    request to be sent again. Any other non-200 answer either terminates the
    process with the status code as exit code (``EXIT_ON_FAILED_REQUEST``) or
    is recorded in ``failed_requests``.

    :param route: name of the server route to call
    :param data: JSON-serializable request body
    :return: the ``body`` entry of the server's answer
    """
    ws = test.do_some_requests.current_websocket.current_websocket
    if not ws.connected:
        ws_host = HOST.replace('http://', 'ws://')
        ws.connect(ws_host + '/websocket')

    token = str(uuid4())
    payload = json.dumps({'route': route, 'body': data, 'request_token': token})
    print('Sending to websocket:', str(payload).replace('{', '\n{')[1:])
    ws.send(payload, opcode=2)  # opcode 2 is a binary frame (RFC 6455)
    json_content = receive_answer(token)
    print()

    status_code = json_content['http_status_code']
    if status_code == 200:
        pass
    elif status_code == 451:  # Copyright problems, likely a bug in the upload filter
        # Try again
        return websocket_request(route, data)
    else:
        if EXIT_ON_FAILED_REQUEST:
            # Close the socket only if it is actually open (the previous
            # condition was inverted and never closed a live connection).
            if ws.connected:
                ws.close()
            sys.exit(status_code)
        failed_requests.append((route, status_code))
    return json_content['body']


def http_request(route: str, data: Dict) -> Dict:
    """POST one request to ``HOST/json/<route>`` and return the decoded answer.

    A 451 response causes the same request to be sent again. Any other non-200
    response either terminates the process with the status code as exit code
    (``EXIT_ON_FAILED_REQUEST``) or is recorded in ``failed_requests``.

    :param route: name of the server route to call
    :param data: JSON-serializable request body
    :return: the whole decoded JSON response
    """
    payload = json.dumps(data)
    print('Sending to /' + route + ':', str(payload).replace('{', '\n{')[1:])
    r = requests.post(HOST + '/json/' + route, data=payload, headers=JSON_HEADERS)
    content = r.content.decode()
    print('Request returned: ' + content.replace('{', '\n{'))
    print()

    if r.status_code == 200:
        pass
    elif r.status_code == 451:
        # Try again
        return http_request(route, data)
    else:
        if EXIT_ON_FAILED_REQUEST:
            sys.exit(r.status_code)
        failed_requests.append((route, r.status_code))
    return json.loads(content)


default_request_method: Callable[[str, Dict], Dict] = http_request


# default_request_method = websocket_request


def random_time():
    """Return a random time span as a dict with ``dt_start`` and ``dt_end``.

    The start is one of 140 slots spaced 21600 seconds apart, the duration is
    one of 1800, 3600, 7200 or 86400 seconds.
    """
    start = random.randrange(140)
    start = 1561960800 + start * 21600  # somewhere in july or at the beginning of august 2019
    return {
        'dt_start': start,
        'dt_end': start + random.choice([1800, 3600, 7200, 86400]),
    }


def run_tests():
    """Run the scripted request sequence through ``default_request_method``.

    Two fresh users are registered and walked through the account routes; the
    first one additionally acts as a bank and goes through the MRO scenario,
    for which the database clock is shifted via ``model.time_travel``.
    Expectations are checked with ``assert``.
    """
    if debug:
        print('You are currently in debug mode.')
    print('Host:', str(HOST))

    usernames = [f'user{datetime.now().timestamp()}', f'user{datetime.now().timestamp()}+1']
    banks = usernames[:1]
    session_ids = {}

    # The three filter variants every 'credits' query is issued with.
    credit_filters = ({}, {'only_next_mro_qualified': True}, {'only_next_mro_qualified': False})

    # Routes that need no session.
    default_request_method('news', {})
    default_request_method('leaderboard', {})
    default_request_method('tradables', {})

    for username in usernames:
        # Account life cycle: register, login, logout, login again.
        default_request_method('register', {'username': username, 'password': DEFAULT_PW})
        session_ids[username] = default_request_method(
            'login', {'username': username, 'password': DEFAULT_PW})['session_id']
        default_request_method('logout', {'session_id': session_ids[username]})
        session_ids[username] = default_request_method(
            'login', {'username': username, 'password': DEFAULT_PW})['session_id']

        default_request_method('depot', {'session_id': session_ids[username]})
        default_request_method('orders', {'session_id': session_ids[username]})
        default_request_method('orders_on', {'session_id': session_ids[username], "ownable": "\u20adollar"})

        # Change the password and back again, logging in anew after each change.
        for password in ['pw2', DEFAULT_PW]:
            default_request_method('change_password',
                                   {'session_id': session_ids[username], 'password': password})
            session_ids[username] = default_request_method(
                'login', {'username': username, 'password': password})['session_id']

        for limit in [0, 5, 10, 20, 50]:
            data = default_request_method(
                'trades', {'session_id': session_ids[username], 'limit': limit})['data']
            assert len(data) <= limit

    for username in banks:
        default_request_method('take_out_personal_loan',
                               {'session_id': session_ids[username], 'amount': 5.5e6})
        default_request_method('buy_banking_license', {'session_id': session_ids[username]})
        default_request_method('issue_bond',
                               {'session_id': session_ids[username],
                                'coupon': 0.05,
                                'name': random_ownable_name(),
                                'run_time': 43200})

        for credit_filter in credit_filters:
            default_request_method('credits', {'issuer': username, **credit_filter})

        # Issue a bond for the next MRO and offer it for sale.
        my_mro_name = random_ownable_name()
        default_request_method('issue_bond',
                               {'session_id': session_ids[username],
                                'coupon': 'next_mro',
                                'name': my_mro_name,
                                'run_time': 'next_mro'})
        default_request_method('order',
                               {'session_id': session_ids[username],
                                'buy': False,
                                'ownable': my_mro_name,
                                'amount': MRO_AMOUNT,
                                'limit': 1,
                                'stop_loss': False,
                                'time_until_expiration': 43200})

        orders_before_tt = default_request_method('orders', {'session_id': session_ids[username]})['data']
        assert len(orders_before_tt) == 1

        for credit_filter in credit_filters:
            default_request_method('credits', {'issuer': username, **credit_filter})

        depot_before_tt = default_request_method('depot', {'session_id': session_ids[username]})

        # Find the earliest tender date that still lies in the future.
        tender_calendar = default_request_method('tender_calendar', {})['data']
        next_mro_dt = inf
        for row in tender_calendar:
            if row[0] > time.time():
                next_mro_dt = min(next_mro_dt, row[0])
        assert next_mro_dt < inf

        model.connect(DB_NAME)
        assert next_mro_dt - time.time() - 10 > 0, 'Random test fail, this can happen'
        model.time_travel(next_mro_dt - time.time() - 10)  # 10 seconds before MRO
        model.current_connection.commit()
        model.cleanup()

        depot_before_mro = default_request_method('depot', {'session_id': session_ids[username]})
        orders_before_mro = default_request_method('orders', {'session_id': session_ids[username]})['data']
        assert len(orders_before_mro) == 1

        # some money was spent as interest
        assert depot_before_tt['own_wealth'] > depot_before_mro['own_wealth']
        assert depot_before_tt['minimum_reserve'] == depot_before_mro['minimum_reserve'] == 0
        assert depot_before_tt['banking_license'] and depot_before_mro['banking_license']
        for row1 in depot_before_tt['data']:
            for row2 in depot_before_mro['data']:
                if row1[0] == row2[0]:
                    if row1[0] == CURRENCY_NAME:
                        assert row1[1] > row2[1]
                    else:
                        assert row1[1] == row2[1]

        model.connect(DB_NAME)
        model.time_travel(20)  # 10 seconds after MRO
        model.current_connection.commit()
        model.cleanup()

        orders_after_mro = default_request_method('orders', {'session_id': session_ids[username]})['data']
        assert len(orders_after_mro) == 0

        depot_after_mro = default_request_method('depot', {'session_id': session_ids[username]})

        # some money was spent as interest and some MROs were sold
        # can be equal since interest rates are only subtracted once per time interval
        assert depot_before_mro['own_wealth'] >= depot_after_mro['own_wealth']
        assert depot_after_mro['minimum_reserve'] == max(0., MRO_AMOUNT * 0.01 - 100000)
        assert depot_after_mro['banking_license']
        for row1 in depot_before_mro['data']:
            for row2 in depot_after_mro['data']:
                if row1[0] == row2[0]:
                    if row1[0] == CURRENCY_NAME:
                        assert row1[1] < row2[1] < row1[1] + MRO_AMOUNT
                    else:
                        assert row1[1] == row2[1]

    # Credits of all issuers.
    for credit_filter in credit_filters:
        default_request_method('credits', dict(credit_filter))

    for session_id in session_ids.values():
        default_request_method('logout', {'session_id': session_id})


def main():
    """Run ``run_tests`` once per configured request method, then exit.

    The process exit code is the number of entries in ``failed_requests``.
    """
    global default_request_method
    for m in [
        test.do_some_requests.websocket_request,
        # test.do_some_requests.http_request
    ]:
        # print('Removing existing database (if exists)...', end='')
        # try:
        #     os.remove(DB_NAME + '.db')
        # except PermissionError:
        #     print('Could not recreate database')
        #     sys.exit(-1)
        # except FileNotFoundError:
        #     pass
        # print('done')
        default_request_method = m
        start = perf_counter()
        run_tests()
        print()
        print('Failed requests:', failed_requests)
        print('Total time:' + str(round_to_n(perf_counter() - start, 4)) + 's,')

    if test.do_some_requests.current_websocket.current_websocket.connected:
        test.do_some_requests.current_websocket.current_websocket.close()
    sys.exit(len(failed_requests))


if __name__ == '__main__':
    main()