import logging
import threading
from typing import Union

import gevent.threading


class UnLock(object):
    """
    Context manager that temporarily releases a lock and re-acquires it on exit.

    It is meant to be used inside a section that already holds ``lock``, to
    open a window in which the lock is free for others, and to take the lock
    back when the window closes (also when the body raises).

    Example:
        lock = gevent.threading.Lock()
        with lock:
            logging.info('Websocket request 10 received')
            with UnLock(lock):
                sleep(10.)  # this part does not need the resource to be locked
            logging.info('Websocket request 10 finished')

    Args:
        lock: The lock to release; it must provide ``locked()``, ``release()``
            and ``acquire()``.
        require_taken: If True (default), entering the context while the lock
            is not locked raises ``RuntimeError``. If False, an unlocked lock
            is left untouched on both entry and exit.
        verbose: If True, log a message each time the lock is actually
            released and re-acquired.

    Note:
        ``locked()`` only reports that the lock is held by *someone*; this
        class cannot verify that the caller is the holder.
    """

    # Whether the most recent __enter__ really released the lock. Kept as a
    # class-level default so __exit__ also works on instances whose __init__
    # was bypassed or overridden.
    _released = False

    def __init__(
        self,
        lock: Union[gevent.threading.Lock, threading.Lock],
        require_taken: bool = True,
        verbose: bool = False,
    ):
        self.lock = lock
        self.require_taken = require_taken
        self.verbose = verbose

    def __enter__(self):
        """
        Release the lock if it is currently locked.

        Returns:
            This ``UnLock`` instance.

        Raises:
            RuntimeError: If ``require_taken`` is True and the lock is not
                locked.
        """
        if not self.lock.locked():
            if self.require_taken:
                raise RuntimeError('Lock not taken')
            # Nothing to release: calling release() on an unlocked lock would
            # raise, which would make require_taken=False pointless.
            self._released = False
            return self

        self.lock.release()
        self._released = True
        if self.verbose:
            logging.info('Unlocked')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Re-acquire the lock if ``__enter__`` released it.

        Returns None, so an exception raised inside the ``with`` body is
        never suppressed.
        """
        if not self._released:
            # The lock was not held on entry; restore that state by leaving
            # it alone instead of acquiring a lock nobody will release.
            return
        self._released = False
        self.lock.acquire()
        if self.verbose:
            logging.info('Locked again.')