unlock.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435
  1. import logging
  2. import threading
  3. from typing import Union
  4. import gevent.threading
  5. class UnLock(object):
  6. """
  7. The idea is to use this class within a context manager to release a lock temporarily but then acquire it again.
  8. Example:
  9. lock = gevent.threading.Lock()
  10. with lock:
  11. logging.info('Websocket request 10 received')
  12. with UnLock(lock):
  13. sleep(10.) # this part does not need the resource to be locked
  14. logging.info('Websocket request 10 finished')
  15. """
  16. def __init__(self, lock: Union[gevent.threading.Lock, threading.Lock], require_taken: bool = True, verbose=False):
  17. self.lock = lock
  18. self.require_taken = require_taken
  19. self.verbose = verbose
  20. def __enter__(self):
  21. if self.require_taken and not self.lock.locked():
  22. raise RuntimeError('Lock not taken')
  23. self.lock.release()
  24. if self.verbose:
  25. logging.info('Unlocked')
  26. def __exit__(self, exc_type, exc_val, exc_tb):
  27. self.lock.acquire()
  28. if self.verbose:
  29. logging.info('Locked again.')