1
0
mirror of https://github.com/TandoorRecipes/recipes.git synced 2026-01-11 09:07:12 -05:00

add a static method to manually suggest 'work' to be added to the queue

This commit is contained in:
Mikhail Epifanov
2025-05-14 18:25:58 +02:00
parent 40a7db086f
commit d49e17f088
2 changed files with 26 additions and 7 deletions

View File

@@ -56,6 +56,7 @@ class ConnectorManager(metaclass=Singleton):
def __init__(self):
self._logger = logging.getLogger("recipes.connector")
self._logger.debug("ConnectorManager initializing")
self._queue = queue.Queue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE)
self._worker = threading.Thread(target=self.worker, args=(0, self._queue,), daemon=True)
self._worker.start()
@@ -75,16 +76,34 @@ class ConnectorManager(metaclass=Singleton):
else:
return
try:
self._queue.put_nowait(Work(instance, action_type))
except queue.Full:
self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}")
return
self._add_work(action_type, instance)
def _add_work(self, action_type: ActionType, *instances: REGISTERED_CLASSES):
for instance in instances:
try:
self._queue.put_nowait(Work(instance, action_type))
except queue.Full:
self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}")
def stop(self):
self._queue.join()
self._worker.join()
@classmethod
def is_initialized(cls) -> bool:
return cls in Singleton._instances
@staticmethod
def add_work(action_type: ActionType, *instances: REGISTERED_CLASSES):
"""
Manually inject work that failed to come in through the __call__ (aka Django signal)
Before the work is processed, we check if the connectionManager is initialized, because if it's not, we don't want to accidentally initialize it.
Be careful calling it, because it might result in a instance being processed twice.
"""
if not ConnectorManager.is_initialized():
return
ConnectorManager()._add_work(action_type, *instances)
@staticmethod
def worker(worker_id: int, worker_queue: queue.Queue):
logger = logging.getLogger("recipes.connector.worker")