Files
ai/sandbox/dexorder/mcp_auth_middleware.py

44 lines
1.9 KiB
Python

# openclaw/auth.py
class MCPAuthMiddleware:
"""Authenticates incoming MCP connections based on configured mode."""
def __init__(self, config: AuthConfig):
self.config = config
self._jwks_client = None # lazy-loaded for platform mode
async def authenticate(self, request) -> AuthContext:
match self.config.mode:
case "local":
# stdio transport or localhost-only binding
# No auth needed — if you can exec into the container,
# you're the user
return AuthContext(user_id=self.config.local_user_id,
source="local")
case "token":
# User-generated API key (standalone remote access)
token = extract_bearer_token(request)
if not verify_token_hash(token, self.config.tokens):
raise AuthError("Invalid API token")
return AuthContext(user_id=self.config.local_user_id,
source="api_key")
case "platform":
# JWT signed by the OpenClaw platform
token = extract_bearer_token(request)
claims = await self._verify_platform_jwt(token)
if claims["sub"] != self.config.expected_user_id:
raise AuthError("User ID mismatch")
return AuthContext(user_id=claims["sub"],
source="platform",
scopes=claims.get("scopes", []))
async def _verify_platform_jwt(self, token: str) -> dict:
if not self._jwks_client:
self._jwks_client = JWKSClient(self.config.platform_jwks_url)
signing_key = await self._jwks_client.get_signing_key_from_jwt(token)
return jwt.decode(token, signing_key.key,
algorithms=["RS256"],
audience="openclaw-mcp")