refactor(shared-python): extract shared auth package from mana-stt and mana-tts

Create packages/shared-python/manacore_auth/ with:
- auth.py: API key validation, rate limiting, local + external auth
- external_auth.py: mana-core-auth remote validation with caching
- create_auth_dependency(scope): factory for per-service auth deps

Migrated services:
- mana-stt: auth.py now wraps shared auth with scope="stt" (272→42 LOC)
- mana-tts: auth.py now wraps shared auth with scope="tts" (272→42 LOC)

The only difference between services was the scope parameter ("stt" vs "tts").
Both external_auth.py files were 100% identical and are now thin re-exports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Till JS 2026-04-02 14:09:32 +02:00
parent e11aa50106
commit 996ec81a0e
7 changed files with 518 additions and 784 deletions

View file

@ -0,0 +1,47 @@
"""
ManaCore Shared Auth API Key authentication for Python microservices.
Supports two authentication modes:
1. Local API keys: Configured via environment variables
2. External API keys: Validated via mana-core-auth service (when EXTERNAL_AUTH_ENABLED=true)
Usage:
from manacore_auth import verify_api_key, AuthResult, get_api_key_stats
# In FastAPI:
@app.post("/transcribe")
async def transcribe(auth: AuthResult = Depends(create_auth_dependency("stt"))):
...
"""
from .auth import (
APIKey,
AuthResult,
RateLimitInfo,
verify_api_key,
get_api_key_stats,
reload_api_keys,
api_key_header,
create_auth_dependency,
)
from .external_auth import (
ExternalValidationResult,
is_external_auth_enabled,
validate_api_key_external,
clear_cache,
)
__all__ = [
"APIKey",
"AuthResult",
"RateLimitInfo",
"verify_api_key",
"get_api_key_stats",
"reload_api_keys",
"api_key_header",
"create_auth_dependency",
"ExternalValidationResult",
"is_external_auth_enabled",
"validate_api_key_external",
"clear_cache",
]

View file

@ -0,0 +1,253 @@
"""
API Key Authentication for ManaCore Python Services
Supports two authentication modes:
1. Local API keys: Configured via environment variables
2. External API keys: Validated via mana-core-auth service (when EXTERNAL_AUTH_ENABLED=true)
Usage:
API_KEYS=sk-key1:name1,sk-key2:name2
INTERNAL_API_KEY=sk-internal-xxx
EXTERNAL_AUTH_ENABLED=true
MANA_CORE_AUTH_URL=http://localhost:3001
"""
import os
import time
import logging
from typing import Optional
from collections import defaultdict
from dataclasses import dataclass, field
from functools import partial
from fastapi import HTTPException, Security, Request, Depends
from fastapi.security import APIKeyHeader
from .external_auth import (
is_external_auth_enabled,
validate_api_key_external,
)
logger = logging.getLogger(__name__)
# Configuration
API_KEYS_ENV = os.getenv("API_KEYS", "")
INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "")
REQUIRE_AUTH = os.getenv("REQUIRE_AUTH", "true").lower() == "true"
RATE_LIMIT_REQUESTS = int(os.getenv("RATE_LIMIT_REQUESTS", "60"))
RATE_LIMIT_WINDOW = int(os.getenv("RATE_LIMIT_WINDOW", "60"))
@dataclass
class APIKey:
"""API Key with metadata."""
key: str
name: str
is_internal: bool = False
rate_limit: int = RATE_LIMIT_REQUESTS
@dataclass
class RateLimitInfo:
"""Rate limit tracking per key."""
requests: list = field(default_factory=list)
def is_allowed(self, limit: int, window: int) -> bool:
now = time.time()
self.requests = [t for t in self.requests if now - t < window]
if len(self.requests) >= limit:
return False
self.requests.append(now)
return True
def remaining(self, limit: int, window: int) -> int:
now = time.time()
self.requests = [t for t in self.requests if now - t < window]
return max(0, limit - len(self.requests))
def _parse_api_keys() -> dict[str, APIKey]:
"""Parse API keys from environment variables."""
keys = {}
if API_KEYS_ENV:
for entry in API_KEYS_ENV.split(","):
entry = entry.strip()
if ":" in entry:
key, name = entry.split(":", 1)
else:
key, name = entry, "default"
keys[key.strip()] = APIKey(key=key.strip(), name=name.strip())
if INTERNAL_API_KEY:
keys[INTERNAL_API_KEY] = APIKey(
key=INTERNAL_API_KEY, name="internal", is_internal=True, rate_limit=999999,
)
return keys
# Global state
_api_keys = _parse_api_keys()
_rate_limits: dict[str, RateLimitInfo] = defaultdict(RateLimitInfo)
# Security scheme
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
@dataclass
class AuthResult:
"""Result of authentication check."""
authenticated: bool
key_name: Optional[str] = None
is_internal: bool = False
rate_limit_remaining: Optional[int] = None
user_id: Optional[str] = None
async def verify_api_key(
request: Request,
scope: str = "default",
api_key: Optional[str] = Security(api_key_header),
) -> AuthResult:
"""
Verify API key and check rate limits.
Args:
request: The incoming HTTP request.
scope: The service scope for external auth (e.g., "stt", "tts").
api_key: The API key from X-API-Key header.
Returns AuthResult with authentication status.
Raises HTTPException if auth fails or rate limited.
"""
path = request.url.path
if path in ["/health", "/docs", "/openapi.json", "/redoc"]:
return AuthResult(authenticated=True, key_name="public")
if not REQUIRE_AUTH:
return AuthResult(authenticated=True, key_name="anonymous")
if not api_key:
logger.warning(f"Missing API key for {path} from {request.client.host if request.client else 'unknown'}")
raise HTTPException(
status_code=401,
detail="Missing API key. Provide X-API-Key header.",
headers={"WWW-Authenticate": "ApiKey"},
)
# Try external auth first for sk_live_ keys
if api_key.startswith("sk_live_") and is_external_auth_enabled():
external_result = await validate_api_key_external(api_key, scope)
if external_result is not None:
if external_result.valid:
rate_info = _rate_limits[api_key]
limit = external_result.rate_limit_requests
window = external_result.rate_limit_window
if not rate_info.is_allowed(limit, window):
remaining = rate_info.remaining(limit, window)
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {window} seconds.",
headers={
"X-RateLimit-Limit": str(limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + window),
"Retry-After": str(window),
},
)
remaining = rate_info.remaining(limit, window)
return AuthResult(
authenticated=True, key_name="external", is_internal=False,
rate_limit_remaining=remaining, user_id=external_result.user_id,
)
else:
raise HTTPException(
status_code=401,
detail=external_result.error or "Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
# Local auth
if api_key not in _api_keys:
logger.warning(f"Invalid API key attempt for {path}")
raise HTTPException(
status_code=401, detail="Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
key_info = _api_keys[api_key]
if not key_info.is_internal:
rate_info = _rate_limits[api_key]
if not rate_info.is_allowed(key_info.rate_limit, RATE_LIMIT_WINDOW):
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {RATE_LIMIT_WINDOW} seconds.",
headers={
"X-RateLimit-Limit": str(key_info.rate_limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + RATE_LIMIT_WINDOW),
"Retry-After": str(RATE_LIMIT_WINDOW),
},
)
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
else:
remaining = None
return AuthResult(
authenticated=True, key_name=key_info.name,
is_internal=key_info.is_internal, rate_limit_remaining=remaining,
)
def create_auth_dependency(scope: str):
"""
Create a FastAPI dependency for API key auth with a specific scope.
Usage:
auth_dep = create_auth_dependency("stt")
@app.post("/transcribe")
async def transcribe(auth: AuthResult = Depends(auth_dep)):
...
"""
async def _dep(
request: Request,
api_key: Optional[str] = Security(api_key_header),
) -> AuthResult:
return await verify_api_key(request, scope=scope, api_key=api_key)
return _dep
def get_api_key_stats() -> dict:
"""Get statistics about API keys (for admin endpoint)."""
stats = {
"total_keys": len(_api_keys),
"auth_required": REQUIRE_AUTH,
"rate_limit": {
"requests_per_window": RATE_LIMIT_REQUESTS,
"window_seconds": RATE_LIMIT_WINDOW,
},
"keys": [],
}
for key, info in _api_keys.items():
masked_key = key[:8] + "..." if len(key) > 8 else "***"
rate_info = _rate_limits.get(key, RateLimitInfo())
stats["keys"].append({
"name": info.name,
"key_prefix": masked_key,
"is_internal": info.is_internal,
"requests_in_window": len(rate_info.requests),
"remaining": rate_info.remaining(info.rate_limit, RATE_LIMIT_WINDOW),
})
return stats
def reload_api_keys():
"""Reload API keys from environment (for runtime updates)."""
global _api_keys
_api_keys = _parse_api_keys()
logger.info(f"Reloaded {len(_api_keys)} API keys")

View file

@ -0,0 +1,142 @@
"""
External API Key Validation via mana-core-auth
When EXTERNAL_AUTH_ENABLED=true, API keys are validated against the
central mana-core-auth service. This allows users to create and manage
API keys from the mana.how web interface.
Results are cached for 5 minutes to reduce load on the auth service.
"""
import os
import time
import logging
import httpx
from typing import Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# Configuration
EXTERNAL_AUTH_ENABLED = os.getenv("EXTERNAL_AUTH_ENABLED", "false").lower() == "true"
MANA_CORE_AUTH_URL = os.getenv("MANA_CORE_AUTH_URL", "http://localhost:3001")
API_KEY_CACHE_TTL = int(os.getenv("API_KEY_CACHE_TTL", "300")) # 5 minutes
EXTERNAL_AUTH_TIMEOUT = float(os.getenv("EXTERNAL_AUTH_TIMEOUT", "5.0")) # seconds
@dataclass
class ExternalValidationResult:
"""Result from external API key validation."""
valid: bool
user_id: Optional[str] = None
scopes: Optional[list] = None
rate_limit_requests: int = 60
rate_limit_window: int = 60
error: Optional[str] = None
cached_at: float = 0.0
# In-memory cache for validation results
_validation_cache: dict[str, ExternalValidationResult] = {}
def is_external_auth_enabled() -> bool:
"""Check if external authentication is enabled."""
return EXTERNAL_AUTH_ENABLED
def _get_cached_result(api_key: str) -> Optional[ExternalValidationResult]:
"""Get cached validation result if still valid."""
result = _validation_cache.get(api_key)
if result and (time.time() - result.cached_at) < API_KEY_CACHE_TTL:
return result
return None
def _cache_result(api_key: str, result: ExternalValidationResult):
"""Cache a validation result."""
result.cached_at = time.time()
_validation_cache[api_key] = result
# Clean up old entries periodically
if len(_validation_cache) > 1000:
now = time.time()
expired_keys = [
k for k, v in _validation_cache.items()
if (now - v.cached_at) >= API_KEY_CACHE_TTL
]
for k in expired_keys:
del _validation_cache[k]
async def validate_api_key_external(api_key: str, scope: str) -> Optional[ExternalValidationResult]:
"""
Validate an API key against mana-core-auth service.
Args:
api_key: The API key to validate (e.g., "sk_live_...")
scope: The required scope (e.g., "stt", "tts", "image-gen")
Returns:
ExternalValidationResult if external auth is enabled and the key was validated.
None if external auth is disabled or the service is unavailable (fallback to local).
"""
if not EXTERNAL_AUTH_ENABLED:
return None
# Check cache first
cached = _get_cached_result(api_key)
if cached:
logger.debug(f"Using cached validation result for key prefix: {api_key[:12]}...")
if cached.valid and cached.scopes and scope not in cached.scopes:
return ExternalValidationResult(
valid=False,
error=f"API key does not have scope: {scope}",
)
return cached
# Call mana-core-auth validation endpoint
try:
async with httpx.AsyncClient(timeout=EXTERNAL_AUTH_TIMEOUT) as client:
response = await client.post(
f"{MANA_CORE_AUTH_URL}/api/v1/api-keys/validate",
json={"apiKey": api_key, "scope": scope},
)
if response.status_code == 200:
data = response.json()
result = ExternalValidationResult(
valid=data.get("valid", False),
user_id=data.get("userId"),
scopes=data.get("scopes", []),
rate_limit_requests=data.get("rateLimit", {}).get("requests", 60),
rate_limit_window=data.get("rateLimit", {}).get("window", 60),
error=data.get("error"),
)
_cache_result(api_key, result)
return result
else:
logger.warning(
f"External auth returned status {response.status_code}: {response.text}"
)
return ExternalValidationResult(
valid=False,
error=f"Auth service returned {response.status_code}",
)
except httpx.TimeoutException:
logger.warning("External auth service timeout - falling back to local auth")
return None
except httpx.ConnectError:
logger.warning("Cannot connect to external auth service - falling back to local auth")
return None
except Exception as e:
logger.error(f"External auth error: {e}")
return None
def clear_cache():
"""Clear the validation cache (for testing or runtime updates)."""
global _validation_cache
_validation_cache.clear()
logger.info("External auth cache cleared")

View file

@ -1,271 +1,41 @@
"""
API Key Authentication for ManaCore STT Service
Supports two authentication modes:
1. Local API keys: Configured via environment variables
2. External API keys: Validated via mana-core-auth service (when EXTERNAL_AUTH_ENABLED=true)
Usage:
# Local keys
API_KEYS=sk-key1:name1,sk-key2:name2
INTERNAL_API_KEY=sk-internal-xxx
# External auth (for user-created keys via mana.how)
EXTERNAL_AUTH_ENABLED=true
MANA_CORE_AUTH_URL=http://localhost:3001
API Key Authentication for ManaCore STT Service.
Delegates to shared manacore_auth package.
"""
# Re-export everything from shared auth for backward compatibility
import sys
import os
import time
import logging
from typing import Optional
from collections import defaultdict
from dataclasses import dataclass, field
from fastapi import HTTPException, Security, Request
from fastapi.security import APIKeyHeader
# Add shared-python to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "packages", "shared-python"))
from .external_auth import (
from manacore_auth import (
APIKey,
AuthResult,
RateLimitInfo,
verify_api_key as _verify_api_key,
get_api_key_stats,
reload_api_keys,
api_key_header,
create_auth_dependency,
)
from manacore_auth.external_auth import (
ExternalValidationResult,
is_external_auth_enabled,
validate_api_key_external,
ExternalValidationResult,
)
logger = logging.getLogger(__name__)
from typing import Optional
from fastapi import Security, Request
# Configuration
API_KEYS_ENV = os.getenv("API_KEYS", "") # Format: "sk-key1:name1,sk-key2:name2"
INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "") # Unlimited internal key
REQUIRE_AUTH = os.getenv("REQUIRE_AUTH", "true").lower() == "true"
RATE_LIMIT_REQUESTS = int(os.getenv("RATE_LIMIT_REQUESTS", "60")) # Per minute
RATE_LIMIT_WINDOW = int(os.getenv("RATE_LIMIT_WINDOW", "60")) # Seconds
@dataclass
class APIKey:
"""API Key with metadata."""
key: str
name: str
is_internal: bool = False
rate_limit: int = RATE_LIMIT_REQUESTS # Requests per window
@dataclass
class RateLimitInfo:
"""Rate limit tracking per key."""
requests: list = field(default_factory=list)
def is_allowed(self, limit: int, window: int) -> bool:
"""Check if request is allowed within rate limit."""
now = time.time()
# Remove old requests outside window
self.requests = [t for t in self.requests if now - t < window]
if len(self.requests) >= limit:
return False
self.requests.append(now)
return True
def remaining(self, limit: int, window: int) -> int:
"""Get remaining requests in current window."""
now = time.time()
self.requests = [t for t in self.requests if now - t < window]
return max(0, limit - len(self.requests))
# Parse API keys from environment
def _parse_api_keys() -> dict[str, APIKey]:
"""Parse API keys from environment variables."""
keys = {}
# Parse comma-separated keys
if API_KEYS_ENV:
for entry in API_KEYS_ENV.split(","):
entry = entry.strip()
if ":" in entry:
key, name = entry.split(":", 1)
else:
key, name = entry, "default"
keys[key.strip()] = APIKey(key=key.strip(), name=name.strip())
# Add internal key with no rate limit
if INTERNAL_API_KEY:
keys[INTERNAL_API_KEY] = APIKey(
key=INTERNAL_API_KEY,
name="internal",
is_internal=True,
rate_limit=999999, # Effectively unlimited
)
return keys
# Global state
_api_keys = _parse_api_keys()
_rate_limits: dict[str, RateLimitInfo] = defaultdict(RateLimitInfo)
# Security scheme
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
@dataclass
class AuthResult:
"""Result of authentication check."""
authenticated: bool
key_name: Optional[str] = None
is_internal: bool = False
rate_limit_remaining: Optional[int] = None
user_id: Optional[str] = None # Set when using external auth
# STT-specific auth dependency
verify_stt_key = create_auth_dependency("stt")
async def verify_api_key(
request: Request,
api_key: Optional[str] = Security(api_key_header),
) -> AuthResult:
"""
Verify API key and check rate limits.
Supports two authentication modes:
1. External auth via mana-core-auth (for sk_live_ keys)
2. Local auth via environment variables
Returns AuthResult with authentication status.
Raises HTTPException if auth fails or rate limited.
"""
# Skip auth for health and docs endpoints
path = request.url.path
if path in ["/health", "/docs", "/openapi.json", "/redoc"]:
return AuthResult(authenticated=True, key_name="public")
# If auth not required, allow all
if not REQUIRE_AUTH:
return AuthResult(authenticated=True, key_name="anonymous")
# Check for API key
if not api_key:
logger.warning(f"Missing API key for {path} from {request.client.host if request.client else 'unknown'}")
raise HTTPException(
status_code=401,
detail="Missing API key. Provide X-API-Key header.",
headers={"WWW-Authenticate": "ApiKey"},
)
# Try external auth first for sk_live_ keys (user-created keys via mana.how)
if api_key.startswith("sk_live_") and is_external_auth_enabled():
external_result = await validate_api_key_external(api_key, "stt")
if external_result is not None:
if external_result.valid:
# Use rate limits from external auth
rate_info = _rate_limits[api_key]
limit = external_result.rate_limit_requests
window = external_result.rate_limit_window
if not rate_info.is_allowed(limit, window):
remaining = rate_info.remaining(limit, window)
logger.warning(f"Rate limit exceeded for external key")
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {window} seconds.",
headers={
"X-RateLimit-Limit": str(limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + window),
"Retry-After": str(window),
},
)
remaining = rate_info.remaining(limit, window)
logger.debug(f"Authenticated external request from user {external_result.user_id} to {path}")
return AuthResult(
authenticated=True,
key_name="external",
is_internal=False,
rate_limit_remaining=remaining,
user_id=external_result.user_id,
)
else:
# External auth returned invalid
logger.warning(f"External auth failed: {external_result.error}")
raise HTTPException(
status_code=401,
detail=external_result.error or "Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
# If external_result is None, fall through to local auth
# Local auth: Validate key against environment variables
if api_key not in _api_keys:
logger.warning(f"Invalid API key attempt for {path}")
raise HTTPException(
status_code=401,
detail="Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
key_info = _api_keys[api_key]
# Check rate limit (skip for internal keys)
if not key_info.is_internal:
rate_info = _rate_limits[api_key]
if not rate_info.is_allowed(key_info.rate_limit, RATE_LIMIT_WINDOW):
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
logger.warning(f"Rate limit exceeded for key '{key_info.name}'")
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {RATE_LIMIT_WINDOW} seconds.",
headers={
"X-RateLimit-Limit": str(key_info.rate_limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + RATE_LIMIT_WINDOW),
"Retry-After": str(RATE_LIMIT_WINDOW),
},
)
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
else:
remaining = None
logger.debug(f"Authenticated request from '{key_info.name}' to {path}")
return AuthResult(
authenticated=True,
key_name=key_info.name,
is_internal=key_info.is_internal,
rate_limit_remaining=remaining,
)
def get_api_key_stats() -> dict:
"""Get statistics about API keys (for admin endpoint)."""
stats = {
"total_keys": len(_api_keys),
"auth_required": REQUIRE_AUTH,
"rate_limit": {
"requests_per_window": RATE_LIMIT_REQUESTS,
"window_seconds": RATE_LIMIT_WINDOW,
},
"keys": [],
}
for key, info in _api_keys.items():
# Don't expose actual keys, just metadata
masked_key = key[:8] + "..." if len(key) > 8 else "***"
rate_info = _rate_limits.get(key, RateLimitInfo())
stats["keys"].append({
"name": info.name,
"key_prefix": masked_key,
"is_internal": info.is_internal,
"requests_in_window": len(rate_info.requests),
"remaining": rate_info.remaining(info.rate_limit, RATE_LIMIT_WINDOW),
})
return stats
def reload_api_keys():
"""Reload API keys from environment (for runtime updates)."""
global _api_keys
_api_keys = _parse_api_keys()
logger.info(f"Reloaded {len(_api_keys)} API keys")
"""Verify API key with STT scope."""
return await _verify_api_key(request, scope="stt", api_key=api_key)

View file

@ -1,145 +1,22 @@
"""
External API Key Validation via mana-core-auth
When EXTERNAL_AUTH_ENABLED=true, API keys are validated against the
central mana-core-auth service. This allows users to create and manage
API keys from the mana.how web interface.
Results are cached for 5 minutes to reduce load on the auth service.
External API Key Validation delegates to shared manacore_auth package.
"""
import sys
import os
import time
import logging
import httpx
from typing import Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "packages", "shared-python"))
# Configuration
EXTERNAL_AUTH_ENABLED = os.getenv("EXTERNAL_AUTH_ENABLED", "false").lower() == "true"
MANA_CORE_AUTH_URL = os.getenv("MANA_CORE_AUTH_URL", "http://localhost:3001")
API_KEY_CACHE_TTL = int(os.getenv("API_KEY_CACHE_TTL", "300")) # 5 minutes
EXTERNAL_AUTH_TIMEOUT = float(os.getenv("EXTERNAL_AUTH_TIMEOUT", "5.0")) # seconds
from manacore_auth.external_auth import (
ExternalValidationResult,
is_external_auth_enabled,
validate_api_key_external,
clear_cache,
)
@dataclass
class ExternalValidationResult:
"""Result from external API key validation."""
valid: bool
user_id: Optional[str] = None
scopes: Optional[list] = None
rate_limit_requests: int = 60
rate_limit_window: int = 60
error: Optional[str] = None
cached_at: float = 0.0
# In-memory cache for validation results
# Key: API key, Value: ExternalValidationResult
_validation_cache: dict[str, ExternalValidationResult] = {}
def is_external_auth_enabled() -> bool:
"""Check if external authentication is enabled."""
return EXTERNAL_AUTH_ENABLED
def _get_cached_result(api_key: str) -> Optional[ExternalValidationResult]:
"""Get cached validation result if still valid."""
result = _validation_cache.get(api_key)
if result and (time.time() - result.cached_at) < API_KEY_CACHE_TTL:
return result
return None
def _cache_result(api_key: str, result: ExternalValidationResult):
"""Cache a validation result."""
result.cached_at = time.time()
_validation_cache[api_key] = result
# Clean up old entries periodically (keep cache size manageable)
if len(_validation_cache) > 1000:
now = time.time()
expired_keys = [
k for k, v in _validation_cache.items()
if (now - v.cached_at) >= API_KEY_CACHE_TTL
]
for k in expired_keys:
del _validation_cache[k]
async def validate_api_key_external(api_key: str, scope: str) -> Optional[ExternalValidationResult]:
"""
Validate an API key against mana-core-auth service.
Args:
api_key: The API key to validate (e.g., "sk_live_...")
scope: The required scope (e.g., "stt" or "tts")
Returns:
ExternalValidationResult if external auth is enabled and the key was validated.
None if external auth is disabled or the service is unavailable (fallback to local).
"""
if not EXTERNAL_AUTH_ENABLED:
return None
# Check cache first
cached = _get_cached_result(api_key)
if cached:
logger.debug(f"Using cached validation result for key prefix: {api_key[:12]}...")
# Check scope against cached result
if cached.valid and cached.scopes and scope not in cached.scopes:
return ExternalValidationResult(
valid=False,
error=f"API key does not have scope: {scope}",
)
return cached
# Call mana-core-auth validation endpoint
try:
async with httpx.AsyncClient(timeout=EXTERNAL_AUTH_TIMEOUT) as client:
response = await client.post(
f"{MANA_CORE_AUTH_URL}/api/v1/api-keys/validate",
json={"apiKey": api_key, "scope": scope},
)
if response.status_code == 200:
data = response.json()
result = ExternalValidationResult(
valid=data.get("valid", False),
user_id=data.get("userId"),
scopes=data.get("scopes", []),
rate_limit_requests=data.get("rateLimit", {}).get("requests", 60),
rate_limit_window=data.get("rateLimit", {}).get("window", 60),
error=data.get("error"),
)
_cache_result(api_key, result)
return result
else:
logger.warning(
f"External auth returned status {response.status_code}: {response.text}"
)
# Don't cache errors - allow retry
return ExternalValidationResult(
valid=False,
error=f"Auth service returned {response.status_code}",
)
except httpx.TimeoutException:
logger.warning("External auth service timeout - falling back to local auth")
return None
except httpx.ConnectError:
logger.warning("Cannot connect to external auth service - falling back to local auth")
return None
except Exception as e:
logger.error(f"External auth error: {e}")
return None
def clear_cache():
"""Clear the validation cache (for testing or runtime updates)."""
global _validation_cache
_validation_cache.clear()
logger.info("External auth cache cleared")
__all__ = [
"ExternalValidationResult",
"is_external_auth_enabled",
"validate_api_key_external",
"clear_cache",
]

View file

@ -1,271 +1,39 @@
"""
API Key Authentication for ManaCore TTS Service
Supports two authentication modes:
1. Local API keys: Configured via environment variables
2. External API keys: Validated via mana-core-auth service (when EXTERNAL_AUTH_ENABLED=true)
Usage:
# Local keys
API_KEYS=sk-key1:name1,sk-key2:name2
INTERNAL_API_KEY=sk-internal-xxx
# External auth (for user-created keys via mana.how)
EXTERNAL_AUTH_ENABLED=true
MANA_CORE_AUTH_URL=http://localhost:3001
API Key Authentication for ManaCore TTS Service.
Delegates to shared manacore_auth package.
"""
import sys
import os
import time
import logging
from typing import Optional
from collections import defaultdict
from dataclasses import dataclass, field
from fastapi import HTTPException, Security, Request
from fastapi.security import APIKeyHeader
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "packages", "shared-python"))
from .external_auth import (
from manacore_auth import (
APIKey,
AuthResult,
RateLimitInfo,
verify_api_key as _verify_api_key,
get_api_key_stats,
reload_api_keys,
api_key_header,
create_auth_dependency,
)
from manacore_auth.external_auth import (
ExternalValidationResult,
is_external_auth_enabled,
validate_api_key_external,
ExternalValidationResult,
)
logger = logging.getLogger(__name__)
from typing import Optional
from fastapi import Security, Request
# Configuration
API_KEYS_ENV = os.getenv("API_KEYS", "") # Format: "sk-key1:name1,sk-key2:name2"
INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "") # Unlimited internal key
REQUIRE_AUTH = os.getenv("REQUIRE_AUTH", "true").lower() == "true"
RATE_LIMIT_REQUESTS = int(os.getenv("RATE_LIMIT_REQUESTS", "60")) # Per minute
RATE_LIMIT_WINDOW = int(os.getenv("RATE_LIMIT_WINDOW", "60")) # Seconds
@dataclass
class APIKey:
"""API Key with metadata."""
key: str
name: str
is_internal: bool = False
rate_limit: int = RATE_LIMIT_REQUESTS # Requests per window
@dataclass
class RateLimitInfo:
"""Rate limit tracking per key."""
requests: list = field(default_factory=list)
def is_allowed(self, limit: int, window: int) -> bool:
"""Check if request is allowed within rate limit."""
now = time.time()
# Remove old requests outside window
self.requests = [t for t in self.requests if now - t < window]
if len(self.requests) >= limit:
return False
self.requests.append(now)
return True
def remaining(self, limit: int, window: int) -> int:
"""Get remaining requests in current window."""
now = time.time()
self.requests = [t for t in self.requests if now - t < window]
return max(0, limit - len(self.requests))
# Parse API keys from environment
def _parse_api_keys() -> dict[str, APIKey]:
"""Parse API keys from environment variables."""
keys = {}
# Parse comma-separated keys
if API_KEYS_ENV:
for entry in API_KEYS_ENV.split(","):
entry = entry.strip()
if ":" in entry:
key, name = entry.split(":", 1)
else:
key, name = entry, "default"
keys[key.strip()] = APIKey(key=key.strip(), name=name.strip())
# Add internal key with no rate limit
if INTERNAL_API_KEY:
keys[INTERNAL_API_KEY] = APIKey(
key=INTERNAL_API_KEY,
name="internal",
is_internal=True,
rate_limit=999999, # Effectively unlimited
)
return keys
# Global state
_api_keys = _parse_api_keys()
_rate_limits: dict[str, RateLimitInfo] = defaultdict(RateLimitInfo)
# Security scheme
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
@dataclass
class AuthResult:
"""Result of authentication check."""
authenticated: bool
key_name: Optional[str] = None
is_internal: bool = False
rate_limit_remaining: Optional[int] = None
user_id: Optional[str] = None # Set when using external auth
# TTS-specific auth dependency
verify_tts_key = create_auth_dependency("tts")
async def verify_api_key(
request: Request,
api_key: Optional[str] = Security(api_key_header),
) -> AuthResult:
"""
Verify API key and check rate limits.
Supports two authentication modes:
1. External auth via mana-core-auth (for sk_live_ keys)
2. Local auth via environment variables
Returns AuthResult with authentication status.
Raises HTTPException if auth fails or rate limited.
"""
# Skip auth for health and docs endpoints
path = request.url.path
if path in ["/health", "/docs", "/openapi.json", "/redoc"]:
return AuthResult(authenticated=True, key_name="public")
# If auth not required, allow all
if not REQUIRE_AUTH:
return AuthResult(authenticated=True, key_name="anonymous")
# Check for API key
if not api_key:
logger.warning(f"Missing API key for {path} from {request.client.host if request.client else 'unknown'}")
raise HTTPException(
status_code=401,
detail="Missing API key. Provide X-API-Key header.",
headers={"WWW-Authenticate": "ApiKey"},
)
# Try external auth first for sk_live_ keys (user-created keys via mana.how)
if api_key.startswith("sk_live_") and is_external_auth_enabled():
external_result = await validate_api_key_external(api_key, "tts")
if external_result is not None:
if external_result.valid:
# Use rate limits from external auth
rate_info = _rate_limits[api_key]
limit = external_result.rate_limit_requests
window = external_result.rate_limit_window
if not rate_info.is_allowed(limit, window):
remaining = rate_info.remaining(limit, window)
logger.warning(f"Rate limit exceeded for external key")
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {window} seconds.",
headers={
"X-RateLimit-Limit": str(limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + window),
"Retry-After": str(window),
},
)
remaining = rate_info.remaining(limit, window)
logger.debug(f"Authenticated external request from user {external_result.user_id} to {path}")
return AuthResult(
authenticated=True,
key_name="external",
is_internal=False,
rate_limit_remaining=remaining,
user_id=external_result.user_id,
)
else:
# External auth returned invalid
logger.warning(f"External auth failed: {external_result.error}")
raise HTTPException(
status_code=401,
detail=external_result.error or "Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
# If external_result is None, fall through to local auth
# Local auth: Validate key against environment variables
if api_key not in _api_keys:
logger.warning(f"Invalid API key attempt for {path}")
raise HTTPException(
status_code=401,
detail="Invalid API key.",
headers={"WWW-Authenticate": "ApiKey"},
)
key_info = _api_keys[api_key]
# Check rate limit (skip for internal keys)
if not key_info.is_internal:
rate_info = _rate_limits[api_key]
if not rate_info.is_allowed(key_info.rate_limit, RATE_LIMIT_WINDOW):
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
logger.warning(f"Rate limit exceeded for key '{key_info.name}'")
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {RATE_LIMIT_WINDOW} seconds.",
headers={
"X-RateLimit-Limit": str(key_info.rate_limit),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str(int(time.time()) + RATE_LIMIT_WINDOW),
"Retry-After": str(RATE_LIMIT_WINDOW),
},
)
remaining = rate_info.remaining(key_info.rate_limit, RATE_LIMIT_WINDOW)
else:
remaining = None
logger.debug(f"Authenticated request from '{key_info.name}' to {path}")
return AuthResult(
authenticated=True,
key_name=key_info.name,
is_internal=key_info.is_internal,
rate_limit_remaining=remaining,
)
def get_api_key_stats() -> dict:
"""Get statistics about API keys (for admin endpoint)."""
stats = {
"total_keys": len(_api_keys),
"auth_required": REQUIRE_AUTH,
"rate_limit": {
"requests_per_window": RATE_LIMIT_REQUESTS,
"window_seconds": RATE_LIMIT_WINDOW,
},
"keys": [],
}
for key, info in _api_keys.items():
# Don't expose actual keys, just metadata
masked_key = key[:8] + "..." if len(key) > 8 else "***"
rate_info = _rate_limits.get(key, RateLimitInfo())
stats["keys"].append({
"name": info.name,
"key_prefix": masked_key,
"is_internal": info.is_internal,
"requests_in_window": len(rate_info.requests),
"remaining": rate_info.remaining(info.rate_limit, RATE_LIMIT_WINDOW),
})
return stats
def reload_api_keys():
"""Reload API keys from environment (for runtime updates)."""
global _api_keys
_api_keys = _parse_api_keys()
logger.info(f"Reloaded {len(_api_keys)} API keys")
"""Verify API key with TTS scope."""
return await _verify_api_key(request, scope="tts", api_key=api_key)

View file

@ -1,145 +1,22 @@
"""
External API Key Validation via mana-core-auth
When EXTERNAL_AUTH_ENABLED=true, API keys are validated against the
central mana-core-auth service. This allows users to create and manage
API keys from the mana.how web interface.
Results are cached for 5 minutes to reduce load on the auth service.
External API Key Validation delegates to shared manacore_auth package.
"""
import sys
import os
import time
import logging
import httpx
from typing import Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "packages", "shared-python"))
# Configuration
EXTERNAL_AUTH_ENABLED = os.getenv("EXTERNAL_AUTH_ENABLED", "false").lower() == "true"
MANA_CORE_AUTH_URL = os.getenv("MANA_CORE_AUTH_URL", "http://localhost:3001")
API_KEY_CACHE_TTL = int(os.getenv("API_KEY_CACHE_TTL", "300")) # 5 minutes
EXTERNAL_AUTH_TIMEOUT = float(os.getenv("EXTERNAL_AUTH_TIMEOUT", "5.0")) # seconds
from manacore_auth.external_auth import (
ExternalValidationResult,
is_external_auth_enabled,
validate_api_key_external,
clear_cache,
)
@dataclass
class ExternalValidationResult:
"""Result from external API key validation."""
valid: bool
user_id: Optional[str] = None
scopes: Optional[list] = None
rate_limit_requests: int = 60
rate_limit_window: int = 60
error: Optional[str] = None
cached_at: float = 0.0
# In-memory cache for validation results
# Key: API key, Value: ExternalValidationResult
_validation_cache: dict[str, ExternalValidationResult] = {}
def is_external_auth_enabled() -> bool:
"""Check if external authentication is enabled."""
return EXTERNAL_AUTH_ENABLED
def _get_cached_result(api_key: str) -> Optional[ExternalValidationResult]:
"""Get cached validation result if still valid."""
result = _validation_cache.get(api_key)
if result and (time.time() - result.cached_at) < API_KEY_CACHE_TTL:
return result
return None
def _cache_result(api_key: str, result: ExternalValidationResult):
"""Cache a validation result."""
result.cached_at = time.time()
_validation_cache[api_key] = result
# Clean up old entries periodically (keep cache size manageable)
if len(_validation_cache) > 1000:
now = time.time()
expired_keys = [
k for k, v in _validation_cache.items()
if (now - v.cached_at) >= API_KEY_CACHE_TTL
]
for k in expired_keys:
del _validation_cache[k]
async def validate_api_key_external(api_key: str, scope: str) -> Optional[ExternalValidationResult]:
"""
Validate an API key against mana-core-auth service.
Args:
api_key: The API key to validate (e.g., "sk_live_...")
scope: The required scope (e.g., "stt" or "tts")
Returns:
ExternalValidationResult if external auth is enabled and the key was validated.
None if external auth is disabled or the service is unavailable (fallback to local).
"""
if not EXTERNAL_AUTH_ENABLED:
return None
# Check cache first
cached = _get_cached_result(api_key)
if cached:
logger.debug(f"Using cached validation result for key prefix: {api_key[:12]}...")
# Check scope against cached result
if cached.valid and cached.scopes and scope not in cached.scopes:
return ExternalValidationResult(
valid=False,
error=f"API key does not have scope: {scope}",
)
return cached
# Call mana-core-auth validation endpoint
try:
async with httpx.AsyncClient(timeout=EXTERNAL_AUTH_TIMEOUT) as client:
response = await client.post(
f"{MANA_CORE_AUTH_URL}/api/v1/api-keys/validate",
json={"apiKey": api_key, "scope": scope},
)
if response.status_code == 200:
data = response.json()
result = ExternalValidationResult(
valid=data.get("valid", False),
user_id=data.get("userId"),
scopes=data.get("scopes", []),
rate_limit_requests=data.get("rateLimit", {}).get("requests", 60),
rate_limit_window=data.get("rateLimit", {}).get("window", 60),
error=data.get("error"),
)
_cache_result(api_key, result)
return result
else:
logger.warning(
f"External auth returned status {response.status_code}: {response.text}"
)
# Don't cache errors - allow retry
return ExternalValidationResult(
valid=False,
error=f"Auth service returned {response.status_code}",
)
except httpx.TimeoutException:
logger.warning("External auth service timeout - falling back to local auth")
return None
except httpx.ConnectError:
logger.warning("Cannot connect to external auth service - falling back to local auth")
return None
except Exception as e:
logger.error(f"External auth error: {e}")
return None
def clear_cache():
"""Clear the validation cache (for testing or runtime updates)."""
global _validation_cache
_validation_cache.clear()
logger.info("External auth cache cleared")
__all__ = [
"ExternalValidationResult",
"is_external_auth_enabled",
"validate_api_key_external",
"clear_cache",
]