Add Supabase key validation and simplify frontend state management

- Add backend validation to detect and warn about anon vs service keys
- Prevent startup with incorrect Supabase key configuration
- Consolidate frontend state management following KISS principles
- Remove duplicate state tracking and sessionStorage polling
- Add clear error display when backend fails to start
- Improve .env.example documentation with detailed key selection guide
- Add comprehensive test coverage for validation logic
- Remove unused test results checking to eliminate 404 errors

The implementation now warns users about key misconfiguration while
maintaining backward compatibility. Frontend state is simplified with
MainLayout as the single source of truth for backend status.
This commit is contained in:
Rasmus Widing
2025-08-16 00:10:23 +03:00
parent ad1b8bf70f
commit 3800280f2e
19 changed files with 848 additions and 317 deletions

View File

@@ -6,6 +6,8 @@ import os
from dataclasses import dataclass
from urllib.parse import urlparse
import jwt
class ConfigurationError(Exception):
"""Raised when there's an error in configuration."""
@@ -46,6 +48,38 @@ def validate_openai_api_key(api_key: str) -> bool:
return True
def validate_supabase_key(supabase_key: str) -> tuple[bool, str]:
"""Validate Supabase key type and return validation result.
Returns:
tuple[bool, str]: (is_valid, message)
- (False, "ANON_KEY_DETECTED") if anon key detected
- (True, "VALID_SERVICE_KEY") if service key detected
- (False, "UNKNOWN_KEY_TYPE:{role}") for unknown roles
- (True, "UNABLE_TO_VALIDATE") if JWT cannot be decoded
"""
if not supabase_key:
return False, "EMPTY_KEY"
try:
# Decode JWT without verification to check the 'role' claim
# We don't verify the signature since we only need to check the role
decoded = jwt.decode(supabase_key, options={"verify_signature": False})
role = decoded.get("role")
if role == "anon":
return False, "ANON_KEY_DETECTED"
elif role == "service_role":
return True, "VALID_SERVICE_KEY"
else:
return False, f"UNKNOWN_KEY_TYPE:{role}"
except Exception:
# If we can't decode the JWT, we'll allow it to proceed
# This handles new key formats or non-JWT keys
return True, "UNABLE_TO_VALIDATE"
def validate_supabase_url(url: str) -> bool:
"""Validate Supabase URL format."""
if not url:
@@ -80,6 +114,29 @@ def load_environment_config() -> EnvironmentConfig:
validate_openai_api_key(openai_api_key)
validate_supabase_url(supabase_url)
# Validate Supabase key type
is_valid_key, key_message = validate_supabase_key(supabase_service_key)
if not is_valid_key:
if key_message == "ANON_KEY_DETECTED":
raise ConfigurationError(
"CRITICAL: You are using a Supabase ANON key instead of a SERVICE key.\n\n"
"The ANON key is a public key with read-only permissions that cannot write to the database.\n"
"This will cause all database operations to fail with 'permission denied' errors.\n\n"
"To fix this:\n"
"1. Go to your Supabase project dashboard\n"
"2. Navigate to Settings > API keys\n"
"3. Find the 'service_role' key (NOT the 'anon' key)\n"
"4. Update your SUPABASE_SERVICE_KEY environment variable\n\n"
"Key characteristics:\n"
"- ANON key: Starts with 'eyJ...' and has role='anon' (public, read-only)\n"
"- SERVICE key: Starts with 'eyJ...' and has role='service_role' (private, full access)\n\n"
"Current key role detected: anon"
)
elif key_message.startswith("UNKNOWN_KEY_TYPE:"):
role = key_message.split(":", 1)[1]
print(f"WARNING: Unknown Supabase key role '{role}'. Proceeding but may cause issues.")
# For UNABLE_TO_VALIDATE, we continue silently
# Optional environment variables with defaults
host = os.getenv("HOST", "0.0.0.0")
port_str = os.getenv("PORT")
@@ -97,8 +154,8 @@ def load_environment_config() -> EnvironmentConfig:
# Validate and convert port
try:
port = int(port_str)
except ValueError:
raise ConfigurationError(f"PORT must be a valid integer, got: {port_str}")
except ValueError as e:
raise ConfigurationError(f"PORT must be a valid integer, got: {port_str}") from e
return EnvironmentConfig(
openai_api_key=openai_api_key,
@@ -110,6 +167,11 @@ def load_environment_config() -> EnvironmentConfig:
)
def get_config() -> EnvironmentConfig:
"""Get environment configuration with validation."""
return load_environment_config()
def get_rag_strategy_config() -> RAGStrategyConfig:
"""Load RAG strategy configuration from environment variables."""

View File

@@ -80,6 +80,11 @@ async def lifespan(app: FastAPI):
logger.info("🚀 Starting Archon backend...")
try:
# Validate configuration FIRST - check for anon vs service key
from .config.config import get_config
get_config() # This will raise ConfigurationError if anon key detected
# Initialize credentials from database FIRST - this is the foundation for everything else
await initialize_credentials()

View File

@@ -227,14 +227,11 @@ class CredentialService:
self._cache[key] = value
# Upsert to database with proper conflict handling
result = (
supabase.table("archon_settings")
.upsert(
data,
on_conflict="key", # Specify the unique column for conflict resolution
)
.execute()
)
# Since we validate service key at startup, permission errors here indicate actual database issues
supabase.table("archon_settings").upsert(
data,
on_conflict="key", # Specify the unique column for conflict resolution
).execute()
# Invalidate RAG settings cache if this is a rag_strategy setting
if category == "rag_strategy":
@@ -256,7 +253,8 @@ class CredentialService:
try:
supabase = self._get_supabase_client()
result = supabase.table("archon_settings").delete().eq("key", key).execute()
# Since we validate service key at startup, we can directly execute
supabase.table("archon_settings").delete().eq("key", key).execute()
# Remove from cache
if key in self._cache:

View File

@@ -56,3 +56,5 @@ def test_existing_credential_returns_normally(client, mock_supabase_client):
assert data["is_encrypted"] is False
# Should not have is_default flag for real credentials
assert "is_default" not in data

View File

@@ -0,0 +1,221 @@
"""
Unit tests for Supabase key validation functionality.
Tests the JWT-based validation of anon vs service keys.
"""
import pytest
import jwt
from unittest.mock import patch, MagicMock
from src.server.config.config import (
validate_supabase_key,
ConfigurationError,
load_environment_config,
)
def test_validate_anon_key():
"""Test validation detects anon key correctly."""
# Create mock anon key JWT
anon_payload = {"role": "anon", "iss": "supabase"}
anon_token = jwt.encode(anon_payload, "secret", algorithm="HS256")
is_valid, msg = validate_supabase_key(anon_token)
assert is_valid == False
assert msg == "ANON_KEY_DETECTED"
def test_validate_service_key():
"""Test validation detects service key correctly."""
# Create mock service key JWT
service_payload = {"role": "service_role", "iss": "supabase"}
service_token = jwt.encode(service_payload, "secret", algorithm="HS256")
is_valid, msg = validate_supabase_key(service_token)
assert is_valid == True
assert msg == "VALID_SERVICE_KEY"
def test_validate_unknown_key():
"""Test validation handles unknown key roles."""
# Create mock key with unknown role
unknown_payload = {"role": "custom", "iss": "supabase"}
unknown_token = jwt.encode(unknown_payload, "secret", algorithm="HS256")
is_valid, msg = validate_supabase_key(unknown_token)
assert is_valid == False
assert "UNKNOWN_KEY_TYPE" in msg
assert "custom" in msg
def test_validate_invalid_jwt():
"""Test validation handles invalid JWT format gracefully."""
is_valid, msg = validate_supabase_key("not-a-jwt")
# Should allow invalid JWT to proceed (might be new format)
assert is_valid == True
assert msg == "UNABLE_TO_VALIDATE"
def test_validate_empty_key():
"""Test validation handles empty key."""
is_valid, msg = validate_supabase_key("")
assert is_valid == False
assert msg == "EMPTY_KEY"
def test_config_raises_on_anon_key():
"""Test that configuration loading raises error when anon key detected."""
# Create a mock anon key JWT
anon_payload = {"role": "anon", "iss": "supabase"}
mock_anon_key = jwt.encode(anon_payload, "secret", algorithm="HS256")
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": mock_anon_key,
"OPENAI_API_KEY": "" # Clear any existing key
},
clear=True # Clear all env vars to ensure isolation
):
with pytest.raises(ConfigurationError) as exc_info:
load_environment_config()
error_message = str(exc_info.value)
assert "CRITICAL: You are using a Supabase ANON key" in error_message
assert "service_role" in error_message
assert "permission denied" in error_message
def test_config_accepts_service_key():
"""Test that configuration loading accepts service key."""
# Create a mock service key JWT
service_payload = {"role": "service_role", "iss": "supabase"}
mock_service_key = jwt.encode(service_payload, "secret", algorithm="HS256")
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": mock_service_key,
"PORT": "8051", # Required for config
"OPENAI_API_KEY": "" # Clear any existing key
},
clear=True # Clear all env vars to ensure isolation
):
# Should not raise an exception
config = load_environment_config()
assert config.supabase_service_key == mock_service_key
def test_config_handles_invalid_jwt():
"""Test that configuration loading handles invalid JWT gracefully."""
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": "invalid-jwt-key",
"PORT": "8051", # Required for config
"OPENAI_API_KEY": "" # Clear any existing key
},
clear=True # Clear all env vars to ensure isolation
):
with patch("builtins.print") as mock_print:
# Should not raise an exception for invalid JWT
config = load_environment_config()
assert config.supabase_service_key == "invalid-jwt-key"
def test_config_warns_on_unknown_role():
"""Test that configuration loading warns for unknown roles.
NOTE: This currently prints a warning but doesn't fail.
TODO: Per alpha principles, unknown key types should fail fast, not just warn.
"""
# Create a mock key with unknown role
unknown_payload = {"role": "custom_role", "iss": "supabase"}
mock_unknown_key = jwt.encode(unknown_payload, "secret", algorithm="HS256")
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": mock_unknown_key,
"PORT": "8051", # Required for config
"OPENAI_API_KEY": "" # Clear any existing key
},
clear=True # Clear all env vars to ensure isolation
):
with patch("builtins.print") as mock_print:
# Should not raise an exception but should print warning
config = load_environment_config()
assert config.supabase_service_key == mock_unknown_key
# Check that warning was printed
mock_print.assert_called_once()
call_args = mock_print.call_args[0][0]
assert "WARNING: Unknown Supabase key role 'custom_role'" in call_args
def test_config_raises_on_anon_key_with_port():
"""Test that anon key detection works properly with all required env vars."""
# Create a mock anon key JWT
anon_payload = {"role": "anon", "iss": "supabase"}
mock_anon_key = jwt.encode(anon_payload, "secret", algorithm="HS256")
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": mock_anon_key,
"PORT": "8051",
"OPENAI_API_KEY": "sk-test123" # Valid OpenAI key
},
clear=True
):
# Should still raise ConfigurationError for anon key even with valid OpenAI key
with pytest.raises(ConfigurationError) as exc_info:
load_environment_config()
error_message = str(exc_info.value)
assert "CRITICAL: You are using a Supabase ANON key" in error_message
def test_jwt_decoding_with_real_structure():
"""Test JWT decoding with realistic Supabase JWT structure."""
# More realistic Supabase JWT payload structure
realistic_anon_payload = {
"aud": "authenticated",
"exp": 1999999999,
"iat": 1234567890,
"iss": "supabase",
"ref": "abcdefghij",
"role": "anon",
}
realistic_service_payload = {
"aud": "authenticated",
"exp": 1999999999,
"iat": 1234567890,
"iss": "supabase",
"ref": "abcdefghij",
"role": "service_role",
}
anon_token = jwt.encode(realistic_anon_payload, "secret", algorithm="HS256")
service_token = jwt.encode(realistic_service_payload, "secret", algorithm="HS256")
# Test anon key detection
is_valid_anon, msg_anon = validate_supabase_key(anon_token)
assert is_valid_anon == False
assert msg_anon == "ANON_KEY_DETECTED"
# Test service key detection
is_valid_service, msg_service = validate_supabase_key(service_token)
assert is_valid_service == True
assert msg_service == "VALID_SERVICE_KEY"