managarten/packages/shared-python/manacore_auth/auth.py
Till JS 996ec81a0e refactor(shared-python): extract shared auth package from mana-stt and mana-tts
Create packages/shared-python/manacore_auth/ with:
- auth.py: API key validation, rate limiting, local + external auth
- external_auth.py: mana-core-auth remote validation with caching
- create_auth_dependency(scope): factory for per-service auth deps

Migrated services:
- mana-stt: auth.py now wraps shared auth with scope="stt" (272→42 LOC)
- mana-tts: auth.py now wraps shared auth with scope="tts" (272→42 LOC)

The only difference between services was the scope parameter ("stt" vs "tts").
Both external_auth.py files were 100% identical and are now thin re-exports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 14:09:32 +02:00

253 lines
8.3 KiB
Python

"""
API Key Authentication for ManaCore Python Services
Supports two authentication modes:
1. Local API keys: Configured via environment variables
2. External API keys: Validated via mana-core-auth service (when EXTERNAL_AUTH_ENABLED=true)
Usage:
API_KEYS=sk-key1:name1,sk-key2:name2
INTERNAL_API_KEY=sk-internal-xxx
EXTERNAL_AUTH_ENABLED=true
MANA_CORE_AUTH_URL=http://localhost:3001
"""
import os
import time
import logging
from typing import Optional
from collections import defaultdict
from dataclasses import dataclass, field
from functools import partial
from fastapi import HTTPException, Security, Request, Depends
from fastapi.security import APIKeyHeader
from .external_auth import (
is_external_auth_enabled,
validate_api_key_external,
)
logger = logging.getLogger(__name__)
# Configuration
API_KEYS_ENV = os.getenv("API_KEYS", "")
INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "")
REQUIRE_AUTH = os.getenv("REQUIRE_AUTH", "true").lower() == "true"
RATE_LIMIT_REQUESTS = int(os.getenv("RATE_LIMIT_REQUESTS", "60"))
RATE_LIMIT_WINDOW = int(os.getenv("RATE_LIMIT_WINDOW", "60"))
@dataclass
class APIKey:
"""API Key with metadata."""
key: str
name: str
is_internal: bool = False
rate_limit: int = RATE_LIMIT_REQUESTS
@dataclass
class RateLimitInfo:
"""Rate limit tracking per key."""
requests: list = field(default_factory=list)
def is_allowed(self, limit: int, window: int) -> bool:
now = time.time()
self.requests = [t for t in self.requests if now - t < window]
if len(self.requests) >= limit:
return False
self.requests.append(now)
return True
def remaining(self, limit: int, window: int) -> int:
now = time.time()
self.requests = [t for t in self.requests if now - t < window]
return max(0, limit - len(self.requests))
def _parse_api_keys() -> dict[str, APIKey]:
"""Parse API keys from environment variables."""
keys = {}
if API_KEYS_ENV:
for entry in API_KEYS_ENV.split(","):
entry = entry.strip()
if ":" in entry:
key, name = entry.split(":", 1)
else:
key, name = entry, "default"
keys[key.strip()] = APIKey(key=key.strip(), name=name.strip())
if INTERNAL_API_KEY:
keys[INTERNAL_API_KEY] = APIKey(
key=INTERNAL_API_KEY, name="internal", is_internal=True, rate_limit=999999,
)
return keys
# Global state
_api_keys = _parse_api_keys()
_rate_limits: dict[str, RateLimitInfo] = defaultdict(RateLimitInfo)
# Security scheme
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
@dataclass
class AuthResult:
"""Result of authentication check."""
authenticated: bool
key_name: Optional[str] = None
is_internal: bool = False
rate_limit_remaining: Optional[int] = None
user_id: Optional[str] = None
async def verify_api_key(
request: Request,
scope: str = "default",
api_key: Optional[str] = Security(api_key_header),
) -> AuthResult:
"""
Verify API key and check rate limits.
Args:
request: The incoming HTTP request.
scope: The service scope for external auth (e.g., "stt", "tts").
api_key: The API key from X-API-Key header.
Returns AuthResult with authentication status.
Raises HTTPException if auth fails or rate limited.
"""
path = request.url.path
if path in ["/health", "/docs", "/openapi.json", "/redoc"]:
return AuthResult(authenticated=True, key_name="public")
if not REQUIRE_AUTH:
return AuthResult(authenticated=True, key_name="anonymous")
if not api_key:
logger.warning(f"Missing API key for {path} from {request.client.host if request.client else 'unknown'}")
raise HTTPException(
status_code=401,
detail="Missing API key. Provide X-API-Key header.",
headers={"WWW-Authenticate": "ApiKey"},
)
# Try external auth first for sk_live_ keys
if api_key.startswith("sk_live_") and is_external_auth_enabled():
external_result = await validate_api_key_external(api_key, scope)
if external_result is not None:
if external_result.valid:
rate_info = _rate_limits[api_key]
limit = external_result.rate_limit_requests
window = external_result.rate_limit_window
if not rate_info.is_allowed(limit, window):
remaining = rate_info.remaining(limit, window)
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {window} seconds.",
headers={
"X-RateLimit-Limit": str(limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + window),
"Retry-After": str(window),
},
)
remaining = rate_info.remaining(limit, window)
return AuthResult(
authenticated=True, key_name="external", is_internal=False,
rate_limit_remaining=remaining, user_id=external_result.user_id,
)
else:
raise HTTPException(
status_code=401,
detail=external_result.error or "Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
# Local auth
if api_key not in _api_keys:
logger.warning(f"Invalid API key attempt for {path}")
raise HTTPException(
status_code=401, detail="Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
key_info = _api_keys[api_key]
if not key_info.is_internal:
rate_info = _rate_limits[api_key]
if not rate_info.is_allowed(key_info.rate_limit, RATE_LIMIT_WINDOW):
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {RATE_LIMIT_WINDOW} seconds.",
headers={
"X-RateLimit-Limit": str(key_info.rate_limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + RATE_LIMIT_WINDOW),
"Retry-After": str(RATE_LIMIT_WINDOW),
},
)
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
else:
remaining = None
return AuthResult(
authenticated=True, key_name=key_info.name,
is_internal=key_info.is_internal, rate_limit_remaining=remaining,
)
def create_auth_dependency(scope: str):
"""
Create a FastAPI dependency for API key auth with a specific scope.
Usage:
auth_dep = create_auth_dependency("stt")
@app.post("/transcribe")
async def transcribe(auth: AuthResult = Depends(auth_dep)):
...
"""
async def _dep(
request: Request,
api_key: Optional[str] = Security(api_key_header),
) -> AuthResult:
return await verify_api_key(request, scope=scope, api_key=api_key)
return _dep
def get_api_key_stats() -> dict:
"""Get statistics about API keys (for admin endpoint)."""
stats = {
"total_keys": len(_api_keys),
"auth_required": REQUIRE_AUTH,
"rate_limit": {
"requests_per_window": RATE_LIMIT_REQUESTS,
"window_seconds": RATE_LIMIT_WINDOW,
},
"keys": [],
}
for key, info in _api_keys.items():
masked_key = key[:8] + "..." if len(key) > 8 else "***"
rate_info = _rate_limits.get(key, RateLimitInfo())
stats["keys"].append({
"name": info.name,
"key_prefix": masked_key,
"is_internal": info.is_internal,
"requests_in_window": len(rate_info.requests),
"remaining": rate_info.remaining(info.rate_limit, RATE_LIMIT_WINDOW),
})
return stats
def reload_api_keys():
"""Reload API keys from environment (for runtime updates)."""
global _api_keys
_api_keys = _parse_api_keys()
logger.info(f"Reloaded {len(_api_keys)} API keys")