Merge pull request #232 from coleam00/fix/supabase-key-validation-and-state-consolidation

Fix Supabase key validation and consolidate frontend state management
This commit is contained in:
Wirasm
2025-08-18 21:19:27 +03:00
committed by GitHub
19 changed files with 854 additions and 317 deletions

View File

@@ -6,6 +6,8 @@ import os
from dataclasses import dataclass
from urllib.parse import urlparse
from jose import jwt
class ConfigurationError(Exception):
"""Raised when there's an error in configuration."""
@@ -46,6 +48,49 @@ def validate_openai_api_key(api_key: str) -> bool:
return True
def validate_supabase_key(supabase_key: str) -> tuple[bool, str]:
"""Validate Supabase key type and return validation result.
Returns:
tuple[bool, str]: (is_valid, message)
- (False, "ANON_KEY_DETECTED") if anon key detected
- (True, "VALID_SERVICE_KEY") if service key detected
- (False, "UNKNOWN_KEY_TYPE:{role}") for unknown roles
- (True, "UNABLE_TO_VALIDATE") if JWT cannot be decoded
"""
if not supabase_key:
return False, "EMPTY_KEY"
try:
# Decode JWT without verification to check the 'role' claim
# We don't verify the signature since we only need to check the role
# Also skip all other validations (aud, exp, etc) since we only care about the role
decoded = jwt.decode(
supabase_key,
'',
options={
"verify_signature": False,
"verify_aud": False,
"verify_exp": False,
"verify_nbf": False,
"verify_iat": False
}
)
role = decoded.get("role")
if role == "anon":
return False, "ANON_KEY_DETECTED"
elif role == "service_role":
return True, "VALID_SERVICE_KEY"
else:
return False, f"UNKNOWN_KEY_TYPE:{role}"
except Exception:
# If we can't decode the JWT, we'll allow it to proceed
# This handles new key formats or non-JWT keys
return True, "UNABLE_TO_VALIDATE"
def validate_supabase_url(url: str) -> bool:
"""Validate Supabase URL format."""
if not url:
@@ -80,6 +125,34 @@ def load_environment_config() -> EnvironmentConfig:
validate_openai_api_key(openai_api_key)
validate_supabase_url(supabase_url)
# Validate Supabase key type
is_valid_key, key_message = validate_supabase_key(supabase_service_key)
if not is_valid_key:
if key_message == "ANON_KEY_DETECTED":
raise ConfigurationError(
"CRITICAL: You are using a Supabase ANON key instead of a SERVICE key.\n\n"
"The ANON key is a public key with read-only permissions that cannot write to the database.\n"
"This will cause all database operations to fail with 'permission denied' errors.\n\n"
"To fix this:\n"
"1. Go to your Supabase project dashboard\n"
"2. Navigate to Settings > API keys\n"
"3. Find the 'service_role' key (NOT the 'anon' key)\n"
"4. Update your SUPABASE_SERVICE_KEY environment variable\n\n"
"Key characteristics:\n"
"- ANON key: Starts with 'eyJ...' and has role='anon' (public, read-only)\n"
"- SERVICE key: Starts with 'eyJ...' and has role='service_role' (private, full access)\n\n"
"Current key role detected: anon"
)
elif key_message.startswith("UNKNOWN_KEY_TYPE:"):
role = key_message.split(":", 1)[1]
raise ConfigurationError(
f"CRITICAL: Unknown Supabase key role '{role}'.\n\n"
f"Expected 'service_role' but found '{role}'.\n"
f"This key type is not supported and will likely cause failures.\n\n"
f"Please use a valid service_role key from your Supabase dashboard."
)
# For UNABLE_TO_VALIDATE, we continue silently
# Optional environment variables with defaults
host = os.getenv("HOST", "0.0.0.0")
port_str = os.getenv("PORT")
@@ -97,8 +170,8 @@ def load_environment_config() -> EnvironmentConfig:
# Validate and convert port
try:
port = int(port_str)
except ValueError:
raise ConfigurationError(f"PORT must be a valid integer, got: {port_str}")
except ValueError as e:
raise ConfigurationError(f"PORT must be a valid integer, got: {port_str}") from e
return EnvironmentConfig(
openai_api_key=openai_api_key,
@@ -110,6 +183,11 @@ def load_environment_config() -> EnvironmentConfig:
)
def get_config() -> EnvironmentConfig:
"""Get environment configuration with validation."""
return load_environment_config()
def get_rag_strategy_config() -> RAGStrategyConfig:
"""Load RAG strategy configuration from environment variables."""

View File

@@ -78,6 +78,11 @@ async def lifespan(app: FastAPI):
logger.info("🚀 Starting Archon backend...")
try:
# Validate configuration FIRST - check for anon vs service key
from .config.config import get_config
get_config() # This will raise ConfigurationError if anon key detected
# Initialize credentials from database FIRST - this is the foundation for everything else
await initialize_credentials()

View File

@@ -227,14 +227,11 @@ class CredentialService:
self._cache[key] = value
# Upsert to database with proper conflict handling
result = (
supabase.table("archon_settings")
.upsert(
data,
on_conflict="key", # Specify the unique column for conflict resolution
)
.execute()
)
# Since we validate service key at startup, permission errors here indicate actual database issues
supabase.table("archon_settings").upsert(
data,
on_conflict="key", # Specify the unique column for conflict resolution
).execute()
# Invalidate RAG settings cache if this is a rag_strategy setting
if category == "rag_strategy":
@@ -256,7 +253,8 @@ class CredentialService:
try:
supabase = self._get_supabase_client()
result = supabase.table("archon_settings").delete().eq("key", key).execute()
# Since we validate service key at startup, we can directly execute
supabase.table("archon_settings").delete().eq("key", key).execute()
# Remove from cache
if key in self._cache:

View File

@@ -56,3 +56,5 @@ def test_existing_credential_returns_normally(client, mock_supabase_client):
assert data["is_encrypted"] is False
# Should not have is_default flag for real credentials
assert "is_default" not in data

View File

@@ -0,0 +1,215 @@
"""
Unit tests for Supabase key validation functionality.
Tests the JWT-based validation of anon vs service keys.
"""
import pytest
from jose import jwt
from unittest.mock import patch, MagicMock
from src.server.config.config import (
validate_supabase_key,
ConfigurationError,
load_environment_config,
)
def test_validate_anon_key():
"""Test validation detects anon key correctly."""
# Create mock anon key JWT
anon_payload = {"role": "anon", "iss": "supabase"}
anon_token = jwt.encode(anon_payload, "secret", algorithm="HS256")
is_valid, msg = validate_supabase_key(anon_token)
assert is_valid == False
assert msg == "ANON_KEY_DETECTED"
def test_validate_service_key():
"""Test validation detects service key correctly."""
# Create mock service key JWT
service_payload = {"role": "service_role", "iss": "supabase"}
service_token = jwt.encode(service_payload, "secret", algorithm="HS256")
is_valid, msg = validate_supabase_key(service_token)
assert is_valid == True
assert msg == "VALID_SERVICE_KEY"
def test_validate_unknown_key():
"""Test validation handles unknown key roles."""
# Create mock key with unknown role
unknown_payload = {"role": "custom", "iss": "supabase"}
unknown_token = jwt.encode(unknown_payload, "secret", algorithm="HS256")
is_valid, msg = validate_supabase_key(unknown_token)
assert is_valid == False
assert "UNKNOWN_KEY_TYPE" in msg
assert "custom" in msg
def test_validate_invalid_jwt():
"""Test validation handles invalid JWT format gracefully."""
is_valid, msg = validate_supabase_key("not-a-jwt")
# Should allow invalid JWT to proceed (might be new format)
assert is_valid == True
assert msg == "UNABLE_TO_VALIDATE"
def test_validate_empty_key():
"""Test validation handles empty key."""
is_valid, msg = validate_supabase_key("")
assert is_valid == False
assert msg == "EMPTY_KEY"
def test_config_raises_on_anon_key():
"""Test that configuration loading raises error when anon key detected."""
# Create a mock anon key JWT
anon_payload = {"role": "anon", "iss": "supabase"}
mock_anon_key = jwt.encode(anon_payload, "secret", algorithm="HS256")
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": mock_anon_key,
"OPENAI_API_KEY": "" # Clear any existing key
},
clear=True # Clear all env vars to ensure isolation
):
with pytest.raises(ConfigurationError) as exc_info:
load_environment_config()
error_message = str(exc_info.value)
assert "CRITICAL: You are using a Supabase ANON key" in error_message
assert "service_role" in error_message
assert "permission denied" in error_message
def test_config_accepts_service_key():
"""Test that configuration loading accepts service key."""
# Create a mock service key JWT
service_payload = {"role": "service_role", "iss": "supabase"}
mock_service_key = jwt.encode(service_payload, "secret", algorithm="HS256")
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": mock_service_key,
"PORT": "8051", # Required for config
"OPENAI_API_KEY": "" # Clear any existing key
},
clear=True # Clear all env vars to ensure isolation
):
# Should not raise an exception
config = load_environment_config()
assert config.supabase_service_key == mock_service_key
def test_config_handles_invalid_jwt():
"""Test that configuration loading handles invalid JWT gracefully."""
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": "invalid-jwt-key",
"PORT": "8051", # Required for config
"OPENAI_API_KEY": "" # Clear any existing key
},
clear=True # Clear all env vars to ensure isolation
):
with patch("builtins.print") as mock_print:
# Should not raise an exception for invalid JWT
config = load_environment_config()
assert config.supabase_service_key == "invalid-jwt-key"
def test_config_fails_on_unknown_role():
"""Test that configuration loading fails fast for unknown roles per alpha principles."""
# Create a mock key with unknown role
unknown_payload = {"role": "custom_role", "iss": "supabase"}
mock_unknown_key = jwt.encode(unknown_payload, "secret", algorithm="HS256")
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": mock_unknown_key,
"PORT": "8051", # Required for config
"OPENAI_API_KEY": "" # Clear any existing key
},
clear=True # Clear all env vars to ensure isolation
):
# Should raise ConfigurationError for unknown role
with pytest.raises(ConfigurationError) as exc_info:
load_environment_config()
error_message = str(exc_info.value)
assert "Unknown Supabase key role 'custom_role'" in error_message
assert "Expected 'service_role'" in error_message
def test_config_raises_on_anon_key_with_port():
"""Test that anon key detection works properly with all required env vars."""
# Create a mock anon key JWT
anon_payload = {"role": "anon", "iss": "supabase"}
mock_anon_key = jwt.encode(anon_payload, "secret", algorithm="HS256")
with patch.dict(
"os.environ",
{
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": mock_anon_key,
"PORT": "8051",
"OPENAI_API_KEY": "sk-test123" # Valid OpenAI key
},
clear=True
):
# Should still raise ConfigurationError for anon key even with valid OpenAI key
with pytest.raises(ConfigurationError) as exc_info:
load_environment_config()
error_message = str(exc_info.value)
assert "CRITICAL: You are using a Supabase ANON key" in error_message
def test_jwt_decoding_with_real_structure():
"""Test JWT decoding with realistic Supabase JWT structure."""
# More realistic Supabase JWT payload structure
realistic_anon_payload = {
"aud": "authenticated",
"exp": 1999999999,
"iat": 1234567890,
"iss": "supabase",
"ref": "abcdefghij",
"role": "anon",
}
realistic_service_payload = {
"aud": "authenticated",
"exp": 1999999999,
"iat": 1234567890,
"iss": "supabase",
"ref": "abcdefghij",
"role": "service_role",
}
anon_token = jwt.encode(realistic_anon_payload, "secret", algorithm="HS256")
service_token = jwt.encode(realistic_service_payload, "secret", algorithm="HS256")
# Test anon key detection
is_valid_anon, msg_anon = validate_supabase_key(anon_token)
assert is_valid_anon == False
assert msg_anon == "ANON_KEY_DETECTED"
# Test service key detection
is_valid_service, msg_service = validate_supabase_key(service_token)
assert is_valid_service == True
assert msg_service == "VALID_SERVICE_KEY"