1234567891011121314151617181920212223242526272829303132333435 |
- import logging
- import threading
- from typing import Union
- import gevent.threading
class UnLock(object):
    """
    Context manager that temporarily releases a lock and re-acquires it on exit.

    Use it inside a section that already holds the lock, around the part that
    does not need the protected resource.

    Example:
        lock = gevent.threading.Lock()
        with lock:
            logging.info('Websocket request 10 received')
            with UnLock(lock):
                sleep(10.)  # this part does not need the resource to be locked
            logging.info('Websocket request 10 finished')

    Args:
        lock: The lock to release for the duration of the ``with`` body.
        require_taken: If True (default), entering while the lock is not
            locked raises ``RuntimeError('Lock not taken')``. If False,
            entering while the lock is not locked is a no-op: nothing is
            released and nothing is acquired on exit.
        verbose: If True, log an info message on release and on re-acquire.

    Note:
        ``lock.locked()`` only reports that *someone* holds the lock, not that
        the calling thread/greenlet does; the caller is responsible for using
        this only where it owns the lock.
    """

    # The lock annotation is quoted so it is not evaluated at definition time.
    def __init__(self, lock: "Union[gevent.threading.Lock, threading.Lock]", require_taken: bool = True,
                 verbose: bool = False):
        self.lock = lock
        self.require_taken = require_taken
        self.verbose = verbose
        # True only between an __enter__ that really released the lock and the
        # matching __exit__; guards against acquiring a lock we never released.
        self._released = False

    def __enter__(self):
        """Release the lock, or raise/skip if it is not currently locked."""
        self._released = False
        if not self.lock.locked():
            if self.require_taken:
                raise RuntimeError('Lock not taken')
            # Releasing an unlocked lock would raise, so there is nothing to do.
            return
        self.lock.release()
        self._released = True
        if self.verbose:
            logging.info('Unlocked')

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Re-acquire the lock if __enter__ released it; never suppress exceptions."""
        if not self._released:
            return
        # Blocks until the lock is available again.
        self.lock.acquire()
        self._released = False
        if self.verbose:
            logging.info('Locked again.')
|