From d49e17f088f2148478acd38534bf7c94050d57a1 Mon Sep 17 00:00:00 2001 From: Mikhail Epifanov Date: Wed, 14 May 2025 18:25:58 +0200 Subject: [PATCH 1/4] add a static method to manually suggest 'work' to be added to the queue --- cookbook/connectors/connector_manager.py | 29 ++++++++++++++++++++---- cookbook/views/api.py | 4 ++-- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/cookbook/connectors/connector_manager.py b/cookbook/connectors/connector_manager.py index 4c99281a4..6b6b8d681 100644 --- a/cookbook/connectors/connector_manager.py +++ b/cookbook/connectors/connector_manager.py @@ -56,6 +56,7 @@ class ConnectorManager(metaclass=Singleton): def __init__(self): self._logger = logging.getLogger("recipes.connector") + self._logger.debug("ConnectorManager initializing") self._queue = queue.Queue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE) self._worker = threading.Thread(target=self.worker, args=(0, self._queue,), daemon=True) self._worker.start() @@ -75,16 +76,34 @@ class ConnectorManager(metaclass=Singleton): else: return - try: - self._queue.put_nowait(Work(instance, action_type)) - except queue.Full: - self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}") - return + self._add_work(action_type, instance) + + def _add_work(self, action_type: ActionType, *instances: REGISTERED_CLASSES): + for instance in instances: + try: + self._queue.put_nowait(Work(instance, action_type)) + except queue.Full: + self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}") def stop(self): self._queue.join() self._worker.join() + @classmethod + def is_initialized(cls) -> bool: + return cls in Singleton._instances + + @staticmethod + def add_work(action_type: ActionType, *instances: REGISTERED_CLASSES): + """ + Manually inject work that failed to come in through the __call__ (aka Django signal) + Before the work is processed, we check if the connectionManager is initialized, because if it's not, we don't want to accidentally initialize it. + Be careful calling it, because it might result in a instance being processed twice. + """ + if not ConnectorManager.is_initialized(): + return + ConnectorManager()._add_work(action_type, *instances) + @staticmethod def worker(worker_id: int, worker_queue: queue.Queue): logger = logging.getLogger("recipes.connector.worker") diff --git a/cookbook/views/api.py b/cookbook/views/api.py index 77745ac46..4e725107a 100644 --- a/cookbook/views/api.py +++ b/cookbook/views/api.py @@ -61,6 +61,7 @@ from rest_framework.viewsets import ViewSetMixin from rest_framework.serializers import CharField, IntegerField, UUIDField from treebeard.exceptions import InvalidMoveToDescendant, InvalidPosition, PathOverflow +from cookbook.connectors.connector_manager import ConnectorManager, ActionType from cookbook.forms import ImportForm from cookbook.helper import recipe_url_import as helper from cookbook.helper.HelperFunctions import str2bool, validate_import_url @@ -1254,8 +1255,6 @@ class RecipeViewSet(LoggingMixin, viewsets.ModelViewSet): SLR = RecipeShoppingEditor(request.user, request.space, id=list_recipe, recipe=obj, mealplan=mealplan, servings=servings) - content = {'msg': _(f'{obj.name} was added to the shopping list.')} - http_status = status.HTTP_204_NO_CONTENT if servings and servings <= 0: result = SLR.delete() elif list_recipe: @@ -1392,6 +1391,7 @@ class ShoppingListRecipeViewSet(LoggingMixin, viewsets.ModelViewSet): ) ShoppingListEntry.objects.bulk_create(entries) + ConnectorManager.add_work(ActionType.CREATED, *entries) return Response(serializer.validated_data) else: return Response(serializer.errors, 400) From 8740bf3a832584cec34d6db443bcb77bdfb4d6aa Mon Sep 17 00:00:00 2001 From: Mikhail Epifanov Date: Mon, 19 May 2025 21:46:40 +0200 Subject: [PATCH 2/4] use DTO object, and cleanup some code which is no longer needed --- cookbook/connectors/connector.py | 47 ++++++++++++++++--- cookbook/connectors/connector_manager.py | 37 +++++++++------ cookbook/connectors/homeassistant.py | 33 +++++-------- .../tests/other/test_connector_manager.py | 2 +- 4 files changed, 78 insertions(+), 41 deletions(-) diff --git a/cookbook/connectors/connector.py b/cookbook/connectors/connector.py index 27e9408db..003b7cf39 100644 --- a/cookbook/connectors/connector.py +++ b/cookbook/connectors/connector.py @@ -1,6 +1,43 @@ from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional -from cookbook.models import ShoppingListEntry, Space, ConnectorConfig +from cookbook.models import ShoppingListEntry, User, ConnectorConfig + + +@dataclass +class UserDTO: + username: str + first_name: Optional[str] + + @staticmethod + def try_create_from_user(instance: Optional[User]) -> 'UserDTO': + return UserDTO( + username=instance.username, + first_name=instance.first_name if instance.first_name and len(instance.first_name) > 0 else None + ) + + +@dataclass +class ShoppingListEntryDTO: + food_name: str + amount: Optional[float] + base_unit: Optional[str] + unit_name: Optional[str] + created_by: UserDTO + + @staticmethod + def try_create_from_entry(instance: ShoppingListEntry) -> Optional['ShoppingListEntryDTO']: + if instance.food is None or instance.created_by is None: + return None + + return ShoppingListEntryDTO( + food_name=instance.food.name, + amount=instance.amount if instance.amount and instance.amount > 0 else None, + unit_name=instance.unit.name if instance.unit else None, + base_unit=instance.unit.base_unit if instance.unit and instance.unit.base_unit and len(instance.unit.base_unit) > 0 else None, + created_by=UserDTO.try_create_from_user(instance.created_by), + ) # A Connector is 'destroyed' & recreated each time 'any' ConnectorConfig in a space changes. @@ -10,20 +47,18 @@ class Connector(ABC): pass @abstractmethod - async def on_shopping_list_entry_created(self, space: Space, instance: ShoppingListEntry) -> None: + async def on_shopping_list_entry_created(self, instance: ShoppingListEntryDTO) -> None: pass # This method might not trigger on 'direct' entry updates: https://stackoverflow.com/a/35238823 @abstractmethod - async def on_shopping_list_entry_updated(self, space: Space, instance: ShoppingListEntry) -> None: + async def on_shopping_list_entry_updated(self, instance: ShoppingListEntryDTO) -> None: pass @abstractmethod - async def on_shopping_list_entry_deleted(self, space: Space, instance: ShoppingListEntry) -> None: + async def on_shopping_list_entry_deleted(self, instance: ShoppingListEntryDTO) -> None: pass @abstractmethod async def close(self) -> None: pass - - # TODO: Add Recipes & possibly Meal Place listeners/hooks (And maybe more?) diff --git a/cookbook/connectors/connector_manager.py b/cookbook/connectors/connector_manager.py index 6b6b8d681..47dfbf3a4 100644 --- a/cookbook/connectors/connector_manager.py +++ b/cookbook/connectors/connector_manager.py @@ -12,7 +12,7 @@ from typing import List, Any, Dict, Optional, Type from django.conf import settings from django_scopes import scope -from cookbook.connectors.connector import Connector +from cookbook.connectors.connector import Connector, ShoppingListEntryDTO from cookbook.connectors.homeassistant import HomeAssistant from cookbook.models import ShoppingListEntry, Space, ConnectorConfig @@ -63,9 +63,6 @@ class ConnectorManager(metaclass=Singleton): # Called by post save & post delete signals def __call__(self, instance: Any, **kwargs) -> None: - if not isinstance(instance, self._listening_to_classes) or not hasattr(instance, "space"): - return - action_type: ActionType if "created" in kwargs and kwargs["created"]: action_type = ActionType.CREATED @@ -80,7 +77,10 @@ class ConnectorManager(metaclass=Singleton): def _add_work(self, action_type: ActionType, *instances: REGISTERED_CLASSES): for instance in instances: + if not isinstance(instance, self._listening_to_classes) or not hasattr(instance, "space"): + continue try: + _force_load_instance(instance) self._queue.put_nowait(Work(instance, action_type)) except queue.Full: self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}") @@ -90,8 +90,8 @@ class ConnectorManager(metaclass=Singleton): self._worker.join() @classmethod - def is_initialized(cls) -> bool: - return cls in Singleton._instances + def is_initialized(cls): + return cls in cls._instances @staticmethod def add_work(action_type: ActionType, *instances: REGISTERED_CLASSES): @@ -135,7 +135,7 @@ class ConnectorManager(metaclass=Singleton): if connectors is None or refresh_connector_cache: if connectors is not None: - loop.run_until_complete(close_connectors(connectors)) + loop.run_until_complete(_close_connectors(connectors)) with scope(space=space): connectors: List[Connector] = list() @@ -161,7 +161,7 @@ class ConnectorManager(metaclass=Singleton): logger.debug(f"running {len(connectors)} connectors for {item.instance=} with {item.actionType=}") - loop.run_until_complete(run_connectors(connectors, space, item.instance, item.actionType)) + loop.run_until_complete(run_connectors(connectors, item.instance, item.actionType)) worker_queue.task_done() logger.info(f"terminating ConnectionManager worker {worker_id}") @@ -178,7 +178,14 @@ class ConnectorManager(metaclass=Singleton): return None -async def close_connectors(connectors: List[Connector]): +def _force_load_instance(instance: REGISTERED_CLASSES): + if isinstance(instance, ShoppingListEntry): + _ = instance.food # Force load food + _ = instance.unit # Force load unit + _ = instance.created_by # Force load created_by + + +async def _close_connectors(connectors: List[Connector]): tasks: List[Task] = [asyncio.create_task(connector.close()) for connector in connectors] if len(tasks) == 0: @@ -190,22 +197,24 @@ async def close_connectors(connectors: List[Connector]): logging.exception("received an exception while closing one of the connectors") -async def run_connectors(connectors: List[Connector], space: Space, instance: REGISTERED_CLASSES, action_type: ActionType): +async def run_connectors(connectors: List[Connector], instance: REGISTERED_CLASSES, action_type: ActionType): tasks: List[Task] = list() if isinstance(instance, ShoppingListEntry): - shopping_list_entry: ShoppingListEntry = instance + shopping_list_entry = ShoppingListEntryDTO.try_create_from_entry(instance) + if shopping_list_entry is None: + return match action_type: case ActionType.CREATED: for connector in connectors: - tasks.append(asyncio.create_task(connector.on_shopping_list_entry_created(space, shopping_list_entry))) + tasks.append(asyncio.create_task(connector.on_shopping_list_entry_created(shopping_list_entry))) case ActionType.UPDATED: for connector in connectors: - tasks.append(asyncio.create_task(connector.on_shopping_list_entry_updated(space, shopping_list_entry))) + tasks.append(asyncio.create_task(connector.on_shopping_list_entry_updated(shopping_list_entry))) case ActionType.DELETED: for connector in connectors: - tasks.append(asyncio.create_task(connector.on_shopping_list_entry_deleted(space, shopping_list_entry))) + tasks.append(asyncio.create_task(connector.on_shopping_list_entry_deleted(shopping_list_entry))) if len(tasks) == 0: return diff --git a/cookbook/connectors/homeassistant.py b/cookbook/connectors/homeassistant.py index ba2b4a227..43c53fff4 100644 --- a/cookbook/connectors/homeassistant.py +++ b/cookbook/connectors/homeassistant.py @@ -5,16 +5,14 @@ from urllib.parse import urljoin from aiohttp import request, ClientResponseError -from cookbook.connectors.connector import Connector -from cookbook.models import ShoppingListEntry, ConnectorConfig, Space +from cookbook.connectors.connector import Connector, ShoppingListEntryDTO +from cookbook.models import ConnectorConfig class HomeAssistant(Connector): _config: ConnectorConfig _logger: Logger - _required_foreign_keys = ("food", "unit", "created_by") - def __init__(self, config: ConnectorConfig): if not config.token or not config.url or not config.todo_entity: raise ValueError("config for HomeAssistantConnector in incomplete") @@ -34,7 +32,7 @@ class HomeAssistant(Connector): response.raise_for_status() return await response.json() - async def on_shopping_list_entry_created(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None: + async def on_shopping_list_entry_created(self, shopping_list_entry: ShoppingListEntryDTO) -> None: if not self._config.on_shopping_list_entry_created_enabled: return @@ -55,20 +53,15 @@ class HomeAssistant(Connector): except ClientResponseError as err: self._logger.warning(f"received an exception from the api: {err.request_info.url=}, {err.request_info.method=}, {err.status=}, {err.message=}, {type(err)=}") - async def on_shopping_list_entry_updated(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None: + async def on_shopping_list_entry_updated(self, shopping_list_entry: ShoppingListEntryDTO) -> None: if not self._config.on_shopping_list_entry_updated_enabled: return pass - async def on_shopping_list_entry_deleted(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None: + async def on_shopping_list_entry_deleted(self, shopping_list_entry: ShoppingListEntryDTO) -> None: if not self._config.on_shopping_list_entry_deleted_enabled: return - if not all(k in shopping_list_entry._state.fields_cache for k in self._required_foreign_keys): - # Sometimes the food foreign key is not loaded, and we cant load it from an async process - self._logger.debug("required property was not present in ShoppingListEntry") - return - item, _ = _format_shopping_list_entry(shopping_list_entry) self._logger.debug(f"removing {item=} from {self._config.todo_entity}") @@ -88,19 +81,19 @@ class HomeAssistant(Connector): pass -def _format_shopping_list_entry(shopping_list_entry: ShoppingListEntry) -> Tuple[str, str]: - item = shopping_list_entry.food.name - if shopping_list_entry.amount > 0: +def _format_shopping_list_entry(shopping_list_entry: ShoppingListEntryDTO) -> Tuple[str, str]: + item = shopping_list_entry.food_name + if shopping_list_entry.amount: item += f" ({shopping_list_entry.amount:.2f}".rstrip('0').rstrip('.') - if shopping_list_entry.unit and shopping_list_entry.unit.base_unit and len(shopping_list_entry.unit.base_unit) > 0: - item += f" {shopping_list_entry.unit.base_unit})" - elif shopping_list_entry.unit and shopping_list_entry.unit.name and len(shopping_list_entry.unit.name) > 0: - item += f" {shopping_list_entry.unit.name})" + if shopping_list_entry.base_unit: + item += f" {shopping_list_entry.base_unit})" + elif shopping_list_entry.unit_name: + item += f" {shopping_list_entry.unit_name})" else: item += ")" description = "From TandoorRecipes" - if shopping_list_entry.created_by.first_name and len(shopping_list_entry.created_by.first_name) > 0: + if shopping_list_entry.created_by.first_name: description += f", by {shopping_list_entry.created_by.first_name}" else: description += f", by {shopping_list_entry.created_by.username}" diff --git a/cookbook/tests/other/test_connector_manager.py b/cookbook/tests/other/test_connector_manager.py index d7243f9af..fefb960cc 100644 --- a/cookbook/tests/other/test_connector_manager.py +++ b/cookbook/tests/other/test_connector_manager.py @@ -18,7 +18,7 @@ def obj_1(space_1, u1_s1): async def test_run_connectors(space_1, u1_s1, obj_1) -> None: connector_mock = Mock(spec=Connector) - await run_connectors([connector_mock], space_1, obj_1, ActionType.DELETED) + await run_connectors([connector_mock], obj_1, ActionType.DELETED) assert not connector_mock.on_shopping_list_entry_updated.called assert not connector_mock.on_shopping_list_entry_created.called From b2da40421b089d3013f1f64115dbb6afba584e1a Mon Sep 17 00:00:00 2001 From: Mikhail Epifanov Date: Mon, 19 May 2025 22:35:17 +0200 Subject: [PATCH 3/4] fix test --- cookbook/tests/other/test_connector_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cookbook/tests/other/test_connector_manager.py b/cookbook/tests/other/test_connector_manager.py index fefb960cc..f82097513 100644 --- a/cookbook/tests/other/test_connector_manager.py +++ b/cookbook/tests/other/test_connector_manager.py @@ -2,7 +2,7 @@ import pytest from django.contrib import auth from mock.mock import Mock -from cookbook.connectors.connector import Connector +from cookbook.connectors.connector import Connector, ShoppingListEntryDTO from cookbook.connectors.connector_manager import ActionType, run_connectors from cookbook.models import Food, ShoppingListEntry @@ -13,13 +13,13 @@ def obj_1(space_1, u1_s1): return e -@pytest.mark.timeout(10) # TODO this mark doesn't exist @pytest.mark.asyncio async def test_run_connectors(space_1, u1_s1, obj_1) -> None: + expected_dto = ShoppingListEntryDTO.try_create_from_entry(obj_1) connector_mock = Mock(spec=Connector) await run_connectors([connector_mock], obj_1, ActionType.DELETED) assert not connector_mock.on_shopping_list_entry_updated.called assert not connector_mock.on_shopping_list_entry_created.called - connector_mock.on_shopping_list_entry_deleted.assert_called_once_with(space_1, obj_1) + connector_mock.on_shopping_list_entry_deleted.assert_called_once_with(expected_dto) From fc0b12af12ea9f4c585c6751923caef7cadfa877 Mon Sep 17 00:00:00 2001 From: Mikhail Epifanov Date: Mon, 19 May 2025 23:11:39 +0200 Subject: [PATCH 4/4] remove redundant len, and fix optional --- cookbook/connectors/connector.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cookbook/connectors/connector.py b/cookbook/connectors/connector.py index 003b7cf39..3267f5d02 100644 --- a/cookbook/connectors/connector.py +++ b/cookbook/connectors/connector.py @@ -11,10 +11,10 @@ class UserDTO: first_name: Optional[str] @staticmethod - def try_create_from_user(instance: Optional[User]) -> 'UserDTO': + def create_from_user(instance: User) -> 'UserDTO': return UserDTO( username=instance.username, - first_name=instance.first_name if instance.first_name and len(instance.first_name) > 0 else None + first_name=instance.first_name if instance.first_name else None ) @@ -33,10 +33,10 @@ class ShoppingListEntryDTO: return ShoppingListEntryDTO( food_name=instance.food.name, - amount=instance.amount if instance.amount and instance.amount > 0 else None, + amount=instance.amount if instance.amount else None, unit_name=instance.unit.name if instance.unit else None, - base_unit=instance.unit.base_unit if instance.unit and instance.unit.base_unit and len(instance.unit.base_unit) > 0 else None, - created_by=UserDTO.try_create_from_user(instance.created_by), + base_unit=instance.unit.base_unit if instance.unit and instance.unit.base_unit else None, + created_by=UserDTO.create_from_user(instance.created_by), )