From d49e17f088f2148478acd38534bf7c94050d57a1 Mon Sep 17 00:00:00 2001 From: Mikhail Epifanov Date: Wed, 14 May 2025 18:25:58 +0200 Subject: [PATCH] add a static method to manually suggest 'work' to be added to the queue --- cookbook/connectors/connector_manager.py | 29 ++++++++++++++++++++---- cookbook/views/api.py | 4 ++-- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/cookbook/connectors/connector_manager.py b/cookbook/connectors/connector_manager.py index 4c99281a4..6b6b8d681 100644 --- a/cookbook/connectors/connector_manager.py +++ b/cookbook/connectors/connector_manager.py @@ -56,6 +56,7 @@ class ConnectorManager(metaclass=Singleton): def __init__(self): self._logger = logging.getLogger("recipes.connector") + self._logger.debug("ConnectorManager initializing") self._queue = queue.Queue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE) self._worker = threading.Thread(target=self.worker, args=(0, self._queue,), daemon=True) self._worker.start() @@ -75,16 +76,34 @@ class ConnectorManager(metaclass=Singleton): else: return - try: - self._queue.put_nowait(Work(instance, action_type)) - except queue.Full: - self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}") - return + self._add_work(action_type, instance) + + def _add_work(self, action_type: ActionType, *instances: REGISTERED_CLASSES): + for instance in instances: + try: + self._queue.put_nowait(Work(instance, action_type)) + except queue.Full: + self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}") def stop(self): self._queue.join() self._worker.join() + @classmethod + def is_initialized(cls) -> bool: + return cls in Singleton._instances + + @staticmethod + def add_work(action_type: ActionType, *instances: REGISTERED_CLASSES): + """ + Manually inject work that failed to come in through the __call__ (aka Django signal) + Before the work is processed, we check if the connectionManager is initialized, because if it's not, we don't want to accidentally initialize it. + Be careful calling it, because it might result in a instance being processed twice. + """ + if not ConnectorManager.is_initialized(): + return + ConnectorManager()._add_work(action_type, *instances) + @staticmethod def worker(worker_id: int, worker_queue: queue.Queue): logger = logging.getLogger("recipes.connector.worker") diff --git a/cookbook/views/api.py b/cookbook/views/api.py index 77745ac46..4e725107a 100644 --- a/cookbook/views/api.py +++ b/cookbook/views/api.py @@ -61,6 +61,7 @@ from rest_framework.viewsets import ViewSetMixin from rest_framework.serializers import CharField, IntegerField, UUIDField from treebeard.exceptions import InvalidMoveToDescendant, InvalidPosition, PathOverflow +from cookbook.connectors.connector_manager import ConnectorManager, ActionType from cookbook.forms import ImportForm from cookbook.helper import recipe_url_import as helper from cookbook.helper.HelperFunctions import str2bool, validate_import_url @@ -1254,8 +1255,6 @@ class RecipeViewSet(LoggingMixin, viewsets.ModelViewSet): SLR = RecipeShoppingEditor(request.user, request.space, id=list_recipe, recipe=obj, mealplan=mealplan, servings=servings) - content = {'msg': _(f'{obj.name} was added to the shopping list.')} - http_status = status.HTTP_204_NO_CONTENT if servings and servings <= 0: result = SLR.delete() elif list_recipe: @@ -1392,6 +1391,7 @@ class ShoppingListRecipeViewSet(LoggingMixin, viewsets.ModelViewSet): ) ShoppingListEntry.objects.bulk_create(entries) + ConnectorManager.add_work(ActionType.CREATED, *entries) return Response(serializer.validated_data) else: return Response(serializer.errors, 400)