Merge pull request #3724 from mikhail5555/feature/connector_manager_bulk_insert

Feature/connector manager bulk insert
This commit is contained in:
vabene1111
2025-05-28 17:54:08 +02:00
committed by GitHub
5 changed files with 105 additions and 49 deletions

View File

@@ -1,6 +1,43 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
from cookbook.models import ShoppingListEntry, Space, ConnectorConfig from cookbook.models import ShoppingListEntry, User, ConnectorConfig
@dataclass
class UserDTO:
username: str
first_name: Optional[str]
@staticmethod
def create_from_user(instance: User) -> 'UserDTO':
return UserDTO(
username=instance.username,
first_name=instance.first_name if instance.first_name else None
)
@dataclass
class ShoppingListEntryDTO:
food_name: str
amount: Optional[float]
base_unit: Optional[str]
unit_name: Optional[str]
created_by: UserDTO
@staticmethod
def try_create_from_entry(instance: ShoppingListEntry) -> Optional['ShoppingListEntryDTO']:
if instance.food is None or instance.created_by is None:
return None
return ShoppingListEntryDTO(
food_name=instance.food.name,
amount=instance.amount if instance.amount else None,
unit_name=instance.unit.name if instance.unit else None,
base_unit=instance.unit.base_unit if instance.unit and instance.unit.base_unit else None,
created_by=UserDTO.create_from_user(instance.created_by),
)
# A Connector is 'destroyed' & recreated each time 'any' ConnectorConfig in a space changes. # A Connector is 'destroyed' & recreated each time 'any' ConnectorConfig in a space changes.
@@ -10,20 +47,18 @@ class Connector(ABC):
pass pass
@abstractmethod @abstractmethod
async def on_shopping_list_entry_created(self, space: Space, instance: ShoppingListEntry) -> None: async def on_shopping_list_entry_created(self, instance: ShoppingListEntryDTO) -> None:
pass pass
# This method might not trigger on 'direct' entry updates: https://stackoverflow.com/a/35238823 # This method might not trigger on 'direct' entry updates: https://stackoverflow.com/a/35238823
@abstractmethod @abstractmethod
async def on_shopping_list_entry_updated(self, space: Space, instance: ShoppingListEntry) -> None: async def on_shopping_list_entry_updated(self, instance: ShoppingListEntryDTO) -> None:
pass pass
@abstractmethod @abstractmethod
async def on_shopping_list_entry_deleted(self, space: Space, instance: ShoppingListEntry) -> None: async def on_shopping_list_entry_deleted(self, instance: ShoppingListEntryDTO) -> None:
pass pass
@abstractmethod @abstractmethod
async def close(self) -> None: async def close(self) -> None:
pass pass
# TODO: Add Recipes & possibly Meal Place listeners/hooks (And maybe more?)

View File

@@ -12,7 +12,7 @@ from typing import List, Any, Dict, Optional, Type
from django.conf import settings from django.conf import settings
from django_scopes import scope from django_scopes import scope
from cookbook.connectors.connector import Connector from cookbook.connectors.connector import Connector, ShoppingListEntryDTO
from cookbook.connectors.homeassistant import HomeAssistant from cookbook.connectors.homeassistant import HomeAssistant
from cookbook.models import ShoppingListEntry, Space, ConnectorConfig from cookbook.models import ShoppingListEntry, Space, ConnectorConfig
@@ -56,15 +56,13 @@ class ConnectorManager(metaclass=Singleton):
def __init__(self): def __init__(self):
self._logger = logging.getLogger("recipes.connector") self._logger = logging.getLogger("recipes.connector")
self._logger.debug("ConnectorManager initializing")
self._queue = queue.Queue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE) self._queue = queue.Queue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE)
self._worker = threading.Thread(target=self.worker, args=(0, self._queue,), daemon=True) self._worker = threading.Thread(target=self.worker, args=(0, self._queue,), daemon=True)
self._worker.start() self._worker.start()
# Called by post save & post delete signals # Called by post save & post delete signals
def __call__(self, instance: Any, **kwargs) -> None: def __call__(self, instance: Any, **kwargs) -> None:
if not isinstance(instance, self._listening_to_classes) or not hasattr(instance, "space"):
return
action_type: ActionType action_type: ActionType
if "created" in kwargs and kwargs["created"]: if "created" in kwargs and kwargs["created"]:
action_type = ActionType.CREATED action_type = ActionType.CREATED
@@ -75,16 +73,37 @@ class ConnectorManager(metaclass=Singleton):
else: else:
return return
try: self._add_work(action_type, instance)
self._queue.put_nowait(Work(instance, action_type))
except queue.Full: def _add_work(self, action_type: ActionType, *instances: REGISTERED_CLASSES):
self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}") for instance in instances:
return if not isinstance(instance, self._listening_to_classes) or not hasattr(instance, "space"):
continue
try:
_force_load_instance(instance)
self._queue.put_nowait(Work(instance, action_type))
except queue.Full:
self._logger.info(f"queue was full, so skipping {action_type} of type {type(instance)}")
def stop(self): def stop(self):
self._queue.join() self._queue.join()
self._worker.join() self._worker.join()
@classmethod
def is_initialized(cls):
return cls in cls._instances
@staticmethod
def add_work(action_type: ActionType, *instances: REGISTERED_CLASSES):
"""
Manually inject work that failed to come in through the __call__ (aka Django signal)
Before the work is processed, we check if the connectionManager is initialized, because if it's not, we don't want to accidentally initialize it.
Be careful calling it, because it might result in a instance being processed twice.
"""
if not ConnectorManager.is_initialized():
return
ConnectorManager()._add_work(action_type, *instances)
@staticmethod @staticmethod
def worker(worker_id: int, worker_queue: queue.Queue): def worker(worker_id: int, worker_queue: queue.Queue):
logger = logging.getLogger("recipes.connector.worker") logger = logging.getLogger("recipes.connector.worker")
@@ -116,7 +135,7 @@ class ConnectorManager(metaclass=Singleton):
if connectors is None or refresh_connector_cache: if connectors is None or refresh_connector_cache:
if connectors is not None: if connectors is not None:
loop.run_until_complete(close_connectors(connectors)) loop.run_until_complete(_close_connectors(connectors))
with scope(space=space): with scope(space=space):
connectors: List[Connector] = list() connectors: List[Connector] = list()
@@ -142,7 +161,7 @@ class ConnectorManager(metaclass=Singleton):
logger.debug(f"running {len(connectors)} connectors for {item.instance=} with {item.actionType=}") logger.debug(f"running {len(connectors)} connectors for {item.instance=} with {item.actionType=}")
loop.run_until_complete(run_connectors(connectors, space, item.instance, item.actionType)) loop.run_until_complete(run_connectors(connectors, item.instance, item.actionType))
worker_queue.task_done() worker_queue.task_done()
logger.info(f"terminating ConnectionManager worker {worker_id}") logger.info(f"terminating ConnectionManager worker {worker_id}")
@@ -159,7 +178,14 @@ class ConnectorManager(metaclass=Singleton):
return None return None
async def close_connectors(connectors: List[Connector]): def _force_load_instance(instance: REGISTERED_CLASSES):
if isinstance(instance, ShoppingListEntry):
_ = instance.food # Force load food
_ = instance.unit # Force load unit
_ = instance.created_by # Force load created_by
async def _close_connectors(connectors: List[Connector]):
tasks: List[Task] = [asyncio.create_task(connector.close()) for connector in connectors] tasks: List[Task] = [asyncio.create_task(connector.close()) for connector in connectors]
if len(tasks) == 0: if len(tasks) == 0:
@@ -171,22 +197,24 @@ async def close_connectors(connectors: List[Connector]):
logging.exception("received an exception while closing one of the connectors") logging.exception("received an exception while closing one of the connectors")
async def run_connectors(connectors: List[Connector], space: Space, instance: REGISTERED_CLASSES, action_type: ActionType): async def run_connectors(connectors: List[Connector], instance: REGISTERED_CLASSES, action_type: ActionType):
tasks: List[Task] = list() tasks: List[Task] = list()
if isinstance(instance, ShoppingListEntry): if isinstance(instance, ShoppingListEntry):
shopping_list_entry: ShoppingListEntry = instance shopping_list_entry = ShoppingListEntryDTO.try_create_from_entry(instance)
if shopping_list_entry is None:
return
match action_type: match action_type:
case ActionType.CREATED: case ActionType.CREATED:
for connector in connectors: for connector in connectors:
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_created(space, shopping_list_entry))) tasks.append(asyncio.create_task(connector.on_shopping_list_entry_created(shopping_list_entry)))
case ActionType.UPDATED: case ActionType.UPDATED:
for connector in connectors: for connector in connectors:
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_updated(space, shopping_list_entry))) tasks.append(asyncio.create_task(connector.on_shopping_list_entry_updated(shopping_list_entry)))
case ActionType.DELETED: case ActionType.DELETED:
for connector in connectors: for connector in connectors:
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_deleted(space, shopping_list_entry))) tasks.append(asyncio.create_task(connector.on_shopping_list_entry_deleted(shopping_list_entry)))
if len(tasks) == 0: if len(tasks) == 0:
return return

View File

@@ -5,16 +5,14 @@ from urllib.parse import urljoin
from aiohttp import request, ClientResponseError from aiohttp import request, ClientResponseError
from cookbook.connectors.connector import Connector from cookbook.connectors.connector import Connector, ShoppingListEntryDTO
from cookbook.models import ShoppingListEntry, ConnectorConfig, Space from cookbook.models import ConnectorConfig
class HomeAssistant(Connector): class HomeAssistant(Connector):
_config: ConnectorConfig _config: ConnectorConfig
_logger: Logger _logger: Logger
_required_foreign_keys = ("food", "unit", "created_by")
def __init__(self, config: ConnectorConfig): def __init__(self, config: ConnectorConfig):
if not config.token or not config.url or not config.todo_entity: if not config.token or not config.url or not config.todo_entity:
raise ValueError("config for HomeAssistantConnector in incomplete") raise ValueError("config for HomeAssistantConnector in incomplete")
@@ -34,7 +32,7 @@ class HomeAssistant(Connector):
response.raise_for_status() response.raise_for_status()
return await response.json() return await response.json()
async def on_shopping_list_entry_created(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None: async def on_shopping_list_entry_created(self, shopping_list_entry: ShoppingListEntryDTO) -> None:
if not self._config.on_shopping_list_entry_created_enabled: if not self._config.on_shopping_list_entry_created_enabled:
return return
@@ -55,20 +53,15 @@ class HomeAssistant(Connector):
except ClientResponseError as err: except ClientResponseError as err:
self._logger.warning(f"received an exception from the api: {err.request_info.url=}, {err.request_info.method=}, {err.status=}, {err.message=}, {type(err)=}") self._logger.warning(f"received an exception from the api: {err.request_info.url=}, {err.request_info.method=}, {err.status=}, {err.message=}, {type(err)=}")
async def on_shopping_list_entry_updated(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None: async def on_shopping_list_entry_updated(self, shopping_list_entry: ShoppingListEntryDTO) -> None:
if not self._config.on_shopping_list_entry_updated_enabled: if not self._config.on_shopping_list_entry_updated_enabled:
return return
pass pass
async def on_shopping_list_entry_deleted(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None: async def on_shopping_list_entry_deleted(self, shopping_list_entry: ShoppingListEntryDTO) -> None:
if not self._config.on_shopping_list_entry_deleted_enabled: if not self._config.on_shopping_list_entry_deleted_enabled:
return return
if not all(k in shopping_list_entry._state.fields_cache for k in self._required_foreign_keys):
# Sometimes the food foreign key is not loaded, and we cant load it from an async process
self._logger.debug("required property was not present in ShoppingListEntry")
return
item, _ = _format_shopping_list_entry(shopping_list_entry) item, _ = _format_shopping_list_entry(shopping_list_entry)
self._logger.debug(f"removing {item=} from {self._config.todo_entity}") self._logger.debug(f"removing {item=} from {self._config.todo_entity}")
@@ -88,19 +81,19 @@ class HomeAssistant(Connector):
pass pass
def _format_shopping_list_entry(shopping_list_entry: ShoppingListEntry) -> Tuple[str, str]: def _format_shopping_list_entry(shopping_list_entry: ShoppingListEntryDTO) -> Tuple[str, str]:
item = shopping_list_entry.food.name item = shopping_list_entry.food_name
if shopping_list_entry.amount > 0: if shopping_list_entry.amount:
item += f" ({shopping_list_entry.amount:.2f}".rstrip('0').rstrip('.') item += f" ({shopping_list_entry.amount:.2f}".rstrip('0').rstrip('.')
if shopping_list_entry.unit and shopping_list_entry.unit.base_unit and len(shopping_list_entry.unit.base_unit) > 0: if shopping_list_entry.base_unit:
item += f" {shopping_list_entry.unit.base_unit})" item += f" {shopping_list_entry.base_unit})"
elif shopping_list_entry.unit and shopping_list_entry.unit.name and len(shopping_list_entry.unit.name) > 0: elif shopping_list_entry.unit_name:
item += f" {shopping_list_entry.unit.name})" item += f" {shopping_list_entry.unit_name})"
else: else:
item += ")" item += ")"
description = "From TandoorRecipes" description = "From TandoorRecipes"
if shopping_list_entry.created_by.first_name and len(shopping_list_entry.created_by.first_name) > 0: if shopping_list_entry.created_by.first_name:
description += f", by {shopping_list_entry.created_by.first_name}" description += f", by {shopping_list_entry.created_by.first_name}"
else: else:
description += f", by {shopping_list_entry.created_by.username}" description += f", by {shopping_list_entry.created_by.username}"

View File

@@ -2,7 +2,7 @@ import pytest
from django.contrib import auth from django.contrib import auth
from mock.mock import Mock from mock.mock import Mock
from cookbook.connectors.connector import Connector from cookbook.connectors.connector import Connector, ShoppingListEntryDTO
from cookbook.connectors.connector_manager import ActionType, run_connectors from cookbook.connectors.connector_manager import ActionType, run_connectors
from cookbook.models import Food, ShoppingListEntry from cookbook.models import Food, ShoppingListEntry
@@ -13,13 +13,13 @@ def obj_1(space_1, u1_s1):
return e return e
@pytest.mark.timeout(10) # TODO this mark doesn't exist
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_connectors(space_1, u1_s1, obj_1) -> None: async def test_run_connectors(space_1, u1_s1, obj_1) -> None:
expected_dto = ShoppingListEntryDTO.try_create_from_entry(obj_1)
connector_mock = Mock(spec=Connector) connector_mock = Mock(spec=Connector)
await run_connectors([connector_mock], space_1, obj_1, ActionType.DELETED) await run_connectors([connector_mock], obj_1, ActionType.DELETED)
assert not connector_mock.on_shopping_list_entry_updated.called assert not connector_mock.on_shopping_list_entry_updated.called
assert not connector_mock.on_shopping_list_entry_created.called assert not connector_mock.on_shopping_list_entry_created.called
connector_mock.on_shopping_list_entry_deleted.assert_called_once_with(space_1, obj_1) connector_mock.on_shopping_list_entry_deleted.assert_called_once_with(expected_dto)

View File

@@ -61,6 +61,7 @@ from rest_framework.viewsets import ViewSetMixin
from rest_framework.serializers import CharField, IntegerField, UUIDField from rest_framework.serializers import CharField, IntegerField, UUIDField
from treebeard.exceptions import InvalidMoveToDescendant, InvalidPosition, PathOverflow from treebeard.exceptions import InvalidMoveToDescendant, InvalidPosition, PathOverflow
from cookbook.connectors.connector_manager import ConnectorManager, ActionType
from cookbook.forms import ImportForm from cookbook.forms import ImportForm
from cookbook.helper import recipe_url_import as helper from cookbook.helper import recipe_url_import as helper
from cookbook.helper.HelperFunctions import str2bool, validate_import_url from cookbook.helper.HelperFunctions import str2bool, validate_import_url
@@ -1254,8 +1255,6 @@ class RecipeViewSet(LoggingMixin, viewsets.ModelViewSet):
SLR = RecipeShoppingEditor(request.user, request.space, id=list_recipe, recipe=obj, mealplan=mealplan, SLR = RecipeShoppingEditor(request.user, request.space, id=list_recipe, recipe=obj, mealplan=mealplan,
servings=servings) servings=servings)
content = {'msg': _(f'{obj.name} was added to the shopping list.')}
http_status = status.HTTP_204_NO_CONTENT
if servings and servings <= 0: if servings and servings <= 0:
result = SLR.delete() result = SLR.delete()
elif list_recipe: elif list_recipe:
@@ -1392,6 +1391,7 @@ class ShoppingListRecipeViewSet(LoggingMixin, viewsets.ModelViewSet):
) )
ShoppingListEntry.objects.bulk_create(entries) ShoppingListEntry.objects.bulk_create(entries)
ConnectorManager.add_work(ActionType.CREATED, *entries)
return Response(serializer.validated_data) return Response(serializer.validated_data)
else: else:
return Response(serializer.errors, 400) return Response(serializer.errors, 400)