123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- import json
- import random
- import re
- import sys
- import time
- from datetime import datetime
- from math import inf
- from time import perf_counter
- from typing import Dict, Callable
- from uuid import uuid4
- import requests
- import connection
- import model
- import test.do_some_requests.current_websocket
- from debug import debug
- from game import random_ownable_name, DB_NAME, CURRENCY_NAME
- from test import failed_requests
- from util import round_to_n
# Amount of the MRO bond that the bank user offers for sale in run_tests().
MRO_AMOUNT = 1e9

# Password every test user registers with; run_tests() changes it to 'pw2' and back again.
DEFAULT_PW = 'pw'

# The server under test is addressed on the local machine, on the port published by the connection module.
PORT = connection.PORT
HOST = f'http://127.0.0.1:{PORT}'
# HOST = 'http://koljastrohm-games.com' + ':' + str(PORT)
JSON_HEADERS = {'Content-type': 'application/json'}

# When true, the request helpers terminate the process on any status other than 200 (451 is retried instead).
EXIT_ON_FAILED_REQUEST = True

# Websocket answers received while waiting for a different request token, keyed by their own token.
response_collection: Dict[str, Dict] = {}

# Only declared here; the actual request function is assigned once the request helpers are defined.
default_request_method: Callable[[str, Dict], Dict]
def receive_answer(token):
    """Return the websocket answer whose ``request_token`` equals ``token``.

    An answer that already arrived while another token was being awaited is removed from
    ``response_collection`` and returned immediately. Otherwise data frames are read from the
    current websocket until the matching answer shows up. Every received frame is printed.
    Answers carrying a different token are kept in ``response_collection`` for later calls;
    answers without any token are only printed.
    """
    if token in response_collection:
        return response_collection.pop(token)

    while True:
        frame = test.do_some_requests.current_websocket.current_websocket.recv_data_frame()
        text = frame[1].data.decode('utf-8')
        # Put every flat JSON object on its own line so the log stays readable.
        pretty = re.sub(r'{([^}]*?):(.*?)}', r'\n{\g<1>:\g<2>}', text)
        print('Received through websocket: ' + pretty)

        answer = json.loads(text)
        if 'request_token' in answer:
            if answer['request_token'] == token:
                return answer
            # Belongs to some other request: remember it until that request asks for it.
            response_collection[answer['request_token']] = answer
def websocket_request(route: str, data: Dict) -> Dict:
    """Send one request through the shared websocket and return the body of its answer.

    The websocket is connected on demand to ``HOST`` (with the ``ws://`` scheme) under
    ``/websocket``. The request is a JSON document with the keys ``route``, ``body`` and a
    freshly generated ``request_token``; the answer is matched by that token via
    ``receive_answer``.

    Status handling, based on the answer's ``http_status_code``:

    * 200 -- the answer body is returned.
    * 451 -- the same request is sent again (with a new token).
    * anything else -- if ``EXIT_ON_FAILED_REQUEST`` is set, the websocket is closed and the
      process exits with the status code; otherwise ``(route, status_code)`` is appended to
      ``failed_requests`` and the answer body is returned anyway.

    :param route: name of the server route to call
    :param data: JSON-serializable request body
    :return: the ``body`` entry of the server's answer
    :raises SystemExit: on a failed request while ``EXIT_ON_FAILED_REQUEST`` is true
    """
    websocket = test.do_some_requests.current_websocket.current_websocket
    if not websocket.connected:
        websocket.connect(HOST.replace('http://', 'ws://') + '/websocket')

    token = str(uuid4())
    payload = json.dumps({'route': route, 'body': data, 'request_token': token})
    print('Sending to websocket:', payload.replace('{', '\n{')[1:])
    # Opcode 2 is the binary-frame opcode of the WebSocket protocol.
    websocket.send(payload, opcode=2)
    answer = receive_answer(token)
    print()

    status_code = answer['http_status_code']
    if status_code == 451:  # Copyright problems, likely a bug in the upload filter
        # Try again
        return websocket_request(route, data)
    if status_code != 200:
        if EXIT_ON_FAILED_REQUEST:
            # Close the socket while it is still open. The previous check was inverted
            # (it only closed an already disconnected socket), unlike the one in main().
            if websocket.connected:
                websocket.close()
            sys.exit(status_code)
        failed_requests.append((route, status_code))
    return answer['body']
def http_request(route: str, data: Dict) -> Dict:
    """POST ``data`` as JSON to ``HOST/json/<route>`` and return the decoded JSON answer.

    Request and response are printed. A 451 status makes the function send the same request
    again. Any other non-200 status either terminates the process with that status (when
    ``EXIT_ON_FAILED_REQUEST`` is set) or is recorded as ``(route, status)`` in
    ``failed_requests``, after which the response content is still parsed and returned.
    """
    payload = json.dumps(data)
    print('Sending to /' + route + ':', payload.replace('{', '\n{')[1:])

    response = requests.post(HOST + '/json/' + route, data=payload, headers=JSON_HEADERS)
    text = response.content.decode()
    print('Request returned: ' + text.replace('{', '\n{'))
    print()

    status = response.status_code
    if status == 451:
        # Same retry rule as for the websocket transport: simply ask again.
        return http_request(route, data)
    if status != 200:
        if EXIT_ON_FAILED_REQUEST:
            sys.exit(status)
        failed_requests.append((route, status))
    return json.loads(text)
# Transport that run_tests() sends every request through; main() rebinds it before each run.
default_request_method: Callable[[str, Dict], Dict] = http_request
# default_request_method = websocket_request
def random_time():
    """Return a random time span as a dict with the keys ``dt_start`` and ``dt_end``.

    The start is one of 140 six-hour steps counted from the timestamp 1561960800; the span
    lasts 30 minutes, 1 hour, 2 hours or 1 day. Both values are unix timestamps in seconds.
    """
    step = random.randrange(140)
    begin = 1561960800 + 21600 * step  # somewhere in july or at the beginning of august 2019
    duration = random.choice([1800, 3600, 7200, 86400])
    return {'dt_start': begin, 'dt_end': begin + duration}
def run_tests():
    """Play the scripted end-to-end scenario against the server through ``default_request_method``.

    Steps, in order:

    1. Query the routes that need no session (news, leaderboard, tradables).
    2. For each of two freshly named users: register, log in, log out, log in again, query
       depot and orders, change the password to 'pw2' and back (logging in after each change)
       and check that the trades route respects the requested limit.
    3. For the first user, acting as a bank: take out a loan, buy a banking license, issue a
       bond and an MRO bond, offer the MRO bond for sale, then move the model clock to shortly
       before and shortly after the next MRO date and compare depots and orders.
    4. Query the credits route without an issuer and log every user out.

    Expectations are checked with plain ``assert`` statements.
    """
    if debug:
        print('You are currently in debug mode.')
    print('Host:', str(HOST))

    # Two separate clock reads, so both names are unique for this run.
    first_user = f'user{datetime.now().timestamp()}'
    second_user = f'user{datetime.now().timestamp()}+1'
    usernames = [first_user, second_user]
    banks = usernames[:1]
    session_ids = {}

    # The three filter variants with which the credits route is queried each time.
    credit_filters = ({}, {'only_next_mro_qualified': True}, {'only_next_mro_qualified': False})

    for public_route in ('news', 'leaderboard', 'tradables'):
        default_request_method(public_route, {})

    for username in usernames:
        default_request_method('register', {'username': username, 'password': DEFAULT_PW})
        session_ids[username] = default_request_method(
            'login', {'username': username, 'password': DEFAULT_PW})['session_id']
        default_request_method('logout', {'session_id': session_ids[username]})
        session_ids[username] = default_request_method(
            'login', {'username': username, 'password': DEFAULT_PW})['session_id']

        for session_route in ('depot', 'orders'):
            default_request_method(session_route, {'session_id': session_ids[username]})
        default_request_method('orders_on',
                               {'session_id': session_ids[username], "ownable": "\u20adollar"})

        # Change the password away from the default and back, logging in with each new one.
        for password in ['pw2', DEFAULT_PW]:
            default_request_method('change_password',
                                   {'session_id': session_ids[username], 'password': password})
            session_ids[username] = default_request_method(
                'login', {'username': username, 'password': password})['session_id']

        for limit in [0, 5, 10, 20, 50]:
            trades = default_request_method(
                'trades', {'session_id': session_ids[username], 'limit': limit})['data']
            assert len(trades) <= limit

    for username in banks:
        default_request_method('take_out_personal_loan',
                               {'session_id': session_ids[username], 'amount': 5.5e6})
        default_request_method('buy_banking_license', {'session_id': session_ids[username]})
        default_request_method('issue_bond', {'session_id': session_ids[username],
                                              'coupon': 0.05,
                                              'name': random_ownable_name(),
                                              'run_time': 43200})
        for credit_filter in credit_filters:
            default_request_method('credits', {'issuer': username, **credit_filter})

        my_mro_name = random_ownable_name()
        default_request_method('issue_bond', {'session_id': session_ids[username],
                                              'coupon': 'next_mro',
                                              'name': my_mro_name,
                                              'run_time': 'next_mro'})
        default_request_method('order', {'session_id': session_ids[username],
                                         'buy': False,
                                         'ownable': my_mro_name,
                                         'amount': MRO_AMOUNT,
                                         'limit': 1,
                                         'stop_loss': False,
                                         'time_until_expiration': 43200})

        orders_before_tt = default_request_method(
            'orders', {'session_id': session_ids[username]})['data']
        assert len(orders_before_tt) == 1

        for credit_filter in credit_filters:
            default_request_method('credits', {'issuer': username, **credit_filter})

        depot_before_tt = default_request_method('depot', {'session_id': session_ids[username]})

        # Earliest tender date that still lies in the future.
        tender_calendar = default_request_method('tender_calendar', {})['data']
        next_mro_dt = min((row[0] for row in tender_calendar if row[0] > time.time()),
                          default=inf)
        assert next_mro_dt < inf

        model.connect(DB_NAME)
        assert next_mro_dt - time.time() - 10 > 0, 'Random test fail, this can happen'
        model.time_travel(next_mro_dt - time.time() - 10)  # 10 seconds before MRO
        model.current_connection.commit()
        model.cleanup()

        depot_before_mro = default_request_method('depot', {'session_id': session_ids[username]})
        orders_before_mro = default_request_method(
            'orders', {'session_id': session_ids[username]})['data']
        assert len(orders_before_mro) == 1

        # some money was spent as interest
        assert depot_before_tt['own_wealth'] > depot_before_mro['own_wealth']
        assert depot_before_tt['minimum_reserve'] == depot_before_mro['minimum_reserve'] == 0
        assert depot_before_tt['banking_license'] and depot_before_mro['banking_license']
        for earlier in depot_before_tt['data']:
            for later in depot_before_mro['data']:
                if earlier[0] != later[0]:
                    continue
                if earlier[0] == CURRENCY_NAME:
                    assert earlier[1] > later[1]
                else:
                    assert earlier[1] == later[1]

        model.connect(DB_NAME)
        model.time_travel(20)  # 10 seconds after MRO
        model.current_connection.commit()
        model.cleanup()

        orders_after_mro = default_request_method(
            'orders', {'session_id': session_ids[username]})['data']
        assert len(orders_after_mro) == 0
        depot_after_mro = default_request_method('depot', {'session_id': session_ids[username]})

        # some money was spent as interest and some MROs were sold
        # (wealth can be equal since interest rates are only subtracted once per time interval)
        assert depot_before_mro['own_wealth'] >= depot_after_mro['own_wealth']
        assert depot_after_mro['minimum_reserve'] == max(0., MRO_AMOUNT * 0.01 - 100000)
        assert depot_after_mro['banking_license']
        for earlier in depot_before_mro['data']:
            for later in depot_after_mro['data']:
                if earlier[0] != later[0]:
                    continue
                if earlier[0] == CURRENCY_NAME:
                    assert earlier[1] < later[1] < earlier[1] + MRO_AMOUNT
                else:
                    assert earlier[1] == later[1]

    for credit_filter in credit_filters:
        default_request_method('credits', dict(credit_filter))

    for session_id in session_ids.values():
        default_request_method('logout', {'session_id': session_id})
def main():
    """Run the scenario once per configured request method and exit with the failure count.

    Each configured method is installed as ``default_request_method`` before ``run_tests()``
    is called; afterwards the failed requests and the elapsed time are printed. Finally the
    websocket is closed if it is still connected, and the process exits with
    ``len(failed_requests)`` as its status.
    """
    global default_request_method

    request_methods = [
        test.do_some_requests.websocket_request,
        # test.do_some_requests.http_request
    ]
    for request_method in request_methods:
        # Disabled: start every run from a fresh database.
        # print('Removing existing database (if exists)...', end='')
        # try:
        #     os.remove(DB_NAME + '.db')
        # except PermissionError:
        #     print('Could not recreate database')
        #     sys.exit(-1)
        # except FileNotFoundError:
        #     pass
        # print('done')
        default_request_method = request_method

        started_at = perf_counter()
        run_tests()
        print()
        print('Failed requests:', failed_requests)
        elapsed = round_to_n(perf_counter() - started_at, 4)
        print('Total time:' + str(elapsed) + 's,')

    websocket = test.do_some_requests.current_websocket.current_websocket
    if websocket.connected:
        websocket.close()
    sys.exit(len(failed_requests))
# Run the request scenario only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()
|