add multiple API tokens per user, removes old API tokens

This commit is contained in:
vabene1111
2022-08-04 17:24:54 +02:00
parent 9e62d8a3a3
commit 3f77b73a61
12 changed files with 666 additions and 220 deletions

View File

@@ -4,6 +4,7 @@ import re
import uuid
from datetime import date, timedelta
import oauth2_provider.models
from PIL import Image
from annoying.fields import AutoOneToOneField
from django.contrib import auth
@@ -63,6 +64,13 @@ auth.models.User.add_to_class('get_shopping_share', get_shopping_share)
auth.models.User.add_to_class('get_active_space', get_active_space)
def oauth_token_get_owner(self):
return self.user
oauth2_provider.models.AccessToken.add_to_class('get_owner', oauth_token_get_owner)
def get_model_name(model):
return ('_'.join(re.findall('[A-Z][^A-Z]*', model.__name__))).lower()

View File

@@ -1,4 +1,5 @@
import traceback
import uuid
from datetime import datetime, timedelta
from decimal import Decimal
from gettext import gettext as _
@@ -14,6 +15,7 @@ from django.utils import timezone
from django_scopes import scopes_disabled
from drf_writable_nested import UniqueFieldsMixin, WritableNestedModelSerializer
from PIL import Image
from oauth2_provider.models import AccessToken
from rest_framework import serializers
from rest_framework.exceptions import NotFound, ValidationError
@@ -143,7 +145,7 @@ class UserSerializer(WritableNestedModelSerializer):
list_serializer_class = SpaceFilterSerializer
model = User
fields = ('id', 'username', 'first_name', 'last_name', 'display_name')
read_only_fields = ('username', )
read_only_fields = ('username',)
class GroupSerializer(UniqueFieldsMixin, WritableNestedModelSerializer):
@@ -1134,6 +1136,27 @@ class BookmarkletImportSerializer(BookmarkletImportListSerializer):
read_only_fields = ('created_by', 'space')
# OAuth / Auth Token related Serializers
class AccessTokenSerializer(serializers.ModelSerializer):
token = serializers.SerializerMethodField('get_token')
def create(self, validated_data):
validated_data['token'] = f'tda_{str(uuid.uuid4()).replace("-","_")}'
validated_data['user'] = self.context['request'].user
return super().create(validated_data)
def get_token(self, obj):
if (timezone.now() - obj.created).seconds < 15:
return obj.token
return f'tda_************_******_***********{obj.token[len(obj.token)-4:]}'
class Meta:
model = AccessToken
fields = ('id', 'token', 'expires', 'scope', 'created', 'updated')
read_only_fields = ('id', 'token',)
# Export/Import Serializers
class KeywordExportSerializer(KeywordSerializer):

View File

@@ -159,9 +159,9 @@
<br/>
{% trans 'Use the token as an Authorization header prefixed by the word token as shown in the following examples:' %}
<br/>
<code>Authorization: Token {{ api_token }}</code> {% trans 'or' %}<br/>
<code>Authorization: Bearer {{ api_token }}</code> {% trans 'or' %}<br/>
<code>curl -X GET http://your.domain.com/api/recipes/ -H 'Authorization:
Token {{ api_token }}'</code>
Bearer {{ api_token }}'</code>
</div>
</div>

View File

@@ -0,0 +1,115 @@
import json
import pytest
from django.contrib import auth
from django.urls import reverse
from django.utils import timezone
from django_scopes import scopes_disabled
from oauth2_provider.models import AccessToken
from cookbook.models import ViewLog
LIST_URL = 'api:accesstoken-list'
DETAIL_URL = 'api:accesstoken-detail'
@pytest.fixture()
def obj_1(u1_s1):
return AccessToken.objects.create(user=auth.get_user(u1_s1), scope='test', expires=timezone.now() + timezone.timedelta(days=365 * 5), token='test1')
@pytest.fixture()
def obj_2(u1_s1):
return AccessToken.objects.create(user=auth.get_user(u1_s1), scope='test', expires=timezone.now() + timezone.timedelta(days=365 * 5), token='test2')
@pytest.mark.parametrize("arg", [
['a_u', 403],
['g1_s1', 200],
['u1_s1', 200],
['a1_s1', 200],
])
def test_list_permission(arg, request):
c = request.getfixturevalue(arg[0])
assert c.get(reverse(LIST_URL)).status_code == arg[1]
def test_list_space(obj_1, obj_2, u1_s1, u1_s2, space_2):
assert len(json.loads(u1_s1.get(reverse(LIST_URL)).content)) == 2
assert len(json.loads(u1_s2.get(reverse(LIST_URL)).content)) == 0
obj_1.user = auth.get_user(u1_s2)
obj_1.save()
assert len(json.loads(u1_s1.get(reverse(LIST_URL)).content)) == 1
assert len(json.loads(u1_s2.get(reverse(LIST_URL)).content)) == 1
def test_token_visibility(u1_s1, obj_1):
# tokens should only be returned on the first API request (first 15 seconds)
at = json.loads(u1_s1.get(reverse(DETAIL_URL, args=[obj_1.id])).content)
assert at['token'] == obj_1.token
with scopes_disabled():
obj_1.created = timezone.now() - timezone.timedelta(seconds=16)
obj_1.save()
at = json.loads(u1_s1.get(reverse(DETAIL_URL, args=[obj_1.id])).content)
assert at['token'] != obj_1.token
@pytest.mark.parametrize("arg", [
['a_u', 403],
['g1_s1', 404],
['u1_s1', 200],
['a1_s1', 404],
['g1_s2', 404],
['u1_s2', 404],
['a1_s2', 404],
])
def test_update(arg, request, obj_1):
c = request.getfixturevalue(arg[0])
r = c.patch(
reverse(
DETAIL_URL,
args={obj_1.id}
),
{'scope': 'lorem ipsum'},
content_type='application/json'
)
assert r.status_code == arg[1]
@pytest.mark.parametrize("arg", [
['a_u', 403],
['g1_s1', 201],
['u1_s1', 201],
['a1_s1', 201],
])
def test_add(arg, request, u1_s2, u2_s1, recipe_1_s1):
c = request.getfixturevalue(arg[0])
r = c.post(
reverse(LIST_URL),
{'scope': 'test', 'expires': timezone.now() + timezone.timedelta(days=365 * 5)},
content_type='application/json'
)
response = json.loads(r.content)
assert r.status_code == arg[1]
if r.status_code == 201:
assert response['scope'] == 'test'
def test_delete(u1_s1, u1_s2, obj_1):
r = u1_s2.delete(
reverse(
DETAIL_URL,
args={obj_1.id}
)
)
assert r.status_code == 404
r = u1_s1.delete(
reverse(
DETAIL_URL,
args={obj_1.id}
)
)
assert r.status_code == 204

View File

@@ -51,6 +51,7 @@ router.register(r'user', api.UserViewSet)
router.register(r'user-preference', api.UserPreferenceViewSet)
router.register(r'user-space', api.UserSpaceViewSet)
router.register(r'view-log', api.ViewLogViewSet)
router.register(r'access-token', api.AccessTokenViewSet)
urlpatterns = [
path('', views.index, name='index'),

View File

@@ -29,6 +29,7 @@ from django.utils.translation import gettext as _
from django_scopes import scopes_disabled
from icalendar import Calendar, Event
from PIL import UnidentifiedImageError
from oauth2_provider.models import AccessToken
from recipe_scrapers import scrape_html, scrape_me
from recipe_scrapers._exceptions import NoSchemaFoundInWildMode
from requests.exceptions import MissingSchema
@@ -86,7 +87,7 @@ from cookbook.serializer import (AutomationSerializer, BookmarkletImportListSeri
SupermarketCategorySerializer, SupermarketSerializer,
SyncLogSerializer, SyncSerializer, UnitSerializer,
UserFileSerializer, UserSerializer, UserPreferenceSerializer,
UserSpaceSerializer, ViewLogSerializer)
UserSpaceSerializer, ViewLogSerializer, AccessTokenSerializer)
from cookbook.views.import_export import get_integration
from recipes import settings
@@ -1090,6 +1091,15 @@ class CustomFilterViewSet(viewsets.ModelViewSet, StandardFilterMixin):
return super().get_queryset()
class AccessTokenViewSet(viewsets.ModelViewSet):
queryset = AccessToken.objects
serializer_class = AccessTokenSerializer
permission_classes = [CustomIsOwner]
def get_queryset(self):
return self.queryset.filter(user=self.request.user)
# -------------- DRF custom views --------------------
class AuthTokenThrottle(AnonRateThrottle):

View File

@@ -1,5 +1,6 @@
import os
import re
import uuid
from datetime import datetime
from uuid import UUID
@@ -18,6 +19,7 @@ from django.urls import reverse, reverse_lazy
from django.utils import timezone
from django.utils.translation import gettext as _
from django_scopes import scopes_disabled
from oauth2_provider.models import AccessToken
from rest_framework.authtoken.models import Token
from cookbook.forms import (CommentForm, Recipe, SearchPreferenceForm, ShoppingPreferenceForm,
@@ -338,8 +340,8 @@ def user_settings(request):
elif not search_error:
search_form = SearchPreferenceForm()
if (api_token := Token.objects.filter(user=request.user).first()) is None:
api_token = Token.objects.create(user=request.user)
if (api_token := AccessToken.objects.filter(user=request.user).first()) is None:
api_token = AccessToken.objects.create(user=request.user, token=f'tda_{str(uuid.uuid4()).replace("-","_")}', expires=(timezone.now() + timezone.timedelta(days=365*5)), scope='read write').token
# these fields require postgresql - just disable them if postgresql isn't available
if not settings.DATABASES['default']['ENGINE'] in ['django.db.backends.postgresql_psycopg2',