Source code for govbr_auth.fake_govbr

import base64
import secrets
import hashlib
from datetime import datetime, timedelta
from pydantic import BaseModel
import jwt
import logging
logger = logging.getLogger(__name__)


_SESSION_DATA = {}


class AuthorizationRequest(BaseModel):
    response_type: str
    client_id: str
    scope: str
    redirect_uri: str
    nonce: str
    state: str
    code_challenge: str
    code_challenge_method: str


GOVBR_LOGIN_PAGE_FAKE = f"""
<!DOCTYPE html>
<html lang="pt-BR">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Login - Gov.br (FAKE)</title>
    <style>
        * {{
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }}
        body {{
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            background: linear-gradient(135deg, #ff6b35 0%, #f7931e 50%, #ff8c42 100%);
            min-height: 100vh;
            display: flex;
            justify-content: center;
            align-items: center;
            padding: 20px;
        }}
        .login-container {{
            background: white;
            border-radius: 16px;
            box-shadow: 0 20px 60px rgba(255, 107, 53, 0.3);
            width: 100%;
            max-width: 420px;
            padding: 40px;
            animation: fadeIn 0.5s ease-in;
        }}
        @keyframes fadeIn {{
            from {{ opacity: 0; transform: translateY(20px); }}
            to {{ opacity: 1; transform: translateY(0); }}
        }}
        .fake-badge {{
            background: linear-gradient(135deg, #ff6b35 0%, #f7931e 100%);
            color: white;
            padding: 10px 20px;
            border-radius: 25px;
            font-size: 12px;
            font-weight: bold;
            text-align: center;
            margin-bottom: 20px;
            display: inline-block;
            box-shadow: 0 4px 15px rgba(255, 107, 53, 0.3);
        }}
        .logo {{
            text-align: center;
            margin-bottom: 30px;
        }}
        .logo h1 {{
            color: #ff6b35;
            font-size: 36px;
            margin-bottom: 5px;
            font-weight: 700;
            text-shadow: 2px 2px 4px rgba(255, 107, 53, 0.1);
        }}
        .logo p {{
            color: #666;
            font-size: 14px;
        }}
        .form-group {{
            margin-bottom: 20px;
        }}
        label {{
            display: block;
            color: #333;
            font-weight: 600;
            margin-bottom: 8px;
            font-size: 14px;
        }}
        input {{
            width: 100%;
            padding: 14px;
            border: 2px solid #e0e0e0;
            border-radius: 8px;
            font-size: 14px;
            transition: all 0.3s;
        }}
        input:focus {{
            outline: none;
            border-color: #ff6b35;
            box-shadow: 0 0 0 3px rgba(255, 107, 53, 0.1);
        }}
        .btn {{
            width: 100%;
            padding: 16px;
            background: linear-gradient(135deg, #ff6b35 0%, #f7931e 100%);
            color: white;
            border: none;
            border-radius: 8px;
            font-size: 16px;
            font-weight: 600;
            cursor: pointer;
            transition: all 0.3s;
            box-shadow: 0 4px 15px rgba(255, 107, 53, 0.3);
        }}
        .btn:hover {{
            transform: translateY(-2px);
            box-shadow: 0 6px 20px rgba(255, 107, 53, 0.4);
        }}
        .info {{
            margin-top: 25px;
            padding: 20px;
            background: linear-gradient(135deg, #fff5f0 0%, #ffe8dc 100%);
            border-left: 4px solid #ff6b35;
            border-radius: 8px;
        }}
        .info h3 {{
            color: #ff6b35;
            font-size: 14px;
            margin-bottom: 12px;
            font-weight: 700;
        }}
        .info p {{
            color: #555;
            font-size: 12px;
            line-height: 1.6;
            margin-bottom: 5px;
        }}
        .error {{
            background: #fee;
            border-left-color: #c00;
            color: #c00;
        }}
    </style>
</head>
<body>
    <div class="login-container">            
        <div class="logo">
            <h1>Simulador gov.br</h1>
            <p>Acesso Simplificado</p>
        </div>
        
        <form method="POST" action="/fake-govbr/login">
            <input type="hidden" name="request_id" value="">
            
            <div class="form-group">
                <label for="email">E-mail:</label>
                <input type="email" id="email" name="email" placeholder="seuemail@example.com" required>
            </div>
            
            <div class="form-group">
                <label for="password">CPF (senha):</label>
                <input type="text" id="password" name="password" placeholder="Digite o CPF (11 dígitos)" required>
            </div>
            
            <button type="submit" class="btn">Entrar</button>
        </form>
    </div>
</body>
</html>
"""

[docs] class FakeUserData(BaseModel): """ Schema para dados de um usuário fake no sistema. Args: cpf: CPF do usuário (11 dígitos, usado como senha no fake). nome: Nome completo do usuário. email: E-mail do usuário. picture: URL da foto do usuário (opcional). """ cpf: str nome: str email: str picture: str = "https://www.gov.br/++theme++padrao_govbr/img/govbr-logo-large.png"
[docs] class FakeGovBrService: """ Serviço que simula o comportamento do Gov.br para desenvolvimento. Mantém sessões e autorizações em memória, sem necessidade de banco de dados. Simula o fluxo OAuth 2.0 com PKCE do Gov.br real. Args: users: Dicionário de usuários válidos (chave=CPF, valor=FakeUserData). session_ttl: Tempo de vida da sessão em segundos (padrão: 600 = 10 minutos). jwt_secret: Chave secreta para assinar o id_token. client_id: ID do cliente OAuth (deve coincidir com a config). """
[docs] def __init__(self, users: dict[str, FakeUserData], session_ttl: int = 600, jwt_secret: str = "fake-govbr-secret-key-dev-only", client_id: str = "fake-client-id"): self.users = users self.session_ttl = session_ttl self.jwt_secret = jwt_secret self.client_id = client_id self._sessions = {} self._authorization_codes = {} logger.info(f"FakeGovBrService inicializado com {len(users)} usuários")
[docs] def create_session(self, state: str, code_challenge: str, redirect_uri: str, nonce: str, scope: str = "openid profile email") -> str: """ Cria uma sessão temporária para armazenar os parâmetros OAuth. Args: state: State criptografado do OAuth. code_challenge: Desafio PKCE. redirect_uri: URI de redirecionamento. nonce: Nonce para validação. scope: Escopos solicitados. Returns: request_id: ID único da sessão criada. """ request_id = secrets.token_urlsafe(32) session_data = { "state": state, "code_challenge": code_challenge, "redirect_uri": redirect_uri, "nonce": nonce, "scope": scope, "created_at": datetime.now(), "expires_at": datetime.now() + timedelta(seconds=self.session_ttl) } self._sessions[request_id] = session_data logger.debug(f"Sessão criada: {request_id}") return request_id
[docs] def get_session(self, request_id: str) -> dict: """ Recupera uma sessão pelo ID. Args: request_id: ID da sessão. Returns: Dados da sessão. Raises: ValueError: Se a sessão não existir ou estiver expirada. """ if request_id not in self._sessions: raise ValueError("Sessão não encontrada ou expirada") session = self._sessions[request_id] if datetime.now() > session["expires_at"]: del self._sessions[request_id] raise ValueError("Sessão expirada") return session
[docs] def authenticate_user(self, request_id: str, email: str, cpf: str) -> str: """ Autentica um usuário e gera o código de autorização. Args: request_id: ID da sessão OAuth. email: E-mail fornecido pelo usuário. cpf: CPF fornecido como senha. Returns: URL de redirecionamento com code e state. Raises: ValueError: Se credenciais inválidas ou sessão inválida. """ # Valida a sessão session = self.get_session(request_id) # Remove formatação do CPF cpf_clean = cpf.replace(".", "").replace("-", "").strip() # Valida se o usuário existe if cpf_clean not in self.users: raise ValueError("CPF não encontrado") user = self.users[cpf_clean] # Valida o e-mail if user.email != email: raise ValueError("E-mail não corresponde ao CPF informado") # Gera o código de autorização auth_code = secrets.token_urlsafe(32) # Armazena o código com os dados necessários self._authorization_codes[auth_code] = { "user": user, "code_challenge": session["code_challenge"], "redirect_uri": session["redirect_uri"], "nonce": session["nonce"], "scope": session["scope"], "created_at": datetime.now(), "expires_at": datetime.now() + timedelta(seconds=300) # Code expira em 5 min } # Remove a sessão (já foi usada) del self._sessions[request_id] # Monta a URL de redirecionamento redirect_url = f"{session['redirect_uri']}?code={auth_code}&state={session['state']}" logger.info(f"Usuário autenticado: {user.email}") return redirect_url
[docs] def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: str, client_id: str = None) -> dict: """ Troca o código de autorização por tokens (simulando o endpoint /token do Gov.br). Args: code: Código de autorização. code_verifier: Verificador PKCE. redirect_uri: URI de redirecionamento (deve ser a mesma). client_id: ID do cliente (opcional, para validação). Returns: Dicionário com access_token, id_token, etc. Raises: ValueError: Se código inválido ou code_verifier não corresponder. """ if code not in self._authorization_codes: raise ValueError("Código de autorização inválido ou expirado") auth_data = self._authorization_codes[code] # Valida expiração if datetime.now() > auth_data["expires_at"]: del self._authorization_codes[code] raise ValueError("Código de autorização expirado") # Valida redirect_uri if auth_data["redirect_uri"] != redirect_uri: raise ValueError("redirect_uri não corresponde") # Valida code_verifier usando PKCE (S256) code_challenge_calculated = base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode('utf-8')).digest() ).decode('utf-8').replace('=', '') if auth_data["code_challenge"] != code_challenge_calculated: raise ValueError("code_verifier inválido") user = auth_data["user"] # Gera o access_token (fake) access_token = secrets.token_urlsafe(32) # Gera o id_token (JWT) now = datetime.now() id_token_payload = { "sub": user.cpf, "name": user.nome, "email": user.email, "email_verified": True, "picture": user.picture, "iss": "https://sso.staging.acesso.gov.br", "aud": client_id or self.client_id, "iat": int(now.timestamp()), "exp": int((now + timedelta(hours=1)).timestamp()), "nonce": auth_data["nonce"], "amr": ["passwd"], "cpf": user.cpf } id_token = jwt.encode( id_token_payload, self.jwt_secret, algorithm="HS256" ) # Remove o código (já foi usado) del self._authorization_codes[code] logger.info(f"Token gerado para usuário: {user.email}") # Retorna no formato Gov.br return { "access_token": access_token, "token_type": "Bearer", "expires_in": 3600, "id_token": id_token, "scope": auth_data["scope"] }
[docs] def cleanup_expired(self): """ Remove sessões e códigos expirados da memória. Deve ser chamado periodicamente para evitar vazamento de memória. """ now = datetime.now() # Limpa sessões expiradas expired_sessions = [ sid for sid, session in self._sessions.items() if now > session["expires_at"] ] for sid in expired_sessions: del self._sessions[sid] # Limpa códigos expirados expired_codes = [ code for code, auth in self._authorization_codes.items() if now > auth["expires_at"] ] for code in expired_codes: del self._authorization_codes[code] if expired_sessions or expired_codes: logger.debug(f"Limpeza: {len(expired_sessions)} sessões e {len(expired_codes)} códigos removidos")
# Funções auxiliares para integração com frameworks
[docs] def render_fake_login_page(service: FakeGovBrService, auth_request: AuthorizationRequest) -> tuple[str, str]: """ Renderiza a página de login fake e cria a sessão OAuth. Args: service: Instância do FakeGovBrService. auth_request: Dados da requisição de autorização. Returns: Tupla com (HTML da página, request_id da sessão). """ request_id = service.create_session( state=auth_request.state, code_challenge=auth_request.code_challenge, redirect_uri=auth_request.redirect_uri, nonce=auth_request.nonce, scope=auth_request.scope ) # Injeta o request_id no HTML html = GOVBR_LOGIN_PAGE_FAKE.replace('value=""', f'value="{request_id}"') return html, request_id
[docs] def process_fake_login(service: FakeGovBrService, request_id: str, email: str, cpf: str) -> str: """ Processa o login fake e retorna a URL de redirecionamento. Args: service: Instância do FakeGovBrService. request_id: ID da sessão OAuth. email: E-mail fornecido pelo usuário. cpf: CPF fornecido como senha. Returns: URL de redirecionamento com code e state. Raises: ValueError: Se credenciais inválidas. """ return service.authenticate_user(request_id, email, cpf)
[docs] def create_default_fake_users() -> dict[str, FakeUserData]: """ Cria um conjunto de usuários fake padrão para desenvolvimento. Returns: Dicionário de usuários (chave=CPF). """ return { "12345678901": FakeUserData( cpf="12345678901", nome="João da Silva", email="joao.silva@example.com" ), "98765432100": FakeUserData( cpf="98765432100", nome="Maria Oliveira", email="maria.oliveira@example.com" ), "11122233344": FakeUserData( cpf="11122233344", nome="José Santos", email="jose.santos@example.com" ) }