Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.emergence.ai/llms.txt

Use this file to discover all available pages before exploring further.

Authenticate Users

This page shows how a solution validates user identity and project context. The platform provides Keycloak-issued JWTs (multi-realm OIDC/PKCE); your service is responsible for validating the signature, extracting claims, and propagating context to downstream calls. For the full platform-side authentication architecture (realm-per-org, JWKS rotation, SSO providers), see Platform › Authentication and Security › Authentication.

Request flow

Steps

1

Add JWT validation as a FastAPI dependency

Use python-jose to verify the token against the realm’s JWKS. Cache the JWKS for ~10 minutes to avoid hammering Keycloak.
packages/api/src/api/auth.py
import os
import time
from typing import Annotated
import httpx
from fastapi import Depends, Header, HTTPException, status
from jose import jwt, JWTError

KEYCLOAK_ISSUER_URL = os.environ["KEYCLOAK_ISSUER_URL"]   # e.g. https://kc.example.com/realms/<org>
KEYCLOAK_AUDIENCE   = os.environ["KEYCLOAK_AUDIENCE"]     # your service's client_id

# JWKS TTL — Keycloak rotates signing keys (default ~12h). Lifetime cache
# (e.g. lru_cache) WILL produce 401s for valid tokens after rotation.
_JWKS_TTL_SECONDS = 600  # 10 minutes
_jwks_cache: dict | None = None
_jwks_cached_at: float = 0.0

def _jwks() -> dict:
    global _jwks_cache, _jwks_cached_at
    now = time.monotonic()
    if _jwks_cache is None or now - _jwks_cached_at > _JWKS_TTL_SECONDS:
        url = f"{KEYCLOAK_ISSUER_URL.rstrip('/')}/protocol/openid-connect/certs"
        _jwks_cache = httpx.get(url, timeout=5.0).raise_for_status().json()
        _jwks_cached_at = now
    return _jwks_cache

def current_user(authorization: Annotated[str | None, Header()] = None) -> dict:
    if not authorization or not authorization.lower().startswith("bearer "):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Missing bearer token")
    token = authorization.split(" ", 1)[1]
    try:
        claims = jwt.decode(
            token,
            _jwks(),
            algorithms=["RS256"],
            audience=KEYCLOAK_AUDIENCE,
            issuer=KEYCLOAK_ISSUER_URL,
        )
    except JWTError as e:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, f"Invalid token: {e}")
    return claims

def project_id(x_project_id: Annotated[str | None, Header()] = None) -> str:
    if not x_project_id:
        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Missing X-Project-ID header")
    return x_project_id
2

Protect your endpoints

packages/api/src/api/main.py
from fastapi import Depends, FastAPI
from typing import Annotated
from .auth import current_user, project_id

app = FastAPI()

@app.get("/healthz")
async def healthz() -> dict[str, str]:
    return {"status": "ok"}

@app.get("/echo")
async def echo(
    msg: str,
    user: Annotated[dict, Depends(current_user)],
    proj: Annotated[str, Depends(project_id)],
) -> dict[str, str]:
    return {
        "echo": msg,
        "user_sub": user["sub"],
        "org_id": user.get("org_id", ""),
        "project_id": proj,
    }
Routes that need auth declare current_user and (when project context matters) project_id as dependencies. Health probes stay unauthenticated.
3

Read identity, project, and roles from claims

Standard claims you’ll use:
ClaimMeaningNotes
subUser UUIDStable across sessions
issIssuer URLRealm path encodes org_id (last path segment)
audAudienceMust equal your service’s client_id
expExpiryUNIX seconds
org_idOrganizationCustom claim — also extractable from iss
groupsGroup membershipsUsed by Governance for role mapping
Helper to extract org_id from the issuer when the custom claim isn’t set:
def org_id_from_issuer(iss: str) -> str:
    # iss like "https://kc.example.com/realms/acme-corp"
    return iss.rstrip("/").rsplit("/", 1)[-1]
4

Check permissions via Governance

Solutions never implement RBAC themselves — they delegate to OpenFGA via Governance. Forward the user’s JWT and the resource ID; Governance returns allow/deny.
import httpx, os

GOVERNANCE_URL = os.environ.get("GOVERNANCE_URL", "http://em-runtime-governance.em-runtime:8001")

async def require_permission(token: str, project_id: str, action: str, resource_uri: str) -> None:
    async with httpx.AsyncClient() as c:
        r = await c.post(
            f"{GOVERNANCE_URL}/governance/permissions/check",
            headers={"Authorization": f"Bearer {token}", "X-Project-ID": project_id},
            json={"action": action, "resource_uri": resource_uri},
            timeout=5.0,
        )
    r.raise_for_status()
    if not r.json()["allowed"]:
        raise HTTPException(status.HTTP_403_FORBIDDEN, "Not authorized")
Use the auto-generated Python SDK instead of raw HTTP in real services — it handles error mapping, retries, and OpenAPI-validated request shapes.
5

Write a test fixture for a forged dev JWT

Local tests should not call real Keycloak. Generate a signed token with a fixture private key and load the matching public key as a JWKS override.
tests/conftest.py
from datetime import datetime, timedelta, timezone
from jose import jwt
import pytest

PRIV_KEY = open("tests/fixtures/dev-private.pem").read()
PUB_KEY  = open("tests/fixtures/dev-public.pem").read()

@pytest.fixture
def dev_token():
    now = datetime.now(timezone.utc)
    return jwt.encode(
        {
            "sub": "user-123",
            "iss": "http://localhost:8080/realms/dev",
            "aud": "hello-solution-api",
            "org_id": "dev",
            "groups": ["developers"],
            "iat": int(now.timestamp()),
            "exp": int((now + timedelta(hours=1)).timestamp()),
        },
        PRIV_KEY,
        algorithm="RS256",
    )
Generate the keypair once with openssl:
openssl genrsa -out tests/fixtures/dev-private.pem 2048
openssl rsa -in tests/fixtures/dev-private.pem -pubout -out tests/fixtures/dev-public.pem
Configure your current_user dependency to load tests/fixtures/dev-public.pem as JWKS in test mode (e.g., when PYTEST_CURRENT_TEST is set).

Service-to-service auth

When your solution’s worker calls another service (or back into Governance/Assets), it can’t use a user’s JWT. Use a service account: a Keycloak client with client_credentials grant. See Platform › Service Accounts for the management API.
async def service_token() -> str:
    async with httpx.AsyncClient() as c:
        r = await c.post(
            f"{KEYCLOAK_ISSUER_URL}/protocol/openid-connect/token",
            data={
                "grant_type": "client_credentials",
                "client_id": os.environ["SVC_ACCOUNT_CLIENT_ID"],
                "client_secret": os.environ["SVC_ACCOUNT_SECRET"],
            },
            timeout=5.0,
        )
    return r.json()["access_token"]
The service-account client must be granted the same OpenFGA roles you’d grant a user. Cache tokens for expires_in - 60 seconds to avoid thundering herds at expiry.

Common errors

See the auth section of Troubleshooting.

Next steps

Manage secrets

Move KEYCLOAK_AUDIENCE and DB creds out of .env.

RBAC configuration

Configure OpenFGA roles for your solution’s resources.

API authentication

Reference for all platform-API auth headers.

SDKs › Python

Use the auto-generated SDK instead of raw HTTP.