diff --git a/admin/auth.py b/admin/auth.py index 5d08d450e..e6a934446 100644 --- a/admin/auth.py +++ b/admin/auth.py @@ -20,7 +20,12 @@ existing Authentik-based authentication system used by the NMSampleLocations API. """ import os +import secrets from typing import Optional +from urllib.parse import urlencode + +import hashlib +import base64 from dataclasses import dataclass from typing import List @@ -227,30 +232,62 @@ async def login(self, *args, **kwargs) -> RedirectResponse: if request is None: raise LoginFailed("Unable to determine login request context.") - # Redirect to Authentik OAuth authorization endpoint authentik_authorize_url = os.environ.get("AUTHENTIK_AUTHORIZE_URL") - if not authentik_authorize_url: + authentik_client_id = os.environ.get("AUTHENTIK_CLIENT_ID") + if not authentik_authorize_url or not authentik_client_id: raise LoginFailed( - "Authentik authentication is not configured. Please set AUTHENTIK_AUTHORIZE_URL environment variable." + "Authentik authentication is not configured. Please set AUTHENTIK_AUTHORIZE_URL and AUTHENTIK_CLIENT_ID environment variables." ) # Store original URL to redirect back after login original_url = str(request.url_for("admin:index")) request.session["auth_redirect"] = original_url - - # Redirect to Authentik login - return RedirectResponse(url=authentik_authorize_url, status_code=302) - - async def logout(self, request: Request) -> RedirectResponse: + redirect_uri = str(request.url_for("admin_auth_callback")) + + # PKCE for public clients + code_verifier = secrets.token_urlsafe(64) + digest = hashlib.sha256(code_verifier.encode("ascii")).digest() + code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") + + state = secrets.token_urlsafe(32) + request.session["auth_state"] = state + request.session["auth_code_verifier"] = code_verifier + + params = { + "response_type": "code", + "client_id": authentik_client_id, + "redirect_uri": redirect_uri, + "scope": "openid profile email", + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + } + + authorize_url = f"{authentik_authorize_url}?{urlencode(params)}" + return RedirectResponse(url=authorize_url, status_code=302) + + async def logout(self, *args, **kwargs) -> RedirectResponse: """ Handle logout by clearing session and redirecting. Args: - request: Starlette request object + request: Starlette request object (extracted from args/kwargs) + *args/**kwargs: Ignored, kept for compatibility with different + Starlette Admin logout call signatures Returns: RedirectResponse to home page """ + request: Optional[Request] = kwargs.get("request") + if request is None: + for arg in args: + if isinstance(arg, Request): + request = arg + break + + if request is None: + raise LoginFailed("Unable to determine logout request context.") + # Clear session tokens request.session.pop("token", None) request.session.pop("auth_redirect", None) diff --git a/core/initializers.py b/core/initializers.py index cc43cda8e..02ecc968f 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -107,6 +107,7 @@ def init_lexicon(path: str = None) -> None: def register_routes(app): + from admin.auth_routes import router as admin_auth_router from api.group import router as group_router from api.contact import router as contact_router from api.location import router as location_router @@ -126,6 +127,7 @@ def register_routes(app): from api.ngwmn import router as ngwmn_router app.include_router(asset_router) + app.include_router(admin_auth_router) app.include_router(author_router) app.include_router(contact_router) app.include_router(geospatial_router)