Source code for govbr_auth.controller

import asyncio
from typing import Callable, Optional
from urllib.parse import urlparse

from govbr_auth.core.config import GovBrConfig
from govbr_auth.core.govbr import GovBrException, GovBrAuthenticationError
from pydantic import BaseModel
from govbr_auth import GovBrAuthorize
from govbr_auth import GovBrIntegration

__all__ = ["GovBrConnector"]


class AuthenticationSchema(BaseModel):
    """
    Schema para o corpo da requisição de autenticação.
    """
    code: str
    state: str


class AuthenticateResponseBody(BaseModel):
    """
    Schema para a resposta de autenticação.
    """
    access_token: str
    token_type: str
    expires_in: int
    id_token: str


[docs] class GovBrConnector: """ Classe responsável por conectar o Gov.br com os frameworks FastAPI, Flask e Django. Ela fornece métodos para inicializar as rotas de autenticação e autorização do Gov.br em cada um desses frameworks. :type config: GovBrConfig :type prefix: str :type authorize_endpoint: str :type authenticate_endpoint: str """
[docs] def __init__(self, config: GovBrConfig, prefix="/auth/govbr", authorize_endpoint="authorize", authenticate_endpoint="authenticate", on_auth_success: Optional[Callable[[dict, object], object]] = None, fake_users: Optional[dict] = None, fake_jwt_secret: str = "fake-govbr-dev-secret" ): """ Inicializa a classe GovBrConnector com as configurações necessárias. :param config: Instância de GovBrConfig contendo as configurações necessárias para a autenticação. :param prefix: Prefixo para as rotas de autenticação (padrão: "/auth/govbr"). :param authorize_endpoint: Endpoint para autorização (padrão: "authorize"). :param authenticate_endpoint: Endpoint para autenticação (padrão: "authenticate"). :param on_auth_success: Função de callback a ser chamada após a autenticação bem-sucedida. :param fake_users: Dicionário de usuários fake (opcional, apenas para modo fake). :param fake_jwt_secret: Chave secreta para JWT fake (opcional, apenas para modo fake). """ self.config = config self.config.prefix = prefix.strip("/ ") self.config.authorize_endpoint = authorize_endpoint.strip("/ ") self.config.authenticate_endpoint = authenticate_endpoint.strip("/ ") self.on_auth_success = on_auth_success # Detecta se está em modo fake e inicializa o serviço self.is_fake_mode = self._is_fake_mode() self.fake_service = None if self.is_fake_mode: from govbr_auth.fake_govbr import FakeGovBrService, create_default_fake_users users = fake_users if fake_users is not None else create_default_fake_users() self.fake_service = FakeGovBrService( users=users, jwt_secret=fake_jwt_secret, client_id=config.client_id ) import logging logger = logging.getLogger(__name__) logger.info(f"🧪 Modo FAKE Gov.br ativado - Endpoints fake serão registrados automaticamente")
def _is_fake_mode(self): """ Verifica se o modo fake está habilitado. Retorna True quando o flag `use_fake` está ativo na configuração ou quando as URLs configuradas apontam para hosts locais, preservando compatibilidade com a heurística existente. """ if getattr(self.config, "use_fake", False): return True auth_host = urlparse(self.config.govbr_auth_url).hostname token_host = urlparse(self.config.govbr_token_url).hostname local_hosts = {"localhost", "127.0.0.1"} if auth_host in local_hosts and token_host in local_hosts: return True return False
[docs] def init_fastapi(self, app): from fastapi.routing import APIRouter from fastapi import Request router = APIRouter(prefix=f"/{self.config.prefix}", tags=["GovBR Auth"]) @router.get(f"/{self.config.authorize_endpoint}") async def get_authorize_url(): try: return GovBrAuthorize(self.config).build_authorize_url() except GovBrException as e: from fastapi import HTTPException raise HTTPException(status_code=400, detail=str(e)) @router.post(f"/{self.config.authenticate_endpoint}") async def govbr_callback(data: AuthenticationSchema, request: Request): try: integration = GovBrIntegration(self.config) result = await integration.async_exchange_code_for_token(data.code, data.state) if self.on_auth_success: result = self.on_auth_success(result, request) if asyncio.iscoroutine(result): return await result return result except (GovBrException, GovBrAuthenticationError) as e: from fastapi import HTTPException raise HTTPException(status_code=401, detail=str(e)) app.include_router(router) # Se estiver em modo fake, registra os endpoints fake if self.is_fake_mode and self.fake_service: from fastapi import Form, HTTPException from fastapi.responses import HTMLResponse, RedirectResponse from govbr_auth.fake_govbr import render_fake_login_page, process_fake_login, AuthorizationRequest # Extrai o path base da URL fake parsed_url = urlparse(self.config.govbr_auth_url) path_parts = parsed_url.path.rstrip('/').split('/') auth_path = '/'.join(path_parts[:-1]) if len(path_parts) > 1 else '' fake_router = APIRouter(prefix=auth_path, tags=["Fake Gov.br (Dev Only)"]) @fake_router.get("/authorize", response_class=HTMLResponse) async def fake_authorize( response_type: str, client_id: str, scope: str, redirect_uri: str, state: str, nonce: str, code_challenge: str, code_challenge_method: str ): """Simula o endpoint /authorize do Gov.br""" auth_request = AuthorizationRequest( response_type=response_type, client_id=client_id, scope=scope, redirect_uri=redirect_uri, state=state, nonce=nonce, code_challenge=code_challenge, code_challenge_method=code_challenge_method ) html, _ = render_fake_login_page(self.fake_service, auth_request) return html @fake_router.post("/login") async def fake_login( request_id: str = Form(...), email: str = Form(...), password: str = Form(...) ): """Processa o login fake""" try: redirect_url = process_fake_login( service=self.fake_service, request_id=request_id, email=email, cpf=password ) return RedirectResponse(url=redirect_url, status_code=302) except ValueError as e: raise HTTPException(status_code=401, detail=str(e)) @fake_router.post("/token") async def fake_token( code: str = Form(...), code_verifier: str = Form(...), redirect_uri: str = Form(...), client_id: str = Form(None), grant_type: str = Form("authorization_code") ): """Simula o endpoint /token do Gov.br""" try: return self.fake_service.exchange_code_for_token( code=code, code_verifier=code_verifier, redirect_uri=redirect_uri, client_id=client_id ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @fake_router.get("/users") async def list_fake_users(): """Lista os usuários disponíveis para teste""" users_list = [] for cpf, user in self.fake_service.users.items(): users_list.append({ "cpf": cpf, "nome": user.nome, "email": user.email, "senha": cpf }) return {"usuarios_de_teste": users_list} app.include_router(fake_router)
[docs] def init_flask(self, app): from flask import Blueprint, request, jsonify bp = Blueprint('govbr_auth', __name__, url_prefix=f"/{self.config.prefix}") @bp.route(f'/{self.config.authorize_endpoint}', methods=['GET']) def get_authorize_url(): try: authorize = GovBrAuthorize(self.config) return jsonify(authorize.build_authorize_url()) except (GovBrException, GovBrAuthenticationError) as e: return jsonify({"error": str(e)}), 400 @bp.route(f'/{self.config.authenticate_endpoint}', methods=['POST']) def govbr_callback(): code = request.args.get("code") state = request.args.get("state") if not code or not state: return jsonify({"error": "Missing 'code' or 'state' parameter"}), 400 try: integration = GovBrIntegration(self.config) result = integration.exchange_code_for_token_sync(code, state) if self.on_auth_success: return self.on_auth_success(result, request) else: return jsonify(result) except (GovBrException, GovBrAuthenticationError) as e: return jsonify({"error": str(e)}), 401 app.register_blueprint(bp) # Se estiver em modo fake, registra os endpoints fake if self.is_fake_mode and self.fake_service: from flask import redirect from govbr_auth.fake_govbr import render_fake_login_page, process_fake_login, AuthorizationRequest # Extrai o path base da URL fake auth_path = self.config.govbr_auth_url.split('://')[-1] auth_path = '/' + '/'.join(auth_path.split('/')[1:-1]) fake_bp = Blueprint('fake_govbr', __name__, url_prefix=auth_path) @fake_bp.route("/authorize") def fake_authorize(): """Simula o endpoint /authorize do Gov.br""" auth_request = AuthorizationRequest( response_type=request.args.get("response_type"), client_id=request.args.get("client_id"), scope=request.args.get("scope"), redirect_uri=request.args.get("redirect_uri"), state=request.args.get("state"), nonce=request.args.get("nonce"), code_challenge=request.args.get("code_challenge"), code_challenge_method=request.args.get("code_challenge_method") ) html, _ = render_fake_login_page(self.fake_service, auth_request) return html @fake_bp.route("/login", methods=["POST"]) def fake_login(): """Processa o login fake""" try: redirect_url = process_fake_login( service=self.fake_service, request_id=request.form.get("request_id"), email=request.form.get("email"), cpf=request.form.get("password") ) return redirect(redirect_url) except ValueError as e: return jsonify({"error": str(e)}), 401 @fake_bp.route("/token", methods=["POST"]) def fake_token(): """Simula o endpoint /token do Gov.br""" try: token_data = self.fake_service.exchange_code_for_token( code=request.form.get("code"), code_verifier=request.form.get("code_verifier"), redirect_uri=request.form.get("redirect_uri"), client_id=request.form.get("client_id") ) return jsonify(token_data) except ValueError as e: return jsonify({"error": str(e)}), 400 @fake_bp.route("/users") def list_fake_users(): """Lista os usuários disponíveis para teste""" users_list = [] for cpf, user in self.fake_service.users.items(): users_list.append({ "cpf": cpf, "nome": user.nome, "email": user.email, "senha": cpf }) return jsonify({"usuarios_de_teste": users_list}) app.register_blueprint(fake_bp)
[docs] def init_django(self): from django.urls import path from django.http import JsonResponse from django.views import View from django.urls import include import asyncio class GovBrUrlView(View): config = None def dispatch(self, request, *args, **kwargs): self.config = kwargs.pop('config', None) return super().dispatch(request, *args, **kwargs) def get(self, request): try: return JsonResponse(GovBrAuthorize(self.config).build_authorize_url()) except (GovBrException, GovBrAuthenticationError) as e: return JsonResponse({"error": str(e)}, status=400) class GovBrCallbackView(View): config = None on_auth_success = None def dispatch(self, request, *args, **kwargs): self.config = kwargs.pop('config', None) self.on_auth_success = kwargs.pop('on_auth_success', None) return super().dispatch(request, *args, **kwargs) def post(self, request): code = request.POST.get('code') state = request.POST.get('state') if not code or not state: return JsonResponse({"error": "Missing 'code' or 'state' parameter"}, status=400) try: result = asyncio.run(GovBrIntegration(self.config).async_exchange_code_for_token(code, state)) if self.on_auth_success: return self.on_auth_success(result, request) else: return JsonResponse(result) except (GovBrException, GovBrAuthenticationError) as e: return JsonResponse({"error": str(e)}, status=401) urls_patterns = [ path(self.config.authorize_endpoint, GovBrUrlView.as_view(), {'config': self.config}, name='govbr-auth-url'), path(self.config.authenticate_endpoint, GovBrCallbackView.as_view(), {'config': self.config, "on_auth_success": self.on_auth_success}, name='govbr-auth-callback'), ] return [path(f"{self.config.prefix}/", include(urls_patterns))]