import base64
import datetime
import io
import json
import mimetypes
import pathlib
import re
import threading
import traceback
import uuid
from collections import OrderedDict
from json import JSONDecodeError
from urllib.parse import unquote
from zipfile import ZipFile

import litellm
import PIL.Image
import redis
import requests
from django.contrib import messages
from django.contrib.auth.models import Group, User
from django.contrib.postgres.search import TrigramSimilarity
from django.core.cache import caches
from django.core.exceptions import FieldError, ValidationError
from django.core.files import File
from django.db.models import Case, Count, Exists, OuterRef, ProtectedError, Q, Subquery, Value, When
from django.db.models.fields.related import ForeignObjectRel
from django.db.models.functions import Coalesce, Lower
from django.db.models.signals import post_save
from django.http import FileResponse, HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from django.utils.datetime_safe import date
from django.utils.translation import gettext as _
from django_scopes import scopes_disabled
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import (OpenApiExample, OpenApiParameter, extend_schema, extend_schema_view,
                                   inline_serializer)
from icalendar import Calendar, Event
from litellm import BadRequestError, completion
from oauth2_provider.models import AccessToken
from PIL import UnidentifiedImageError
from PIL.features import check
from PIL.ImImagePlugin import number
from recipe_scrapers import scrape_html
from recipe_scrapers._exceptions import NoSchemaFoundInWildMode
from requests.exceptions import MissingSchema
from rest_framework import decorators, mixins, status, viewsets
from rest_framework.authtoken.views import ObtainAuthToken
from rest_framework.decorators import api_view, permission_classes
from rest_framework.exceptions import APIException, PermissionDenied
from rest_framework.pagination import PageNumberPagination
from rest_framework.parsers import MultiPartParser
from rest_framework.renderers import JSONRenderer, TemplateHTMLRenderer
from rest_framework.response import Response
from rest_framework.serializers import CharField, IntegerField, UUIDField
from rest_framework.throttling import AnonRateThrottle, UserRateThrottle
from rest_framework.views import APIView
from rest_framework.viewsets import ViewSetMixin
from treebeard.exceptions import InvalidMoveToDescendant, InvalidPosition, PathOverflow

from cookbook.forms import ImportForm
from cookbook.helper import recipe_url_import as helper
from cookbook.helper.HelperFunctions import str2bool, validate_import_url
from cookbook.helper.image_processing import handle_image
from cookbook.helper.ingredient_parser import IngredientParser
from cookbook.helper.open_data_importer import OpenDataImporter
from cookbook.helper.permission_helper import (CustomIsAdmin, CustomIsGuest, CustomIsOwner, CustomIsOwnerReadOnly,
                                               CustomIsShared, CustomIsSpaceOwner, CustomIsUser,
                                               CustomRecipePermission, CustomTokenHasReadWriteScope,
                                               CustomTokenHasScope, CustomUserPermission, IsReadOnlyDRF,
                                               above_space_limit, group_required, has_group_permission,
                                               is_space_owner, switch_user_active_space)
from cookbook.helper.recipe_search import RecipeSearch
from cookbook.helper.recipe_url_import import clean_dict, get_from_youtube_scraper, get_images_from_soup
from cookbook.helper.shopping_helper import RecipeShoppingEditor, shopping_helper
from cookbook.models import (Automation, BookmarkletImport, ConnectorConfig, CookLog, CustomFilter, ExportLog, Food,
                             FoodInheritField, FoodProperty, ImportLog, Ingredient, InviteLink, Keyword, MealPlan,
                             MealType, Property, PropertyType, Recipe, RecipeBook, RecipeBookEntry, ShareLink,
                             ShoppingListEntry, ShoppingListRecipe, Space, Step, Storage, Supermarket,
                             SupermarketCategory, SupermarketCategoryRelation, Sync, SyncLog, Unit, UnitConversion,
                             UserFile, UserPreference, UserSpace, ViewLog)
from cookbook.provider.dropbox import Dropbox
from cookbook.provider.local import Local
from cookbook.provider.nextcloud import Nextcloud
from cookbook.serializer import (AccessTokenSerializer, AutomationSerializer, AutoMealPlanSerializer,
                                 BookmarkletImportListSerializer, BookmarkletImportSerializer,
                                 ConnectorConfigConfigSerializer, CookLogSerializer, CustomFilterSerializer,
                                 ExportLogSerializer, FdcQuerySerializer, FoodInheritFieldSerializer, FoodSerializer,
                                 FoodShoppingUpdateSerializer, FoodSimpleSerializer, GroupSerializer,
                                 ImportImageSerializer, ImportLogSerializer, IngredientSerializer,
                                 IngredientSimpleSerializer, InviteLinkSerializer, KeywordSerializer,
                                 LocalizationSerializer, MealPlanSerializer, MealTypeSerializer, PropertySerializer,
                                 PropertyTypeSerializer, RecipeBookEntrySerializer, RecipeBookSerializer,
                                 RecipeExportSerializer, RecipeFlatSerializer, RecipeFromSourceResponseSerializer,
                                 RecipeFromSourceSerializer, RecipeImageSerializer, RecipeOverviewSerializer,
                                 RecipeSerializer, RecipeShoppingUpdateSerializer, RecipeSimpleSerializer,
                                 ServerSettingsSerializer, ShoppingListEntryBulkCreateSerializer,
                                 ShoppingListEntryBulkSerializer, ShoppingListEntrySerializer,
                                 ShoppingListRecipeSerializer, SpaceSerializer, StepSerializer, StorageSerializer,
                                 SupermarketCategoryRelationSerializer, SupermarketCategorySerializer,
                                 SupermarketSerializer, SyncLogSerializer, SyncSerializer, UnitConversionSerializer,
                                 UnitSerializer, UserFileSerializer, UserPreferenceSerializer, UserSerializer,
                                 UserSpaceSerializer, ViewLogSerializer)
from cookbook.version_info import TANDOOR_VERSION
from cookbook.views.import_export import get_integration
from recipes import settings
from recipes.settings import AI_API_KEY, AI_MODEL_NAME, AI_RATELIMIT, DRF_THROTTLE_RECIPE_URL_IMPORT, FDC_API_KEY

DateExample = OpenApiExample('Date Format', value='1972-12-05', request_only=True)
BeforeDateExample = OpenApiExample('Before Date Format', value='-1972-12-05', request_only=True)


class LoggingMixin(object):
    """
    Logs API request counts to the redis cache: in total, per space, and per endpoint.
    """

    def initial(self, request, *args, **kwargs):
        super(LoggingMixin, self).initial(request, *args, **kwargs)

        if settings.REDIS_HOST:
            try:
                d = date.today().isoformat()
                space = request.space
                endpoint = request.resolver_match.url_name

                r = redis.StrictRedis(
                    host=settings.REDIS_HOST,
                    port=settings.REDIS_PORT,
                    username=settings.REDIS_USERNAME,
                    password=settings.REDIS_PASSWORD,
                    db=settings.REDIS_DATABASES['STATS'],
                )

                pipe = r.pipeline()

                # Global and daily tallies for all URLs.
                pipe.incr('api:request-count')
                pipe.incr(f'api:request-count:{d}')

                # Use a sorted set to store the per-space stats, with the score representing
                # the number of queries the space made in total or on a given day.
                pipe.zincrby('api:space-request-count', 1, space.pk)
                pipe.zincrby(f'api:space-request-count:{d}', 1, space.pk)

                # Use a sorted set to store all the endpoints, with the score representing
                # the number of queries the endpoint received in total or on a given day.
                pipe.zincrby('api:endpoint-request-count', 1, endpoint)
                pipe.zincrby(f'api:endpoint-request-count:{d}', 1, endpoint)

                pipe.execute()
            except Exception:
                # stats are best effort and must never break the actual request
                pass


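# Illustrative only: given a StrictRedis client `r` configured like the one in
# LoggingMixin.initial above, the counters can be read back for reporting.
# Key names mirror the ones written above; `space_pk` is a hypothetical space id.
#   r.get('api:request-count')  # lifetime request total
#   r.zrevrange('api:endpoint-request-count', 0, 9, withscores=True)  # top 10 endpoints
#   r.zscore(f'api:space-request-count:{date.today().isoformat()}', space_pk)  # one space, today

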
@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='query', description='lookup if query string is contained within the name, case insensitive',
                     type=str),
    OpenApiParameter(name='updated_at',
                     description='if model has an updated_at timestamp, filter only models updated at or after datetime',
                     type=str, examples=[DateExample]),
    OpenApiParameter(name='limit', description='limit number of entries to return', type=str),
    OpenApiParameter(name='random', description='randomly orders entries (only works together with limit)', type=str),
]))
class StandardFilterModelViewSet(viewsets.ModelViewSet):

    def get_queryset(self):
        queryset = self.queryset
        query = self.request.query_params.get('query', None)
        if query is not None:
            queryset = queryset.filter(name__icontains=query)

        updated_at = self.request.query_params.get('updated_at', None)
        if updated_at is not None:
            try:
                queryset = queryset.filter(updated_at__gte=updated_at)
            except FieldError:
                pass
            except ValidationError:
                raise APIException(_('Parameter updated_at incorrectly formatted'))

        limit = self.request.query_params.get('limit', None)
        random = self.request.query_params.get('random', False)
        if limit is not None:
            if random:
                queryset = queryset.order_by("?")[:int(limit)]
            else:
                queryset = queryset[:int(limit)]
        return queryset


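# Example list request against a viewset using the filters above (for instance
# SupermarketViewSet below), assuming the default router prefix; note that
# `random` only takes effect when `limit` is also given:
#   GET /api/supermarket/?query=edeka&updated_at=2024-01-01&limit=5&random=true

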
class DefaultPagination(PageNumberPagination):
    """
    Default pagination class that sets the default and maximum page size
    and annotates the current server timestamp to all results.
    """
    page_size = 50
    page_size_query_param = 'page_size'
    max_page_size = 200

    def get_paginated_response(self, data):
        return Response({
            'count': self.page.paginator.count,
            'next': self.get_next_link(),
            'previous': self.get_previous_link(),
            'timestamp': timezone.now().isoformat(),
            'results': data,
        })

    def get_paginated_response_schema(self, schema):
        schema = super().get_paginated_response_schema(schema)
        schema['properties']['timestamp'] = {
            'type': 'string',
            'format': 'date-time',
        }
        return schema


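# Illustrative response shape produced by DefaultPagination; `timestamp` is the
# extension added above, all other fields are standard PageNumberPagination
# output (values are examples only):
#   {
#       "count": 123,
#       "next": "https://tandoor.example/api/food/?page=2",
#       "previous": null,
#       "timestamp": "2024-01-01T12:00:00+00:00",
#       "results": [...]
#   }

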
class ExtendedRecipeMixin():
    """
    ExtendedRecipe annotates a queryset with recipe_image and recipe_count values
    """

    @classmethod
    def annotate_recipe(cls, queryset=None, request=None, serializer=None, tree=False):
        extended = str2bool(request.query_params.get('extended', None))
        recipe_filter = getattr(serializer, 'recipe_filter', None)
        if extended and recipe_filter:
            images = serializer.images
            space = request.space

            # add a recipe count annotation to the query
            # explanation on construction https://stackoverflow.com/a/43771738/15762829
            recipe_count = Recipe.objects.filter(**{recipe_filter: OuterRef('id')}, space=space).values(
                recipe_filter).annotate(count=Count('pk', distinct=True)).values('count')
            queryset = queryset.annotate(recipe_count=Coalesce(Subquery(recipe_count), 0))

            # add a recipe image annotation to the query
            image_subquery = Recipe.objects.filter(**{
                recipe_filter: OuterRef('id')
            }, space=space).exclude(image__isnull=True).exclude(image__exact='').order_by("?").values('image')[:1]
            if tree:
                image_children_subquery = Recipe.objects.filter(**{
                    f"{recipe_filter}__path__startswith": OuterRef('path')
                }, space=space).exclude(image__isnull=True).exclude(image__exact='').order_by("?").values('image')[:1]
            else:
                image_children_subquery = None
            if images:
                queryset = queryset.annotate(recipe_image=Coalesce(*images, image_subquery, image_children_subquery))
            else:
                queryset = queryset.annotate(recipe_image=Coalesce(image_subquery, image_children_subquery))
        return queryset


@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='query', description='lookup if query string is contained within the name, case insensitive',
                     type=str),
    OpenApiParameter(name='updated_at',
                     description='if model has an updated_at timestamp, filter only models updated at or after datetime',
                     type=str),  # TODO format hint
    OpenApiParameter(name='limit', description='limit number of entries to return', type=str),
    OpenApiParameter(name='random', description='randomly orders entries (only works together with limit)', type=str),
]))
class FuzzyFilterMixin(viewsets.ModelViewSet, ExtendedRecipeMixin):

    def get_queryset(self):
        self.queryset = self.queryset.filter(space=self.request.space).order_by(Lower('name').asc())
        query = self.request.query_params.get('query', None)
        if self.request.user.is_authenticated:
            fuzzy = self.request.user.searchpreference.lookup or any(
                [self.model.__name__.lower() in x for x in
                 self.request.user.searchpreference.trigram.values_list('field', flat=True)])
        else:
            fuzzy = True

        if query is not None and query not in ["''", '']:
            if fuzzy and (settings.DATABASES['default']['ENGINE'] == 'django.db.backends.postgresql'):
                if (
                        self.request.user.is_authenticated
                        and any([self.model.__name__.lower() in x for x in
                                 self.request.user.searchpreference.unaccent.values_list('field', flat=True)])
                ):
                    self.queryset = self.queryset.annotate(trigram=TrigramSimilarity('name__unaccent', query))
                else:
                    self.queryset = self.queryset.annotate(trigram=TrigramSimilarity('name', query))
                self.queryset = self.queryset.order_by('-trigram')
            else:
                # TODO have this check unaccent search settings or other search preferences?
                name_filter = Q(name__icontains=query)
                if self.request.user.is_authenticated:
                    if any([self.model.__name__.lower() in x for x in
                            self.request.user.searchpreference.unaccent.values_list('field', flat=True)]):
                        name_filter |= Q(name__unaccent__icontains=query)

                self.queryset = (
                    self.queryset
                    .annotate(starts=Case(When(name__istartswith=query, then=Value(100)),
                                          default=Value(0)))  # put exact matches at the top of the result set
                    .filter(name_filter)
                    .order_by('-starts', Lower('name').asc())
                )

        updated_at = self.request.query_params.get('updated_at', None)
        if updated_at is not None:
            try:
                self.queryset = self.queryset.filter(updated_at__gte=updated_at)
            except FieldError:
                pass
            except ValidationError:
                raise APIException(_('Parameter updated_at incorrectly formatted'))

        limit = self.request.query_params.get('limit', None)
        random = self.request.query_params.get('random', False)
        if random:
            self.queryset = self.queryset.order_by("?")
        if limit is not None:
            self.queryset = self.queryset[:int(limit)]
        return self.annotate_recipe(queryset=self.queryset, request=self.request, serializer=self.serializer_class)


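# Note on the fuzzy path above: TrigramSimilarity is PostgreSQL-only and requires
# the pg_trgm extension. Illustrative ranking, assuming a Unit named "tablespoon":
#   Unit.objects.annotate(trigram=TrigramSimilarity('name', 'tablespon')).order_by('-trigram')
# would still rank "tablespoon" first despite the typo in the query string.

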
class MergeMixin(ViewSetMixin):

    @extend_schema(parameters=[
        OpenApiParameter(name="target", description='The ID of the {obj} you want to merge with.',
                         type=OpenApiTypes.INT, location=OpenApiParameter.PATH)
    ])
    @decorators.action(detail=True, url_path='merge/(?P<target>[^/.]+)', methods=['PUT'], )
    @decorators.renderer_classes((TemplateHTMLRenderer, JSONRenderer))
    def merge(self, request, pk, target: int):
        self.description = f"Merge {self.basename} onto target {self.basename} with ID of [int]."

        try:
            source = self.model.objects.get(pk=pk, space=self.request.space)
        except self.model.DoesNotExist:
            content = {'error': True, 'msg': _(f'No {self.basename} with id {pk} exists')}
            return Response(content, status=status.HTTP_404_NOT_FOUND)

        if int(target) == source.id:
            content = {'error': True, 'msg': _('Cannot merge with the same object!')}
            return Response(content, status=status.HTTP_403_FORBIDDEN)
        else:
            try:
                target = self.model.objects.get(pk=target, space=self.request.space)
            except self.model.DoesNotExist:
                content = {'error': True, 'msg': _(f'No {self.basename} with id {target} exists')}
                return Response(content, status=status.HTTP_404_NOT_FOUND)

            try:
                if target in source.get_descendants_and_self():
                    content = {'error': True, 'msg': _('Cannot merge with child object!')}
                    return Response(content, status=status.HTTP_403_FORBIDDEN)
                is_tree = True
            except AttributeError:
                # an AttributeError probably means it's not a tree, so it can safely be ignored
                is_tree = False

            try:
                if isinstance(source, Food):
                    source.properties.all().delete()
                    source.properties.clear()
                    UnitConversion.objects.filter(food=source).delete()

                for link in [field for field in source._meta.get_fields() if issubclass(type(field), ForeignObjectRel)]:
                    link_manager = getattr(source, link.get_accessor_name())
                    related = link_manager.all()
                    # the link to the foreign relationship can be OneToMany or ManyToMany
                    if link.one_to_many:
                        for r in related:
                            setattr(r, link.field.name, target)
                            r.save()
                    elif link.many_to_many:
                        for r in related:
                            getattr(r, link.field.name).add(target)
                            getattr(r, link.field.name).remove(source)
                            r.save()
                    else:
                        # a new scenario exists and needs to be handled
                        raise NotImplementedError
                if is_tree:
                    if self.model.node_order_by:
                        node_location = 'sorted-child'
                    else:
                        node_location = 'last-child'

                    children = source.get_children().exclude(id=target.id)
                    for c in children:
                        c.move(target, node_location)
                content = {'msg': _(f'{source.name} was merged successfully with {target.name}')}
                source.delete()
                return Response(content, status=status.HTTP_200_OK)
            except Exception:
                traceback.print_exc()
                content = {'error': True,
                           'msg': _(f'An error occurred attempting to merge {source.name} with {target.name}')}
                return Response(content, status=status.HTTP_400_BAD_REQUEST)


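# Example (assuming the default router prefix): merging Food 42 into Food 7
# re-points every relation from 42 to 7, moves any tree children across, and
# finally deletes 42:
#   PUT /api/food/42/merge/7/

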
@extend_schema_view(
    list=extend_schema(parameters=[
        OpenApiParameter(name='root',
                         description='Return first level children of {obj} with ID [int]. Integer 0 will return root {obj}s.',
                         type=int),
        OpenApiParameter(name='tree', description='Return all self and children of {obj} with ID [int].', type=int),
    ]),
    move=extend_schema(parameters=[
        OpenApiParameter(name="parent", description='The ID of the desired parent of the {obj}.', type=OpenApiTypes.INT,
                         location=OpenApiParameter.PATH)
    ])
)
class TreeMixin(MergeMixin, FuzzyFilterMixin):
    model = None

    def get_queryset(self):
        root = self.request.query_params.get('root', None)
        tree = self.request.query_params.get('tree', None)

        if root:
            if root.isnumeric():
                root = int(root)
                if root == 0:
                    self.queryset = self.model.get_root_nodes()
                else:
                    try:
                        self.queryset = self.model.objects.get(id=root).get_children()
                    except self.model.DoesNotExist:
                        self.queryset = self.model.objects.none()
            else:
                self.queryset = self.model.objects.none()
        elif tree:
            if tree.isnumeric():
                try:
                    self.queryset = self.model.objects.get(id=int(tree)).get_descendants_and_self()
                except self.model.DoesNotExist:
                    self.queryset = self.model.objects.none()
            else:
                self.queryset = self.model.objects.none()
        else:
            return self.annotate_recipe(queryset=super().get_queryset(), request=self.request,
                                        serializer=self.serializer_class, tree=True)
        self.queryset = self.queryset.filter(space=self.request.space).order_by(Lower('name').asc())

        return self.annotate_recipe(queryset=self.queryset, request=self.request, serializer=self.serializer_class,
                                    tree=True)

    @decorators.action(detail=True, url_path='move/(?P<parent>[^/.]+)', methods=['PUT'], )
    @decorators.renderer_classes((TemplateHTMLRenderer, JSONRenderer))
    def move(self, request, pk, parent: int):
        self.description = f"Move {self.basename} to be a child of {self.basename} with ID of [int]. Use ID: 0 to move {self.basename} to the root."
        if self.model.node_order_by:
            node_location = 'sorted'
        else:
            node_location = 'last'

        try:
            child = self.model.objects.get(pk=pk, space=self.request.space)
        except self.model.DoesNotExist:
            content = {'error': True, 'msg': _(f'No {self.basename} with id {pk} exists')}
            return Response(content, status=status.HTTP_404_NOT_FOUND)

        parent = int(parent)
        # parent 0 is the root of the tree
        if parent == 0:
            try:
                with scopes_disabled():
                    child.move(self.model.get_first_root_node(), f'{node_location}-sibling')
                    content = {'msg': _(f'{child.name} was moved successfully to the root.')}
                    return Response(content, status=status.HTTP_200_OK)
            except (PathOverflow, InvalidMoveToDescendant, InvalidPosition):
                content = {'error': True, 'msg': _('An error occurred attempting to move ') + child.name}
                return Response(content, status=status.HTTP_400_BAD_REQUEST)
        elif parent == child.id:
            content = {'error': True, 'msg': _('Cannot move an object to itself!')}
            return Response(content, status=status.HTTP_403_FORBIDDEN)

        try:
            parent = self.model.objects.get(pk=parent, space=self.request.space)
        except self.model.DoesNotExist:
            content = {'error': True, 'msg': _(f'No {self.basename} with id {parent} exists')}
            return Response(content, status=status.HTTP_404_NOT_FOUND)

        try:
            with scopes_disabled():
                child.move(parent, f'{node_location}-child')
                content = {'msg': _(f'{child.name} was moved successfully to parent {parent.name}')}
                return Response(content, status=status.HTTP_200_OK)
        except (PathOverflow, InvalidMoveToDescendant, InvalidPosition):
            content = {'error': True, 'msg': _('An error occurred attempting to move ') + child.name}
            return Response(content, status=status.HTTP_400_BAD_REQUEST)


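# Example (assuming the default router prefix): move Keyword 5 under Keyword 2,
# or back to the tree root with parent id 0:
#   PUT /api/keyword/5/move/2/
#   PUT /api/keyword/5/move/0/

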
@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='filter_list', description='User IDs, repeat for multiple', type=str, many=True),
]))
class UserViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = User.objects
    serializer_class = UserSerializer
    permission_classes = [CustomUserPermission & CustomTokenHasReadWriteScope]
    pagination_disabled = True
    http_method_names = ['get', 'patch']

    def get_queryset(self):
        queryset = self.queryset.filter(userspace__space=self.request.space)
        try:
            filter_list = self.request.query_params.get('filter_list', None)
            if filter_list is not None:
                queryset = queryset.filter(pk__in=json.loads(filter_list))
        except ValueError:
            raise APIException('Parameter filter_list incorrectly formatted')

        return queryset


class GroupViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = Group.objects.all()
    serializer_class = GroupSerializer
    permission_classes = [CustomIsAdmin & CustomTokenHasReadWriteScope]
    pagination_disabled = True
    http_method_names = ['get', ]


class SpaceViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = Space.objects
    serializer_class = SpaceSerializer
    permission_classes = [IsReadOnlyDRF & CustomIsGuest | CustomIsOwner & CustomIsAdmin & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination
    http_method_names = ['get', 'patch']

    def get_queryset(self):
        return self.queryset.filter(
            id__in=UserSpace.objects.filter(user=self.request.user).values_list('space_id', flat=True))

    @extend_schema(responses=SpaceSerializer(many=False))
    @decorators.action(detail=False, pagination_class=None, methods=['GET'], serializer_class=SpaceSerializer, )
    def current(self, request):
        return Response(self.serializer_class(self.request.space, many=False, context={'request': self.request}).data)


@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='internal_note', description='text field to store information about the invite link', type=str),
]))
class UserSpaceViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = UserSpace.objects
    serializer_class = UserSpaceSerializer
    permission_classes = [(CustomIsSpaceOwner | CustomIsOwnerReadOnly) & CustomTokenHasReadWriteScope]
    http_method_names = ['get', 'put', 'patch', 'delete']
    pagination_class = DefaultPagination

    def destroy(self, request, *args, **kwargs):
        if request.space.created_by == UserSpace.objects.get(pk=kwargs['pk']).user:
            raise APIException('Cannot delete Space owner permission.')
        return super().destroy(request, *args, **kwargs)

    def get_queryset(self):
        internal_note = self.request.query_params.get('internal_note', None)
        if internal_note is not None:
            self.queryset = self.queryset.filter(internal_note=internal_note)

        if is_space_owner(self.request.user, self.request.space):
            return self.queryset.filter(space=self.request.space)
        else:
            return self.queryset.filter(user=self.request.user, space=self.request.space)


class UserPreferenceViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = UserPreference.objects
    serializer_class = UserPreferenceSerializer
    permission_classes = [CustomIsOwner & CustomTokenHasReadWriteScope]
    pagination_disabled = True
    http_method_names = ['get', 'patch', ]

    def get_queryset(self):
        with scopes_disabled():  # need to disable scopes as user preference is no longer a space-scoped model
            return self.queryset.filter(user=self.request.user)


class StorageViewSet(LoggingMixin, viewsets.ModelViewSet):
    # TODO handle delete protect error and adjust test
    queryset = Storage.objects
    serializer_class = StorageSerializer
    permission_classes = [CustomIsAdmin & CustomTokenHasReadWriteScope]
    pagination_disabled = True

    def get_queryset(self):
        return self.queryset.filter(space=self.request.space)


class ConnectorConfigConfigViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = ConnectorConfig.objects
    serializer_class = ConnectorConfigConfigSerializer
    permission_classes = [CustomIsAdmin & CustomTokenHasReadWriteScope]
    pagination_disabled = True

    def get_queryset(self):
        return self.queryset.filter(space=self.request.space)


class SyncViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = Sync.objects
    serializer_class = SyncSerializer
    permission_classes = [CustomIsAdmin & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        return self.queryset.filter(space=self.request.space)


class SyncLogViewSet(LoggingMixin, viewsets.ReadOnlyModelViewSet):
    queryset = SyncLog.objects
    serializer_class = SyncLogSerializer
    permission_classes = [CustomIsAdmin & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        return self.queryset.filter(sync__space=self.request.space)


class SupermarketViewSet(LoggingMixin, StandardFilterModelViewSet):
    queryset = Supermarket.objects
    serializer_class = SupermarketSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        self.queryset = self.queryset.filter(space=self.request.space)
        return super().get_queryset()


# TODO does supermarket category have settings to support fuzzy filtering and/or merge?
class SupermarketCategoryViewSet(LoggingMixin, FuzzyFilterMixin, MergeMixin):
    queryset = SupermarketCategory.objects
    model = SupermarketCategory
    serializer_class = SupermarketCategorySerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        self.queryset = self.queryset.filter(space=self.request.space).order_by(Lower('name').asc())
        return super().get_queryset()


class SupermarketCategoryRelationViewSet(LoggingMixin, StandardFilterModelViewSet):
    queryset = SupermarketCategoryRelation.objects
    serializer_class = SupermarketCategoryRelationSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        self.queryset = self.queryset.filter(supermarket__space=self.request.space).order_by('order')
        return super().get_queryset()


class KeywordViewSet(LoggingMixin, TreeMixin):
    queryset = Keyword.objects
    model = Keyword
    serializer_class = KeywordSerializer
    permission_classes = [(CustomIsGuest & IsReadOnlyDRF | CustomIsUser) & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination


class UnitViewSet(LoggingMixin, MergeMixin, FuzzyFilterMixin):
    queryset = Unit.objects
    model = Unit
    serializer_class = UnitSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination


class FoodInheritFieldViewSet(LoggingMixin, viewsets.ReadOnlyModelViewSet):
    queryset = FoodInheritField.objects
    serializer_class = FoodInheritFieldSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_disabled = True

    def get_queryset(self):
        # exclude fields not yet implemented
        self.queryset = Food.inheritable_fields
        return super().get_queryset()


class FoodViewSet(LoggingMixin, TreeMixin):
    queryset = Food.objects
    model = Food
    serializer_class = FoodSerializer
    permission_classes = [(CustomIsGuest & IsReadOnlyDRF | CustomIsUser) & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        shared_users = []
        if c := caches['default'].get(f'shopping_shared_users_{self.request.space.id}_{self.request.user.id}', None):
            shared_users = c
        else:
            try:
                shared_users = [x.id for x in list(self.request.user.get_shopping_share())] + [self.request.user.id]
                caches['default'].set(f'shopping_shared_users_{self.request.space.id}_{self.request.user.id}',
                                      shared_users, timeout=5 * 60)
                # TODO ugly hack that improves API performance significantly, should be done properly
            except AttributeError:  # anonymous users (using share links) don't have shared users
                pass

        self.queryset = super().get_queryset()
        shopping_status = ShoppingListEntry.objects.filter(space=self.request.space, food=OuterRef('id'),
                                                           checked=False).values('id')
        # onhand_status = self.queryset.annotate(onhand_status=Exists(onhand_users_set__in=[shared_users]))
        return self.queryset \
            .annotate(shopping_status=Exists(shopping_status)) \
            .prefetch_related('onhand_users', 'inherit_fields', 'child_inherit_fields', 'substitute') \
            .select_related('recipe', 'supermarket_category')

    def get_serializer_class(self):
        if self.request and self.request.query_params.get('simple', False):
            return FoodSimpleSerializer
        return self.serializer_class

    # TODO I could not find any usage of this and it causes schema generation issues, so commenting it for now
    # this is used on the Shopping Badge
    @decorators.action(detail=True, methods=['PUT'], serializer_class=FoodShoppingUpdateSerializer, )
    # TODO DRF only allows one action in a decorator action without overriding get_operation_id_base(); this should probably be PUT and DELETE
    def shopping(self, request, pk):
        if self.request.space.demo:
            raise PermissionDenied(detail='Not available in demo', code=None)
        obj = self.get_object()
        shared_users = list(self.request.user.get_shopping_share())
        shared_users.append(request.user)
        if request.data.get('_delete', False) == 'true':
            ShoppingListEntry.objects.filter(food=obj, checked=False, space=request.space,
                                             created_by__in=shared_users).delete()
            content = {'msg': _(f'{obj.name} was removed from the shopping list.')}
            return Response(content, status=status.HTTP_204_NO_CONTENT)

        amount = request.data.get('amount', 1)
        unit = request.data.get('unit', None)
        content = {'msg': _(f'{obj.name} was added to the shopping list.')}

        ShoppingListEntry.objects.create(food=obj, amount=amount, unit=unit, space=request.space,
                                         created_by=request.user)
        return Response(content, status=status.HTTP_204_NO_CONTENT)

    @decorators.action(detail=True, methods=['POST'], )
    def fdc(self, request, pk):
        """
        Updates the food with all possible data from the FDC API.
        Properties with an fdc_id are overridden; existing properties without an fdc_id are left unchanged.
        """
        food = self.get_object()

        if request.data.get('fdc_id', None):
            food.fdc_id = request.data['fdc_id']

        if not food.fdc_id:
            return JsonResponse({'msg': 'Food has no FDC ID associated.'}, status=400, json_dumps_params={'indent': 4})

        response = requests.get(f'https://api.nal.usda.gov/fdc/v1/food/{food.fdc_id}?api_key={FDC_API_KEY}')
        if response.status_code == 429:
            return JsonResponse(
                {
                    'msg':
                        'API Key Rate Limit reached/exceeded, see https://api.data.gov/docs/rate-limits/ for more information. \
                        Configure your key in Tandoor using environment FDC_API_KEY variable.'
                },
                status=429,
                json_dumps_params={'indent': 4})
        if response.status_code != 200:
            return JsonResponse({
                'msg': f'Error while requesting FDC data using url https://api.nal.usda.gov/fdc/v1/food/{food.fdc_id}?api_key=****'},
                status=response.status_code,
                json_dumps_params={'indent': 4})

        food.properties_food_amount = 100

        standard_unit = Unit.objects.filter(base_unit__iexact='g', space=self.request.space).first()
        if not standard_unit:
            standard_unit = Unit.objects.filter(name__iexact='g', space=self.request.space).first()
            if not standard_unit:
                standard_unit = Unit.objects.create(name='g', base_unit='g', space=self.request.space)
            else:
                standard_unit.base_unit = 'g'
                standard_unit.save()

        food.properties_food_unit = standard_unit
        food.save()

        try:
            data = json.loads(response.content)

            food_property_list = []

            # delete all properties whose property type has an fdc_id, as these are about to be overridden
            for fp in food.properties.all():
                if fp.property_type.fdc_id:
                    fp.delete()

            for pt in PropertyType.objects.filter(space=request.space, fdc_id__gte=0).all():
                if pt.fdc_id:
                    property_found = False
                    for fn in data['foodNutrients']:
                        if fn['nutrient']['id'] == pt.fdc_id:
                            property_found = True
                            food_property_list.append(
                                Property(property_type_id=pt.id,
                                         # FDC sometimes returns negative values which make no sense, clamp them to 0
                                         property_amount=max(0, round(fn['amount'], 2)),
                                         space=self.request.space,
                                         ))
                    if not property_found:
                        food_property_list.append(
                            # if the field is not in the FDC data the food does not have that property
                            Property(property_type_id=pt.id, property_amount=0,
                                     space=self.request.space,
                                     ))

            properties = Property.objects.bulk_create(food_property_list, unique_fields=('space', 'property_type',))

            property_food_relation_list = []
            for p in properties:
                property_food_relation_list.append(Food.properties.through(food_id=food.id, property_id=p.pk))

            FoodProperty.objects.bulk_create(property_food_relation_list, ignore_conflicts=True,
                                             unique_fields=('food_id', 'property_id',))

            return self.retrieve(request, pk)
        except Exception:
            traceback.print_exc()
            return JsonResponse({'msg': 'there was an error parsing the FDC data, please check the server logs'},
                                status=500, json_dumps_params={'indent': 4})

    def destroy(self, *args, **kwargs):
        try:
            # super().destroy() is already bound; passing self again would shift the request argument
            return super().destroy(*args, **kwargs)
        except ProtectedError as e:
            content = {'error': True, 'msg': e.args[0]}
            return Response(content, status=status.HTTP_403_FORBIDDEN)


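# Example (assuming the default router prefix and an illustrative FDC id):
# link Food 42 to an FDC entry and pull its nutrient properties:
#   POST /api/food/42/fdc/  with body {"fdc_id": 173944}
# Property types are matched via PropertyType.fdc_id against the returned
# foodNutrients list, as implemented in FoodViewSet.fdc above.

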
@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='order_field', description='Field to order recipe books on', type=str,
                     enum=['id', 'name', 'order']),
    OpenApiParameter(name='order_direction', description='Order ascending or descending', type=str,
                     enum=['asc', 'desc']),
]))
class RecipeBookViewSet(LoggingMixin, StandardFilterModelViewSet):
    queryset = RecipeBook.objects
    serializer_class = RecipeBookSerializer
    permission_classes = [(CustomIsOwner | CustomIsShared) & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        order_field = self.request.GET.get('order_field')
        order_direction = self.request.GET.get('order_direction')

        if not order_field:
            order_field = 'id'

        ordering = f"{'' if order_direction == 'asc' else '-'}{order_field}"

        self.queryset = self.queryset.filter(Q(created_by=self.request.user) | Q(shared=self.request.user)).filter(
            space=self.request.space).distinct().order_by(ordering)
        return super().get_queryset()


@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='recipe', description='id of recipe - only return books for that recipe', type=int),
    OpenApiParameter(name='book', description='id of book - only return recipes in that book', type=int),
]))
class RecipeBookEntryViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = RecipeBookEntry.objects
    serializer_class = RecipeBookEntrySerializer
    permission_classes = [(CustomIsOwner | CustomIsShared) & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        queryset = self.queryset.filter(
            Q(book__created_by=self.request.user) | Q(book__shared=self.request.user)).filter(
            book__space=self.request.space).distinct()

        recipe_id = self.request.query_params.get('recipe', None)
        if recipe_id is not None:
            queryset = queryset.filter(recipe__pk=recipe_id)

        book_id = self.request.query_params.get('book', None)
        if book_id is not None:
            queryset = queryset.filter(book__pk=book_id)
        return queryset


MealPlanViewQueryParameters = [
    OpenApiParameter(name='from_date', description=_('Filter meal plans from date (inclusive).'), type=str,
                     examples=[DateExample]),
    OpenApiParameter(name='to_date', description=_('Filter meal plans to date (inclusive).'), type=str,
                     examples=[DateExample]),
    OpenApiParameter(name='meal_type',
                     description=_('Filter meal plans with MealType ID. For multiple repeat parameter.'), type=str,
                     many=True),
]


@extend_schema_view(list=extend_schema(parameters=MealPlanViewQueryParameters),
                    ical=extend_schema(parameters=MealPlanViewQueryParameters,
                                       responses={(200, 'text/calendar'): OpenApiTypes.STR}))
class MealPlanViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = MealPlan.objects
    serializer_class = MealPlanSerializer
    permission_classes = [(CustomIsOwner | CustomIsShared) & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        queryset = self.queryset.filter(Q(created_by=self.request.user) | Q(shared=self.request.user)).filter(
            space=self.request.space).distinct().all()

        from_date = self.request.query_params.get('from_date', None)
        if from_date is not None:
            queryset = queryset.filter(to_date__date__gte=from_date)

        to_date = self.request.query_params.get('to_date', None)
        if to_date is not None:
            queryset = queryset.filter(to_date__date__lte=to_date)

        meal_type = self.request.query_params.getlist('meal_type', [])
        if meal_type:
            queryset = queryset.filter(meal_type__in=meal_type)

        return queryset

    @decorators.action(detail=False)
    def ical(self, request):
        from_date = self.request.query_params.get('from_date', None)
        to_date = self.request.query_params.get('to_date', None)
        return meal_plans_to_ical(self.get_queryset(), f'meal_plan_{from_date}-{to_date}.ics')


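# Example (assuming the default router prefix): export one week of meal plans as
# an iCal file; meal_plans_to_ical is a helper defined elsewhere in this module:
#   GET /api/meal-plan/ical/?from_date=2024-01-01&to_date=2024-01-07

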
class AutoPlanViewSet(LoggingMixin, mixins.CreateModelMixin, viewsets.GenericViewSet):
    serializer_class = AutoMealPlanSerializer
    http_method_names = ['post', 'options']

    def create(self, request):
        serializer = AutoMealPlanSerializer(data=request.data)

        if serializer.is_valid():
            keyword_ids = serializer.validated_data['keyword_ids']
            start_date = serializer.validated_data['start_date']
            end_date = serializer.validated_data['end_date']
            servings = serializer.validated_data['servings']
            shared = serializer.get_initial().get('shared', None)
            shared_pks = []
            if shared is not None:
                for s in shared:
                    shared_pks.append(s['id'])

            days = min((end_date - start_date).days + 1, 14)

            recipes = Recipe.objects.values('id', 'name')
            meal_plans = []

            for keyword_id in keyword_ids:
                recipes = recipes.filter(keywords__id=keyword_id)

            if len(recipes) == 0:
                return Response(serializer.data)
            recipes = list(recipes.order_by('?')[:days])

            for i in range(0, days):
                day = start_date + datetime.timedelta(i)
                recipe = recipes[i % len(recipes)]
                args = {
                    'recipe_id': recipe['id'],
                    'servings': servings,
                    'created_by': request.user,
                    'meal_type_id': serializer.validated_data['meal_type_id'],
                    'note': '',
                    'from_date': day,
                    'to_date': day,
                    'space': request.space
                }

                m = MealPlan(**args)
                meal_plans.append(m)

            MealPlan.objects.bulk_create(meal_plans)

            for m in meal_plans:
                m.shared.set(shared_pks)

                if request.data.get('addshopping', False):
                    SLR = RecipeShoppingEditor(user=request.user, space=request.space)
                    SLR.create(mealplan=m, servings=servings)
                else:
                    post_save.send(sender=m.__class__, instance=m, created=True, update_fields=None, )

            return Response(serializer.data)

        return Response(serializer.errors, 400)


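# Illustrative request body for the auto planner (field names as declared on
# AutoMealPlanSerializer and read in create() above; values are examples only):
#   {
#       "start_date": "2024-01-01",
#       "end_date": "2024-01-07",
#       "meal_type_id": 1,
#       "keyword_ids": [4, 7],
#       "servings": 2,
#       "shared": [{"id": 3}],
#       "addshopping": true
#   }

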
class MealTypeViewSet(LoggingMixin, viewsets.ModelViewSet):
    """
    Returns the list of meal types created by the requesting user, ordered by time and id.
    """
    queryset = MealType.objects
    serializer_class = MealTypeSerializer
    permission_classes = [CustomIsOwner & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        queryset = self.queryset.order_by('time', 'id').filter(created_by=self.request.user).filter(
            space=self.request.space).all()
        return queryset


@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='food', description='ID of food to filter for', type=int),
    OpenApiParameter(name='unit', description='ID of unit to filter for', type=int),
]))
class IngredientViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = Ingredient.objects
    serializer_class = IngredientSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_serializer_class(self):
        if self.request and self.request.query_params.get('simple', False):
            return IngredientSimpleSerializer
        return self.serializer_class

    def get_queryset(self):
        queryset = self.queryset.filter(step__recipe__space=self.request.space)
        food = self.request.query_params.get('food', None)
        if food and re.match(r'^(\d)+$', food):
            queryset = queryset.filter(food_id=food)

        unit = self.request.query_params.get('unit', None)
        if unit and re.match(r'^(\d)+$', unit):
            queryset = queryset.filter(unit_id=unit)

        return queryset.select_related('food')


@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='recipe', description=_('ID of recipe a step is part of. For multiple repeat parameter.'),
                     type=int, many=True),
    OpenApiParameter(name='query', description=_('Query string matched (fuzzy) against object name.'), type=str),
]))
class StepViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = Step.objects
    serializer_class = StepSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        recipes = self.request.query_params.getlist('recipe', [])
        query = self.request.query_params.get('query', None)
        if len(recipes) > 0:
            self.queryset = self.queryset.filter(recipe__in=recipes)
        if query is not None:
            self.queryset = self.queryset.filter(Q(name__icontains=query) | Q(recipe__name__icontains=query))
        return self.queryset.filter(recipe__space=self.request.space)


class RecipePagination(PageNumberPagination):
    page_size = 25
    page_size_query_param = 'page_size'
    max_page_size = 100

    def paginate_queryset(self, queryset, request, view=None):
        if queryset is None:
            raise Exception
        return super().paginate_queryset(queryset, request, view)

    def get_paginated_response(self, data):
        return Response(OrderedDict([('count', self.page.paginator.count), ('next', self.get_next_link()),
                                     ('previous', self.get_previous_link()), ('results', data), ]))


@extend_schema_view(retrieve=extend_schema(parameters=[
    OpenApiParameter(name='share', type=str),
]), list=extend_schema(parameters=[
    OpenApiParameter(name='query', description=_('Query string matched (fuzzy) against recipe name. In the future also fulltext search.'), type=str),

    OpenApiParameter(name='keywords', description=_('ID of keyword a recipe should have. For multiple repeat parameter. Equivalent to keywords_or'), type=int, many=True),
    OpenApiParameter(name='keywords_or', description=_('Keyword IDs, repeat for multiple. Return recipes with any of the keywords'), type=int, many=True),
    OpenApiParameter(name='keywords_and', description=_('Keyword IDs, repeat for multiple. Return recipes with all of the keywords.'), type=int, many=True),
    OpenApiParameter(name='keywords_or_not', description=_('Keyword IDs, repeat for multiple. Exclude recipes with any of the keywords.'), type=int, many=True),
    OpenApiParameter(name='keywords_and_not', description=_('Keyword IDs, repeat for multiple. Exclude recipes with all of the keywords.'), type=int, many=True),

    OpenApiParameter(name='foods', description=_('ID of food a recipe should have. For multiple repeat parameter.'), type=int, many=True),
    OpenApiParameter(name='foods_or', description=_('Food IDs, repeat for multiple. Return recipes with any of the foods'), type=int, many=True),
    OpenApiParameter(name='foods_and', description=_('Food IDs, repeat for multiple. Return recipes with all of the foods.'), type=int, many=True),
    OpenApiParameter(name='foods_or_not', description=_('Food IDs, repeat for multiple. Exclude recipes with any of the foods.'), type=int, many=True),
    OpenApiParameter(name='foods_and_not', description=_('Food IDs, repeat for multiple. Exclude recipes with all of the foods.'), type=int, many=True),

    OpenApiParameter(name='books', description=_('ID of book a recipe should be in. For multiple repeat parameter.'), type=int, many=True),
    OpenApiParameter(name='books_or', description=_('Book IDs, repeat for multiple. Return recipes with any of the books'), type=int, many=True),
    OpenApiParameter(name='books_and', description=_('Book IDs, repeat for multiple. Return recipes with all of the books.'), type=int, many=True),
    OpenApiParameter(name='books_or_not', description=_('Book IDs, repeat for multiple. Exclude recipes with any of the books.'), type=int, many=True),
    OpenApiParameter(name='books_and_not', description=_('Book IDs, repeat for multiple. Exclude recipes with all of the books.'), type=int, many=True),

    OpenApiParameter(name='units', description=_('ID of unit a recipe should have.'), type=int),

    OpenApiParameter(name='rating', description=_('Exact rating of recipe'), type=int),
    OpenApiParameter(name='rating_gte', description=_('Rating a recipe should have or greater.'), type=int),
    OpenApiParameter(name='rating_lte', description=_('Rating a recipe should have or smaller.'), type=int),

    OpenApiParameter(name='timescooked', description=_('Filter recipes cooked X times.'), type=int),
    OpenApiParameter(name='timescooked_gte', description=_('Filter recipes cooked X times or more.'), type=int),
    OpenApiParameter(name='timescooked_lte', description=_('Filter recipes cooked X times or less.'), type=int),

    OpenApiParameter(name='createdon', description=_('Filter recipes created on the given date.'), type=OpenApiTypes.DATE, ),
    OpenApiParameter(name='createdon_gte', description=_('Filter recipes created on the given date or after.'), type=OpenApiTypes.DATE, ),
    OpenApiParameter(name='createdon_lte', description=_('Filter recipes created on the given date or before.'), type=OpenApiTypes.DATE, ),

    OpenApiParameter(name='updatedon', description=_('Filter recipes updated on the given date.'), type=OpenApiTypes.DATE, ),
    OpenApiParameter(name='updatedon_gte', description=_('Filter recipes updated on the given date or after.'), type=OpenApiTypes.DATE, ),
    OpenApiParameter(name='updatedon_lte', description=_('Filter recipes updated on the given date or before.'), type=OpenApiTypes.DATE, ),

    OpenApiParameter(name='cookedon_gte', description=_('Filter recipes last cooked on the given date or after.'), type=OpenApiTypes.DATE),
    OpenApiParameter(name='cookedon_lte', description=_('Filter recipes last cooked on the given date or before.'), type=OpenApiTypes.DATE),

    OpenApiParameter(name='viewedon_gte', description=_('Filter recipes last viewed on the given date or after.'), type=OpenApiTypes.DATE, ),
    OpenApiParameter(name='viewedon_lte', description=_('Filter recipes last viewed on the given date or before.'), type=OpenApiTypes.DATE, ),

    OpenApiParameter(name='createdby', description=_('Filter recipes for ones created by the given user ID'), type=int),
    OpenApiParameter(name='internal', description=_('If only internal recipes should be returned. [''true''/''<b>false</b>'']'), type=bool),
    OpenApiParameter(name='random', description=_('Returns the results in randomized order. [''true''/''<b>false</b>'']'), type=bool),
    OpenApiParameter(name='new', description=_('Returns new results first in search results. [''true''/''<b>false</b>'']'), type=bool),
    OpenApiParameter(name='num_recent', description=_('Returns the given number of recently viewed recipes before search results (if given)'), type=int),
    OpenApiParameter(name='makenow', description=_('Filter recipes that can be made with OnHand food. [''true''/''<b>false</b>'']'), type=bool),
]))
class RecipeViewSet(LoggingMixin, viewsets.ModelViewSet):
|
|
queryset = Recipe.objects
|
|
serializer_class = RecipeSerializer
|
|
# TODO split read and write permission for meal plan guest
|
|
permission_classes = [CustomRecipePermission & CustomTokenHasReadWriteScope]
|
|
pagination_class = RecipePagination
|
|
|
|
def get_queryset(self):
|
|
share = self.request.GET.get('share', None)
|
|
|
|
if self.detail: # if detail request and not list, private condition is verified by permission class
|
|
if not share: # filter for space only if not shared
|
|
self.queryset = self.queryset.filter(
|
|
space=self.request.space).prefetch_related('keywords', 'shared', 'properties',
|
|
'properties__property_type', 'steps',
|
|
'steps__ingredients',
|
|
'steps__ingredients__step_set',
|
|
'steps__ingredients__step_set__recipe_set',
|
|
'steps__ingredients__food',
|
|
'steps__ingredients__food__properties',
|
|
'steps__ingredients__food__properties__property_type',
|
|
'steps__ingredients__food__inherit_fields',
|
|
'steps__ingredients__food__supermarket_category',
|
|
'steps__ingredients__food__onhand_users',
|
|
'steps__ingredients__food__substitute',
|
|
'steps__ingredients__food__child_inherit_fields',
|
|
'steps__ingredients__unit',
|
|
'steps__ingredients__unit__unit_conversion_base_relation',
|
|
'steps__ingredients__unit__unit_conversion_base_relation__base_unit',
|
|
'steps__ingredients__unit__unit_conversion_converted_relation',
|
|
'steps__ingredients__unit__unit_conversion_converted_relation__converted_unit',
|
|
'cooklog_set',
|
|
).select_related('nutrition')
|
|
|
|
return super().get_queryset()
|
|
|
|
self.queryset = self.queryset.filter(
|
|
space=self.request.space).filter(
|
|
Q(private=False) | (Q(private=True) & (Q(created_by=self.request.user) | Q(shared=self.request.user))))
|
|
|
|
params = {x: self.request.GET.get(x) if len({**self.request.GET}[x]) == 1 else self.request.GET.getlist(x) for x
|
|
in list(self.request.GET)}
|
|
search = RecipeSearch(self.request, **params)
|
|
self.queryset = search.get_queryset(self.queryset).prefetch_related('keywords', 'cooklog_set')
|
|
return self.queryset
|
|
|
|
def list(self, request, *args, **kwargs):
|
|
if self.request.GET.get('debug', False):
|
|
return JsonResponse({'new': str(self.get_queryset().query), })
|
|
return super().list(request, *args, **kwargs)
|
|
|
|
def get_serializer_class(self):
|
|
if self.action == 'list':
|
|
return RecipeOverviewSerializer
|
|
return self.serializer_class
|
|
|
|
@decorators.action(detail=True, methods=['PUT'], serializer_class=RecipeImageSerializer,
|
|
parser_classes=[MultiPartParser], )
|
|
def image(self, request, pk):
|
|
obj = self.get_object()
|
|
|
|
if obj.get_space() != request.space:
|
|
raise PermissionDenied(detail='You do not have the required permission to perform this action', code=403)
|
|
|
|
serializer = self.serializer_class(obj, data=request.data, partial=True)
|
|
|
|
if serializer.is_valid():
|
|
serializer.save()
|
|
image = None
|
|
filetype = ".jpeg" # fall-back to .jpeg, even if wrong, at least users will know it's an image and most image viewers can open it correctly anyways
|
|
|
|
if 'image' in serializer.validated_data:
|
|
image = obj.image
|
|
filetype = mimetypes.guess_extension(serializer.validated_data['image'].content_type) or filetype
|
|
elif 'image_url' in serializer.validated_data:
|
|
try:
|
|
url = serializer.validated_data['image_url']
|
|
if validate_import_url(url):
|
|
response = requests.get(url, headers={
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0"})
|
|
image = File(io.BytesIO(response.content))
|
|
filetype = mimetypes.guess_extension(response.headers['content-type']) or filetype
|
|
except UnidentifiedImageError as e:
|
|
print(e)
|
|
pass
|
|
except MissingSchema as e:
|
|
print(e)
|
|
pass
|
|
except Exception as e:
|
|
print(e)
|
|
pass
|
|
|
|
if image is not None:
|
|
img = handle_image(request, image, filetype)
|
|
obj.image.save(f'{uuid.uuid4()}_{obj.pk}{filetype}', img)
|
|
obj.save()
|
|
return Response(serializer.data)
|
|
else:
|
|
obj.image = None
|
|
obj.save()
|
|
return Response(serializer.data)
|
|
|
|
return Response(serializer.errors, 400)
|
|
|
|
# TODO: refactor API to use post/put/delete or leave as put and change VUE to use list_recipe after creating
|
|
# DRF only allows one action in a decorator action without overriding get_operation_id_base()
|
|
@decorators.action(detail=True, methods=['PUT'], serializer_class=RecipeShoppingUpdateSerializer, )
|
|
def shopping(self, request, pk):
|
|
if self.request.space.demo:
|
|
raise PermissionDenied(detail='Not available in demo', code=None)
|
|
obj = self.get_object()
|
|
ingredients = request.data.get('ingredients', None)
|
|
|
|
servings = request.data.get('servings', None)
|
|
list_recipe = request.data.get('list_recipe', None)
|
|
mealplan = request.data.get('mealplan', None)
|
|
SLR = RecipeShoppingEditor(request.user, request.space, id=list_recipe, recipe=obj, mealplan=mealplan,
|
|
servings=servings)
|
|
|
|
content = {'msg': _(f'{obj.name} was added to the shopping list.')}
|
|
http_status = status.HTTP_204_NO_CONTENT
|
|
if servings and servings <= 0:
|
|
result = SLR.delete()
|
|
elif list_recipe:
|
|
result = SLR.edit(servings=servings, ingredients=ingredients)
|
|
else:
|
|
result = SLR.create(servings=servings, ingredients=ingredients)
|
|
|
|
if not result:
|
|
content = {'msg': ('An error occurred')}
|
|
http_status = status.HTTP_500_INTERNAL_SERVER_ERROR
|
|
else:
|
|
content = {'msg': _(f'{obj.name} was added to the shopping list.')}
|
|
http_status = status.HTTP_204_NO_CONTENT
|
|
|
|
return Response(content, status=http_status)
|
|
|
|
    @extend_schema(responses=RecipeSimpleSerializer(many=True))
    @decorators.action(detail=True, pagination_class=None, methods=['GET'], serializer_class=RecipeSimpleSerializer)
    def related(self, request, pk):
        obj = self.get_object()
        if obj.get_space() != request.space:
            raise PermissionDenied(detail='You do not have the required permission to perform this action', code=403)
        try:
            levels = int(request.query_params.get('levels', 1))
        except (ValueError, TypeError):
            levels = 1
        # TODO: make levels a user setting, included in request data?, keep solely in the backend?
        qs = obj.get_related_recipes(levels=levels)
        return Response(self.serializer_class(qs, many=True).data)
    @extend_schema(responses=RecipeFlatSerializer(many=True))
    @decorators.action(detail=False, pagination_class=None, methods=['GET'], serializer_class=RecipeFlatSerializer, )
    def flat(self, request):
        # TODO limit fields retrieved but .values() kills image
        qs = Recipe.objects.filter(space=request.space).filter(Q(private=False) | (
            Q(private=True) & (Q(created_by=self.request.user) | Q(shared=self.request.user)))).all()

        return Response(self.serializer_class(qs, many=True).data)

@extend_schema_view(list=extend_schema(
    parameters=[OpenApiParameter(name='food_id', description='ID of food to filter for', type=int), ]))
class UnitConversionViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = UnitConversion.objects
    serializer_class = UnitConversionSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        food_id = self.request.query_params.get('food_id', None)
        if food_id is not None:
            self.queryset = self.queryset.filter(food_id=food_id)

        return self.queryset.filter(space=self.request.space)

@extend_schema_view(list=extend_schema(
    parameters=[OpenApiParameter(
        name='category',
        description=_('Return the PropertyTypes matching the property category. Repeat for multiple.'),
        type=str,
        many=True,
        enum=[m[0] for m in PropertyType.CHOICES])
    ]
))
class PropertyTypeViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = PropertyType.objects
    serializer_class = PropertyTypeSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        # TODO add tests for filter
        category = self.request.query_params.getlist('category', [])
        if category:
            # the filtered queryset must be reassigned, otherwise the filter is silently dropped
            self.queryset = self.queryset.filter(category__in=category)
        return self.queryset.filter(space=self.request.space)

class PropertyViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = Property.objects
    serializer_class = PropertySerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        return self.queryset.filter(space=self.request.space)

@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='mealplan', description=_('Returns only entries associated with the given mealplan id'), type=int)
]))
class ShoppingListRecipeViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = ShoppingListRecipe.objects
    serializer_class = ShoppingListRecipeSerializer
    permission_classes = [(CustomIsOwner | CustomIsShared) & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        self.queryset = self.queryset.filter(Q(entries__space=self.request.space) | Q(recipe__space=self.request.space) | Q(mealplan__space=self.request.space))

        # TODO implement test for this
        if not self.detail:
            mealplan = self.request.query_params.get('mealplan', None)

            if mealplan is not None:
                self.queryset = self.queryset.filter(mealplan_id=mealplan)

        return self.queryset.filter(Q(entries__isnull=True)
                                    | Q(entries__created_by=self.request.user)
                                    | Q(entries__created_by__in=list(self.request.user.get_shopping_share()))).distinct().all()
    @decorators.action(detail=True, methods=['POST'], serializer_class=ShoppingListEntryBulkCreateSerializer, permission_classes=[CustomIsUser])
    def bulk_create_entries(self, request, pk):
        obj = self.get_object()
        if obj.get_space() != request.space:
            raise PermissionDenied(detail='You do not have the required permission to perform this action', code=403)

        serializer = self.serializer_class(data=request.data)

        if serializer.is_valid():
            entries = []
            for e in serializer.validated_data['entries']:
                entries.append(
                    ShoppingListEntry(
                        list_recipe_id=obj.pk,
                        amount=e['amount'],
                        unit_id=e['unit_id'],
                        food_id=e['food_id'],
                        ingredient_id=e['ingredient_id'],
                        created_by_id=request.user.id,
                        space_id=request.space.id,
                    )
                )

            ShoppingListEntry.objects.bulk_create(entries)
            return Response(serializer.validated_data)
        else:
            return Response(serializer.errors, 400)

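# Illustrative request sketch for bulk_create_entries above (route prefix assumed):
#   POST /api/shopping-list-recipe/<pk>/bulk_create_entries/
#   {"entries": [{"amount": 2, "unit_id": 1, "food_id": 5, "ingredient_id": 9}]}
# All entries are created with a single bulk_create query for the requesting user and space.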
@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='updated_after',
                     description=_('Returns only elements updated after the given timestamp in ISO 8601 format.'),
                     type=datetime.datetime),
    OpenApiParameter(name='mealplan', description=_('Returns only entries associated with the given mealplan id'), type=int)
]))
class ShoppingListEntryViewSet(LoggingMixin, viewsets.ModelViewSet):
    """
    Individual entries of a shopping list.

    List responses are automatically filtered to unchecked items and items no older than the
    user's "shopping recent days" preference, to keep the endpoint small.
    """
    queryset = ShoppingListEntry.objects
    serializer_class = ShoppingListEntrySerializer
    permission_classes = [(CustomIsOwner | CustomIsShared) & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        self.queryset = self.queryset.filter(space=self.request.space)

        self.queryset = self.queryset.filter(
            Q(created_by=self.request.user)
            | Q(created_by__in=list(self.request.user.get_shopping_share()))
        ).prefetch_related(
            'created_by', 'food',
            'food__properties',
            'food__properties__property_type',
            'food__inherit_fields',
            'food__supermarket_category',
            'food__onhand_users',
            'food__substitute',
            'food__child_inherit_fields',
            'unit', 'list_recipe',
            'list_recipe__mealplan',
            'list_recipe__mealplan__recipe',
        ).distinct().all()

        updated_after = self.request.query_params.get('updated_after', None)
        mealplan = self.request.query_params.get('mealplan', None)

        if not self.detail:
            # to keep the endpoint small, only return entries as old as the user's recent days preference
            today_start = timezone.now().replace(hour=0, minute=0, second=0)
            week_ago = today_start - datetime.timedelta(days=min(self.request.user.userpreference.shopping_recent_days, 14))
            self.queryset = self.queryset.filter(Q(checked=False) | Q(completed_at__gte=week_ago))

            if mealplan is not None:
                self.queryset = self.queryset.filter(list_recipe__mealplan_id=mealplan)

            try:
                if updated_after:
                    updated_after = parse_datetime(updated_after)
                    self.queryset = self.queryset.filter(updated_at__gte=updated_after)
            except Exception:
                traceback.print_exc()

        if self.detail:
            return self.queryset
        else:
            return self.queryset[:1000]
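    # Illustrative list query sketch (route prefix assumed): incremental sync of entries
    # changed since the last poll, optionally scoped to a single meal plan.
    #   GET /api/shopping-list-entry/?updated_after=2024-01-01T10:00:00Z&mealplan=7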
    @decorators.action(detail=False, methods=['POST'], serializer_class=ShoppingListEntryBulkSerializer,
                       permission_classes=[CustomIsUser])
    def bulk(self, request):
        serializer = self.serializer_class(data=request.data)

        if serializer.is_valid():
            bulk_entries = ShoppingListEntry.objects.filter(
                Q(created_by=self.request.user) | Q(created_by__in=list(self.request.user.get_shopping_share()))
            ).filter(
                space=request.space, id__in=serializer.validated_data['ids']
            )

            update_timestamp = timezone.now()
            checked = serializer.validated_data['checked']
            if checked:
                bulk_entries.update(checked=checked, updated_at=update_timestamp, completed_at=update_timestamp)
            else:
                bulk_entries.update(checked=checked, updated_at=update_timestamp, completed_at=None)
            serializer.validated_data['timestamp'] = update_timestamp

            # update the onhand status of the affected foods if shopping_add_onhand is enabled
            if request.user.userpreference.shopping_add_onhand:
                foods = Food.objects.filter(id__in=bulk_entries.values('food'))
                if checked:
                    for f in foods:
                        f.onhand_users.add(*request.user.userpreference.shopping_share.all(), request.user)
                else:
                    for f in foods:
                        f.onhand_users.remove(*request.user.userpreference.shopping_share.all(), request.user)

            return Response(serializer.validated_data)
        else:
            return Response(serializer.errors, 400)

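# Illustrative request sketch for the bulk action above (route prefix assumed):
#   POST /api/shopping-list-entry/bulk/ with {"ids": [1, 2, 3], "checked": true}
# Checking sets completed_at, unchecking resets it; when the user preference
# shopping_add_onhand is enabled, the onhand flag of the affected foods follows suit.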
class ViewLogViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = ViewLog.objects
    serializer_class = ViewLogSerializer
    permission_classes = [CustomIsOwner & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        # working backwards from the test: this is apparently supposed to be limited to the user's own view logs
        return self.queryset.filter(created_by=self.request.user).filter(space=self.request.space)

@extend_schema_view(list=extend_schema(
    parameters=[OpenApiParameter(name='recipe', description='Filter for entries with the given recipe', type=int), ]))
class CookLogViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = CookLog.objects
    serializer_class = CookLogSerializer
    permission_classes = [CustomIsOwner & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        if self.request.query_params.get('recipe', None):
            self.queryset = self.queryset.filter(recipe=self.request.query_params.get('recipe'))
        return self.queryset.filter(space=self.request.space)

class ImportLogViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = ImportLog.objects
    serializer_class = ImportLogSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        return self.queryset.filter(space=self.request.space)

class ExportLogViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = ExportLog.objects
    serializer_class = ExportLogSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        return self.queryset.filter(space=self.request.space)

class BookmarkletImportViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = BookmarkletImport.objects
    serializer_class = BookmarkletImportSerializer
    permission_classes = [CustomIsUser & CustomTokenHasScope]
    pagination_class = DefaultPagination
    required_scopes = ['bookmarklet']

    def get_serializer_class(self):
        if self.action == 'list':
            return BookmarkletImportListSerializer
        return self.serializer_class

    def get_queryset(self):
        return self.queryset.filter(space=self.request.space).all()

class UserFileViewSet(LoggingMixin, StandardFilterModelViewSet):
    queryset = UserFile.objects
    serializer_class = UserFileSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination
    parser_classes = [MultiPartParser]

    def get_queryset(self):
        self.queryset = self.queryset.filter(space=self.request.space).all()
        return super().get_queryset()

class AutomationViewSet(LoggingMixin, StandardFilterModelViewSet):
    queryset = Automation.objects
    serializer_class = AutomationSerializer
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    @extend_schema(
        parameters=[OpenApiParameter(
            name='type',
            description=_('Return the Automations matching the automation type. Repeat for multiple.'),
            type=str,
            many=True,
            enum=[a[0] for a in Automation.automation_types])
        ]
    )
    def list(self, request, *args, **kwargs):
        return super().list(request, *args, **kwargs)

    def get_queryset(self):
        automation_type = self.request.query_params.getlist('type', [])
        if automation_type:
            self.queryset = self.queryset.filter(type__in=automation_type)
        return self.queryset.filter(space=self.request.space).all()

@extend_schema_view(list=extend_schema(parameters=[
    OpenApiParameter(name='internal_note', description=_('Text field to store data that gets carried over to the UserSpace created from the InviteLink'), type=str)
]))
class InviteLinkViewSet(LoggingMixin, StandardFilterModelViewSet):
    queryset = InviteLink.objects
    serializer_class = InviteLinkSerializer
    permission_classes = [CustomIsSpaceOwner & CustomIsAdmin & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        internal_note = self.request.query_params.get('internal_note', None)
        if internal_note is not None:
            self.queryset = self.queryset.filter(internal_note=internal_note)

        if is_space_owner(self.request.user, self.request.space):
            self.queryset = self.queryset.filter(space=self.request.space).all()
            return super().get_queryset()
        else:
            # return an empty queryset instead of None so DRF can still render a response
            return self.queryset.none()

@extend_schema_view(list=extend_schema(
    parameters=[OpenApiParameter(
        name='type',
        description=_('Return the CustomFilters matching the model type. Repeat for multiple.'),
        type=str,
        many=True,
        enum=[m[0] for m in CustomFilter.MODELS])
    ]
))
class CustomFilterViewSet(LoggingMixin, StandardFilterModelViewSet):
    queryset = CustomFilter.objects
    serializer_class = CustomFilterSerializer
    permission_classes = [CustomIsOwner & CustomTokenHasReadWriteScope]
    pagination_class = DefaultPagination

    def get_queryset(self):
        # TODO add tests for filter
        filter_type = self.request.query_params.getlist('type', [])
        if filter_type:
            # the filtered queryset must be reassigned, otherwise the filter is silently dropped
            self.queryset = self.queryset.filter(type__in=filter_type)
        self.queryset = self.queryset.filter(Q(created_by=self.request.user) | Q(shared=self.request.user)).filter(
            space=self.request.space).distinct()
        return super().get_queryset()

class AccessTokenViewSet(LoggingMixin, viewsets.ModelViewSet):
    queryset = AccessToken.objects
    serializer_class = AccessTokenSerializer
    permission_classes = [CustomIsOwner & CustomTokenHasReadWriteScope]
    pagination_disabled = True

    def get_queryset(self):
        return self.queryset.filter(user=self.request.user)

# -------------- DRF custom views --------------------


class AuthTokenThrottle(AnonRateThrottle):
    rate = '10/day'


class RecipeImportThrottle(UserRateThrottle):
    rate = DRF_THROTTLE_RECIPE_URL_IMPORT

class CustomAuthToken(ObtainAuthToken):
    throttle_classes = [AuthTokenThrottle]

    def post(self, request, *args, **kwargs):
        serializer = self.serializer_class(data=request.data, context={'request': request})
        serializer.is_valid(raise_exception=True)
        user = serializer.validated_data['user']
        # reuse an existing, unexpired token with both read and write scope if one exists
        if token := AccessToken.objects.filter(user=user, expires__gt=timezone.now(), scope__contains='read').filter(
                scope__contains='write').first():
            access_token = token
        else:
            access_token = AccessToken.objects.create(user=user,
                                                      token=f'tda_{str(uuid.uuid4()).replace("-", "_")}',
                                                      expires=(timezone.now() + timezone.timedelta(days=365 * 5)),
                                                      scope='read write app')
        return Response({
            'id': access_token.id,
            'token': access_token.token,
            'scope': access_token.scope,
            'expires': access_token.expires,
            'user_id': access_token.user.pk,
            'test': user.pk
        })
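# Illustrative exchange sketch (url pattern assumed, see urls.py): trading credentials
# for a long-lived token.
#   POST /api-token-auth/ with {"username": "...", "password": "..."}
# Returns an existing unexpired read/write token when available, otherwise a fresh
# five-year 'tda_...' token with 'read write app' scope.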
class RecipeUrlImportView(APIView):
    throttle_classes = [RecipeImportThrottle]
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]

    # TODO add response serializer
    @extend_schema(request=RecipeFromSourceSerializer(many=False), responses=RecipeFromSourceResponseSerializer(many=False))
    def post(self, request, *args, **kwargs):
        """
        Retrieve a recipe from a given url or source string.

        :param request: standard request with additional post parameters
            - url: url to use for importing the recipe
            - data: if no url is given, the recipe is imported from the provided source data
            - bookmarklet (optional): id of a bookmarklet import to use, overrides the url and data attributes
        :return: JsonResponse containing the parsed json and images
        """
        scrape = None
        serializer = RecipeFromSourceSerializer(data=request.data)
        response = {}

        if serializer.is_valid():

            if (b_pk := serializer.validated_data.get('bookmarklet', None)) and (
                    bookmarklet := BookmarkletImport.objects.filter(pk=b_pk).first()):
                serializer.validated_data['url'] = bookmarklet.url
                serializer.validated_data['data'] = bookmarklet.html
                bookmarklet.delete()

            url = serializer.validated_data.get('url', None)
            data = unquote(serializer.validated_data.get('data', ''))

            if not url and not data:
                response['error'] = True
                response['msg'] = _('Nothing to do.')
                return Response(RecipeFromSourceResponseSerializer().to_representation(response), status=status.HTTP_400_BAD_REQUEST)

            elif url and not data:
                if re.match('^(https?://)?(www\\.youtube\\.com|youtu\\.be)/.+$', url):
                    if validate_import_url(url):
                        response['recipe'] = get_from_youtube_scraper(url, request)
                        if url and url.strip() != '':
                            response['duplicates'] = Recipe.objects.filter(space=request.space, source_url=url.strip()).values('id', 'name').all()
                        return Response(RecipeFromSourceResponseSerializer(context={'request': request}).to_representation(response), status=status.HTTP_200_OK)

                # detect share links of other Tandoor instances and import the recipe through their api
                tandoor_url = None
                if re.match('^(.)*/recipe/[0-9]+/\?share=[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$', url):
                    tandoor_url = url.replace('/recipe/', '/api/recipe/')
                elif re.match('^(.)*/view/recipe/[0-9]+/[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$', url):
                    tandoor_url = (url.replace('/view/recipe/', '/api/recipe/').replace(re.split('/recipe/[0-9]+', url)[1], '') + '?share=' +
                                   re.split('/recipe/[0-9]+', url)[1].replace('/', ''))
                if tandoor_url and validate_import_url(tandoor_url):
                    recipe_json = requests.get(tandoor_url).json()
                    recipe_json = clean_dict(recipe_json, 'id')
                    serialized_recipe = RecipeExportSerializer(data=recipe_json, context={'request': request})
                    if serialized_recipe.is_valid():
                        recipe = serialized_recipe.save()
                        if validate_import_url(recipe_json['image']):
                            if '?' in recipe_json['image']:
                                filetype = pathlib.Path(recipe_json['image'].split('?')[0]).suffix
                            else:
                                filetype = pathlib.Path(recipe_json["image"]).suffix
                            # the suffix already contains the leading dot
                            recipe.image = File(handle_image(request,
                                                             File(io.BytesIO(requests.get(recipe_json['image']).content), name='image'),
                                                             filetype=filetype),
                                                name=f'{uuid.uuid4()}_{recipe.pk}{filetype}')
                            recipe.save()
                        response['recipe_id'] = recipe.pk
                        return Response(RecipeFromSourceResponseSerializer(context={'request': request}).to_representation(response), status=status.HTTP_200_OK)
                else:
                    try:
                        if validate_import_url(url):
                            html = requests.get(
                                url,
                                headers={
                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0"}
                            ).content
                            scrape = scrape_html(org_url=url, html=html, supported_only=False)
                        else:
                            response['error'] = True
                            response['msg'] = _('Invalid Url')
                            return Response(RecipeFromSourceResponseSerializer().to_representation(response), status=status.HTTP_400_BAD_REQUEST)
                    except NoSchemaFoundInWildMode:
                        pass
                    except requests.exceptions.ConnectionError:
                        response['error'] = True
                        response['msg'] = _('Connection Refused.')
                        return Response(RecipeFromSourceResponseSerializer().to_representation(response), status=status.HTTP_400_BAD_REQUEST)
                    except requests.exceptions.MissingSchema:
                        response['error'] = True
                        response['msg'] = _('Bad URL Schema.')
                        return Response(RecipeFromSourceResponseSerializer().to_representation(response), status=status.HTTP_400_BAD_REQUEST)
            else:
                try:
                    data_json = json.loads(data)
                    if '@context' not in data_json:
                        data_json['@context'] = 'https://schema.org'
                    if '@type' not in data_json:
                        data_json['@type'] = 'Recipe'
                    data = "<script type='application/ld+json'>" + json.dumps(data_json) + "</script>"
                except JSONDecodeError:
                    pass
                scrape = scrape_html(html=data, org_url='https://urlnotfound.none', supported_only=False)
                if not url and (found_url := scrape.schema.data.get('url', 'https://urlnotfound.none')):
                    scrape = scrape_html(html=data, org_url=found_url, supported_only=False)

            if scrape:
                response['recipe'] = helper.get_from_scraper(scrape, request)
                response['images'] = list(dict.fromkeys(get_images_from_soup(scrape.soup, url)))
                if url and url.strip() != '':
                    response['duplicates'] = Recipe.objects.filter(space=request.space, source_url=url.strip()).values('id', 'name').all()
                return Response(RecipeFromSourceResponseSerializer(context={'request': request}).to_representation(response), status=status.HTTP_200_OK)

            else:
                response['error'] = True
                response['msg'] = _('No usable data could be found.')
                return Response(RecipeFromSourceResponseSerializer().to_representation(response), status=status.HTTP_400_BAD_REQUEST)
        else:
            response['error'] = True
            response['msg'] = serializer.errors
            return Response(RecipeFromSourceResponseSerializer().to_representation(response), status=status.HTTP_400_BAD_REQUEST)
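# Illustrative request sketch for RecipeUrlImportView (url pattern assumed, see urls.py):
#   POST /api/recipe-from-source/ with {"url": "https://example.com/some-recipe"}
# The response carries the parsed recipe json, candidate images and potential duplicates;
# youtube urls and share links of other Tandoor instances are special-cased above.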
class AiEndpointThrottle(UserRateThrottle):
    rate = AI_RATELIMIT

class ImageToRecipeView(APIView):
    parser_classes = [MultiPartParser]
    throttle_classes = [AiEndpointThrottle]
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]

    @extend_schema(request=ImportImageSerializer(many=False), responses=RecipeFromSourceResponseSerializer(many=False))
    def post(self, request, *args, **kwargs):
        """
        Convert an uploaded image (or PDF) of a recipe into structured recipe data using the configured AI model.
        """
        serializer = ImportImageSerializer(data=request.data, partial=True)
        if serializer.is_valid():
            # TODO max file size check

            base64type = None
            uploaded_file = serializer.validated_data['image']
            try:
                img = PIL.Image.open(uploaded_file)
                buffer = io.BytesIO()
                img.save(buffer, format=img.format)
                base64type = 'image/' + img.format
                file_bytes = buffer.getvalue()
            except PIL.UnidentifiedImageError:
                uploaded_file.seek(0)
                file_bytes = uploaded_file.read()
                # TODO detect if the file actually is a PDF
                base64type = 'application/pdf'

            # TODO can't use ingredient splitting because the scraper gets upset: "Please separate the ingredients into amount, unit, food and if required a note."
            # TODO maybe not use scraper?
            messages = [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "Please look at the file and return the contained recipe as a structured JSON in the same language as given in the file. For the JSON use the format given in the schema.org/recipe schema. Do not make anything up and leave everything blank you do not know. If shown in the file please also return the nutrition in the format specified in the schema.org/recipe schema."
                        },
                        {
                            "type": "image_url",
                            "image_url": f'data:{base64type};base64,{base64.b64encode(file_bytes).decode("utf-8")}'
                        },
                    ]
                },
            ]

            try:
                ai_response = completion(api_key=AI_API_KEY, model=AI_MODEL_NAME, response_format={"type": "json_object"}, messages=messages, )
            except BadRequestError as err:
                response = {
                    'error': True,
                    'msg': 'The AI could not process your request. \n\n' + err.message,
                }
                return Response(RecipeFromSourceResponseSerializer(context={'request': request}).to_representation(response), status=status.HTTP_400_BAD_REQUEST)
            response_text = ai_response.choices[0].message.content

            try:
                data_json = json.loads(response_text)
                # wrap the AI output in an ld+json script tag so the recipe scraper can consume it
                data = "<script type='application/ld+json'>" + json.dumps(data_json) + "</script>"

                scrape = scrape_html(html=data, org_url='https://urlnotfound.none', supported_only=False)
                if scrape:
                    response = {}
                    response['recipe'] = helper.get_from_scraper(scrape, request)
                    response['images'] = []
                    response['duplicates'] = []
                    return Response(RecipeFromSourceResponseSerializer(context={'request': request}).to_representation(response), status=status.HTTP_200_OK)
            except JSONDecodeError:
                traceback.print_exc()
                response = {
                    'error': True,
                    'msg': "Error parsing AI results. Response Text:\n\n" + response_text
                }
                return Response(RecipeFromSourceResponseSerializer(context={'request': request}).to_representation(response), status=status.HTTP_400_BAD_REQUEST)
        else:
            response = {
                'error': True,
                'msg': "Error parsing input:\n\n" + str(serializer.errors)
            }
            return Response(RecipeFromSourceResponseSerializer(context={'request': request}).to_representation(response), status=status.HTTP_400_BAD_REQUEST)
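# Illustrative usage sketch for ImageToRecipeView (url pattern assumed, see urls.py):
# a multipart POST with an "image" field containing a photo or PDF of a recipe.
# Requires AI_API_KEY and AI_MODEL_NAME to be configured; throttled via AI_RATELIMIT.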
class AppImportView(APIView):
    parser_classes = [MultiPartParser]
    throttle_classes = [RecipeImportThrottle]
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]

    @extend_schema(request=ImportImageSerializer(many=False), responses=RecipeFromSourceResponseSerializer(many=False))
    def post(self, request, *args, **kwargs):
        limit, msg = above_space_limit(request.space)
        if limit:
            return Response({'error': True, 'msg': _('File is above space limit')}, status=status.HTTP_400_BAD_REQUEST)

        form = ImportForm(request.POST, request.FILES)
        if form.is_valid() and request.FILES != {}:
            try:
                integration = get_integration(request, form.cleaned_data['type'])

                il = ImportLog.objects.create(type=form.cleaned_data['type'], created_by=request.user, space=request.space)
                files = []
                for f in request.FILES.getlist('files'):
                    files.append({'file': io.BytesIO(f.read()), 'name': f.name})
                # run the potentially long-running import in a background daemon thread
                t = threading.Thread(target=integration.do_import, args=[files, il, form.cleaned_data['duplicates']])
                t.daemon = True
                t.start()

                return Response({'import_id': il.pk}, status=status.HTTP_200_OK)
            except NotImplementedError:
                return Response({'error': True, 'msg': _('Importing is not implemented for this provider')},
                                status=status.HTTP_400_BAD_REQUEST)
        else:
            return Response({'error': True, 'msg': form.errors}, status=status.HTTP_400_BAD_REQUEST)

class FdcSearchView(APIView):
    permission_classes = [CustomIsUser & CustomTokenHasReadWriteScope]

    @extend_schema(responses=FdcQuerySerializer(many=False),
                   parameters=[OpenApiParameter(name='query', type=str),
                               OpenApiParameter(name='dataType', description='options: Branded,Foundation,Survey (FNDDS),SR Legacy', type=str, many=True)])
    def get(self, request, format=None):
        query = self.request.query_params.get('query', None)
        if query is not None:
            data_types = self.request.query_params.getlist('dataType', ['Foundation'])

            response = requests.get(f'https://api.nal.usda.gov/fdc/v1/foods/search?api_key={FDC_API_KEY}&query={query}&dataType={",".join(data_types)}')

            if response.status_code == 429:
                return JsonResponse(
                    {
                        'msg': 'API key rate limit reached/exceeded, see https://api.data.gov/docs/rate-limits/ '
                               'for more information. Configure your key in Tandoor using the FDC_API_KEY '
                               'environment variable.'
                    },
                    status=429,
                    json_dumps_params={'indent': 4})
            if response.status_code != 200:
                return JsonResponse({
                    'msg': f'Error while requesting FDC data using url https://api.nal.usda.gov/fdc/v1/foods/search?api_key=*****&query={query}'},
                    status=response.status_code,
                    json_dumps_params={'indent': 4})

            return Response(FdcQuerySerializer(context={'request': request}).to_representation(json.loads(response.content)), status=status.HTTP_200_OK)
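# Illustrative query sketch for FdcSearchView (url pattern assumed, see urls.py):
#   GET /api/fdc-search/?query=butter&dataType=Foundation&dataType=Branded
# proxies the USDA FoodData Central search api using the configured FDC_API_KEY.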
@extend_schema(
    request=None,
    responses=None,
)
@api_view(['POST'])
@permission_classes([CustomIsAdmin & CustomTokenHasReadWriteScope])
# TODO add rate limiting
# TODO add api tests
# TODO initial request should include some sort of data, inadvertently doing a naked POST could make changes that weren't intended
def reset_food_inheritance(request):
    """
    Reset food inheritance from the api, see the food method for docs.
    """
    try:
        Food.reset_inheritance(space=request.space)
        return Response({'message': 'success', }, status=status.HTTP_200_OK)
    except Exception:
        traceback.print_exc()
        return Response({}, status=status.HTTP_400_BAD_REQUEST)

@api_view(['GET'])
# @schema(AutoSchema()) #TODO add proper schema
@permission_classes([CustomIsAdmin & CustomTokenHasReadWriteScope])
# TODO add rate limiting
def switch_active_space(request, space_id):
    """
    api endpoint to switch the active space of the requesting user
    """
    try:
        space = get_object_or_404(Space, id=space_id)
        user_space = switch_user_active_space(request.user, space)
        if user_space:
            return Response(UserSpaceSerializer().to_representation(instance=user_space), status=status.HTTP_200_OK)
        else:
            return Response("not found", status=status.HTTP_404_NOT_FOUND)
    except Exception:
        traceback.print_exc()
        return Response({}, status=status.HTTP_400_BAD_REQUEST)

@api_view(['GET'])
# @schema(AutoSchema()) #TODO add proper schema
@permission_classes([CustomIsUser & CustomTokenHasReadWriteScope])
def download_file(request, file_id):
    """
    Download a user file securely, wrapped in a zip to prevent any context based XSS problems.
    Temporary solution until a real file manager is implemented.
    """
    try:
        uf = UserFile.objects.get(space=request.space, pk=file_id)

        in_memory = io.BytesIO()
        zf = ZipFile(in_memory, mode="w")
        zf.writestr(uf.file.name, uf.file.file.read())
        zf.close()

        response = HttpResponse(in_memory.getvalue(), content_type='application/force-download')
        response['Content-Disposition'] = 'attachment; filename="' + uf.name + '.zip"'
        return response

    except Exception:
        traceback.print_exc()
        return Response({}, status=status.HTTP_400_BAD_REQUEST)

@api_view(['POST'])
# @schema(AutoSchema()) #TODO add proper schema
@permission_classes([CustomIsUser & CustomTokenHasReadWriteScope])
def import_files(request):
    """
    Handle files passed by the application importer.
    """
    limit, msg = above_space_limit(request.space)
    if limit:
        return Response({'error': True, 'msg': _('File is above space limit')}, status=status.HTTP_400_BAD_REQUEST)

    form = ImportForm(request.POST, request.FILES)
    if form.is_valid() and request.FILES != {}:
        try:
            integration = get_integration(request, form.cleaned_data['type'])

            il = ImportLog.objects.create(type=form.cleaned_data['type'], created_by=request.user, space=request.space)
            files = []
            for f in request.FILES.getlist('files'):
                files.append({'file': io.BytesIO(f.read()), 'name': f.name})
            # run the potentially long-running import in a background daemon thread
            t = threading.Thread(target=integration.do_import, args=[files, il, form.cleaned_data['duplicates']])
            t.daemon = True
            t.start()

            return Response({'import_id': il.pk}, status=status.HTTP_200_OK)
        except NotImplementedError:
            return Response({'error': True, 'msg': _('Importing is not implemented for this provider')},
                            status=status.HTTP_400_BAD_REQUEST)
    else:
        return Response({'error': True, 'msg': form.errors}, status=status.HTTP_400_BAD_REQUEST)

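# Illustrative usage sketch for import_files (url pattern assumed, see urls.py):
# a multipart POST with "type", "duplicates" and one or more "files" fields.
# The returned import_id references the ImportLog that the background thread updates,
# so clients can poll the corresponding ImportLog via the api for progress.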
# TODO implement schema https://drf-spectacular.readthedocs.io/en/latest/customization.html#replace-views-with-openapiviewextension
class ImportOpenData(APIView):
    permission_classes = [CustomIsAdmin & CustomTokenHasReadWriteScope]

    def get(self, request, format=None):
        response = requests.get(
            'https://raw.githubusercontent.com/TandoorRecipes/open-tandoor-data/main/build/meta.json')
        metadata = json.loads(response.content)
        return Response(metadata)

    def post(self, request, *args, **kwargs):
        # TODO validate data
        selected_version = request.data['selected_version']
        selected_datatypes = request.data['selected_datatypes']
        update_existing = str2bool(request.data['update_existing'])
        use_metric = str2bool(request.data['use_metric'])

        response = requests.get(
            f'https://raw.githubusercontent.com/TandoorRecipes/open-tandoor-data/main/build/{selected_version}.json')  # TODO catch 404, timeout, ...
        data = json.loads(response.content)

        response_obj = {}

        data_importer = OpenDataImporter(request, data, update_existing=update_existing, use_metric=use_metric)

        if selected_datatypes['unit']['selected']:
            response_obj['unit'] = data_importer.import_units().to_dict()
        if selected_datatypes['category']['selected']:
            response_obj['category'] = data_importer.import_category().to_dict()
        if selected_datatypes['property']['selected']:
            response_obj['property'] = data_importer.import_property().to_dict()
        if selected_datatypes['store']['selected']:
            response_obj['store'] = data_importer.import_supermarket().to_dict()
        if selected_datatypes['food']['selected']:
            response_obj['food'] = data_importer.import_food().to_dict()
        if selected_datatypes['conversion']['selected']:
            response_obj['conversion'] = data_importer.import_conversion().to_dict()

        return Response(response_obj)

# TODO implement schema
class LocalizationViewSet(viewsets.GenericViewSet):
    permission_classes = [CustomIsGuest & CustomTokenHasReadWriteScope]
    serializer_class = LocalizationSerializer
    pagination_disabled = True

    def get_queryset(self):
        return None

    def list(self, request, *args, **kwargs):
        langs = []
        for code, language in settings.LANGUAGES:
            langs.append({'code': code, 'language': language})
        return Response(LocalizationSerializer(langs, many=True).data)

# TODO implement schema
class ServerSettingsViewSet(viewsets.GenericViewSet):
    permission_classes = []  # public view used by anonymous requests (recipe share page)
    serializer_class = ServerSettingsSerializer
    pagination_disabled = True

    def get_queryset(self):
        return None

    @extend_schema(responses=ServerSettingsSerializer(many=False))
    @decorators.action(detail=False, pagination_class=None, methods=['GET'], serializer_class=ServerSettingsSerializer, )
    def current(self, request, *args, **kwargs):
        s = dict()
        # Attention: no login required, do not return sensitive data
        s['shopping_min_autosync_interval'] = settings.SHOPPING_MIN_AUTOSYNC_INTERVAL
        s['enable_pdf_export'] = settings.ENABLE_PDF_EXPORT
        s['disable_external_connectors'] = settings.DISABLE_EXTERNAL_CONNECTORS
        s['terms_url'] = settings.TERMS_URL
        s['privacy_url'] = settings.PRIVACY_URL
        s['imprint_url'] = settings.IMPRINT_URL
        s['hosted'] = settings.HOSTED
        s['debug'] = settings.DEBUG
        s['version'] = TANDOOR_VERSION

        return Response(ServerSettingsSerializer(s, many=False).data)

def get_recipe_provider(recipe):
    if recipe.storage.method == Storage.DROPBOX:
        return Dropbox
    elif recipe.storage.method == Storage.NEXTCLOUD:
        return Nextcloud
    elif recipe.storage.method == Storage.LOCAL:
        return Local
    else:
        raise Exception('Provider not implemented')

# TODO update so that link is provided as part of serializer
@extend_schema(
    request=inline_serializer(name="RecipePKSerializer", fields={'pk': IntegerField()}),
    responses=None,
)
@api_view(['GET'])
@permission_classes([CustomIsUser & CustomTokenHasReadWriteScope])
def get_external_file_link(request, recipe_id):
    recipe = get_object_or_404(Recipe, pk=recipe_id, space=request.space)
    if not recipe.link:
        recipe.link = get_recipe_provider(recipe).get_share_link(recipe)
        recipe.save()

    return HttpResponse(recipe.link)

@extend_schema(
    request=inline_serializer(name="RecipePKSerializer", fields={'pk': IntegerField()}),
    responses=None,
)
@api_view(['GET'])
@permission_classes([(CustomIsGuest | CustomIsUser) & CustomTokenHasReadWriteScope])
def get_recipe_file(request, recipe_id):
    recipe = get_object_or_404(Recipe, pk=recipe_id, space=request.space)
    if recipe.storage:
        return FileResponse(get_recipe_provider(recipe).get_file(recipe), filename=f'{recipe.name}.pdf')
    else:
        return FileResponse()

@group_required('user')
# TODO add rate limiting
# TODO change to some sort of asynchronous trigger
def sync_all(request):
    if request.space.demo or settings.HOSTED:
        messages.add_message(request, messages.ERROR,
                             _('This feature is not yet available in the hosted version of tandoor!'))
        return redirect('index')

    monitors = Sync.objects.filter(active=True).filter(space=request.user.userspace_set.filter(active=1).first().space)

    error = False
    for monitor in monitors:
        if monitor.storage.method == Storage.DROPBOX:
            ret = Dropbox.import_all(monitor)
            if not ret:
                error = True
        if monitor.storage.method == Storage.NEXTCLOUD:
            ret = Nextcloud.import_all(monitor)
            if not ret:
                error = True
        if monitor.storage.method == Storage.LOCAL:
            ret = Local.import_all(monitor)
            if not ret:
                error = True

    if not error:
        messages.add_message(request, messages.SUCCESS, _('Sync successful!'))
        return redirect('list_recipe_import')
    else:
        messages.add_message(request, messages.ERROR, _('Error synchronizing with Storage'))
        return redirect('list_recipe_import')

# TODO migrate to normal standard view
@extend_schema(
    request=inline_serializer(name="ShareLinkSerializer", fields={'pk': IntegerField()}),
    responses=inline_serializer(name="ShareLinkResponseSerializer",
                                fields={'pk': IntegerField(), 'share': UUIDField(), 'link': CharField()})
)
@api_view(['GET'])
# @schema(AutoSchema()) #TODO add proper schema https://drf-spectacular.readthedocs.io/en/latest/customization.html#replace-views-with-openapiviewextension
@permission_classes([CustomIsUser & CustomTokenHasReadWriteScope])
def share_link(request, pk):
    if request.space.allow_sharing and has_group_permission(request.user, ('user',)):
        recipe = get_object_or_404(Recipe, pk=pk, space=request.space)
        link = ShareLink.objects.create(recipe=recipe, created_by=request.user, space=request.space)
        return JsonResponse({'pk': pk, 'share': link.uuid,
                             'link': request.build_absolute_uri(reverse('index') + f'recipe/{pk}/?share={link.uuid}')})
    else:
        return JsonResponse({'error': 'sharing_disabled'}, status=403)

def meal_plans_to_ical(queryset, filename):
    cal = Calendar()

    for p in queryset:
        event = Event()
        event['uid'] = p.id
        event.add('dtstart', p.from_date)
        if p.to_date:
            event.add('dtend', p.to_date)
        else:
            event.add('dtend', p.from_date)
        event['summary'] = f'{p.meal_type.name}: {p.get_label()}'
        event['description'] = p.note
        cal.add_component(event)

    response = FileResponse(io.BytesIO(cal.to_ical()))
    response["Content-Disposition"] = f'attachment; filename={filename}'  # noqa: E501

    return response

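# Minimal usage sketch for meal_plans_to_ical (MealPlan queryset assumed):
#   return meal_plans_to_ical(MealPlan.objects.filter(space=request.space), 'meal_plans.ics')
# produces an attachment download with one VEVENT per plan entry.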
@extend_schema(
    request=inline_serializer(name="IngredientStringSerializer", fields={'text': CharField()}),
    responses=inline_serializer(name="ParsedIngredientSerializer",
                                fields={'amount': IntegerField(), 'unit': CharField(), 'food': CharField(),
                                        'note': CharField()})
)
@api_view(['POST'])
@permission_classes([CustomIsUser & CustomTokenHasReadWriteScope])
def ingredient_from_string(request):
    text = request.data['text']

    ingredient_parser = IngredientParser(request, False)
    amount, unit, food, note = ingredient_parser.parse(text)

    ingredient = {'amount': amount, 'unit': None, 'food': None, 'note': note}
    if food:
        food, created = Food.objects.get_or_create(space=request.space, name=food)
        ingredient['food'] = {'name': food.name, 'id': food.id}

    if unit:
        unit, created = Unit.objects.get_or_create(space=request.space, name=unit)
        ingredient['unit'] = {'name': unit.name, 'id': unit.id}

    return JsonResponse(ingredient, status=200)
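# Illustrative request sketch for ingredient_from_string (url pattern assumed, see urls.py):
#   POST with {"text": "2 tbsp olive oil, cold pressed"}
# returns {"amount": 2.0, "unit": {...}, "food": {...}, "note": "cold pressed"}.
# Note that unknown foods and units are created in the requesting space as a side effect.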