Use Shared Storage

The platform exposes object storage through obstore, a thin abstraction that lets the same code run against AWS S3, GCS, Azure Blob, and local MinIO. Solutions use it for artifacts, intermediate data, files served to the UI, and anything else that doesn’t belong in the relational database. For the platform-side abstraction model, see Platform Overview. For data-source connections (databases and storage credentials registered by customers), see Platform › Data Connections.

Why obstore

Three things you get for free:
  1. One code path, all clouds. No if AWS: ... elif GCP: ... in your service.
  2. Credential rotation handled by the platform. Connections fetch fresh credentials per request via the secrets pipeline.
  3. Per-tenant path isolation enforced by convention. Every operation is scoped by org_id + project_id (passed via X-Project-ID).
In short: your code calls obs.put / obs.get with a path scoped by org_id/project_id/solution, the obstore client picks the actual backend from environment configuration, and the same code runs against S3, GCS, Azure, or MinIO.

Path conventions

Stick to this prefix layout for everything you write. Object keys carry no leading slash, matching the keys used in the code patterns on this page:
<org_id>/<project_id>/<solution>/<artifact-type>/<id>
Example:
acme-corp/proj-42/talk2data/conversations/abc123/messages.jsonl
Predictable paths make backup, garbage collection, and audit trivial.
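A small helper can keep every key on this layout. The sketch below is illustrative only: build_key and its allowed-character rule are assumptions rather than part of the platform, and keys are built without a leading slash to match the keys used in the code patterns later on this page.

```python
import re

# Assumption: each segment is restricted to a conservative character set, so a
# segment can never contain "/" or be "." or "..".
_SEGMENT = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def build_key(org_id: str, project_id: str, solution: str,
              artifact_type: str, *rest: str) -> str:
    """Join validated segments into <org_id>/<project_id>/<solution>/<artifact-type>/<id>..."""
    if not rest:
        raise ValueError("at least one <id> segment is required")
    segments = (org_id, project_id, solution, artifact_type, *rest)
    for seg in segments:
        if not _SEGMENT.fullmatch(seg):
            raise ValueError(f"invalid path segment: {seg!r}")
    return "/".join(segments)


print(build_key("acme-corp", "proj-42", "talk2data",
                "conversations", "abc123", "messages.jsonl"))
# acme-corp/proj-42/talk2data/conversations/abc123/messages.jsonl
```

Rejecting unexpected characters at build time means a malformed ID fails loudly instead of silently creating a key outside the expected layout.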

Provision a connection

Customers register a Data Connection (type s3, gcs, or minio) via Assets, either in the UI or through the Assets API, and then grant your solution permission to use it. Your solution looks the connection up by name when it needs to read or write:
from em_runtime_assets_sdk import AuthenticatedClient
from em_runtime_assets_sdk.api.connections import get_connection

async def get_storage_creds(token: str, project_id: str, conn_name: str) -> dict:
    client = AuthenticatedClient(base_url="http://em-runtime-assets.em-runtime:8002", token=token)
    conn = await get_connection.asyncio(client=client, x_project_id=project_id, name=conn_name)
    # conn.config holds bucket/region/endpoint; credentials fetched server-side via Secrets API
    return {
        "endpoint": conn.config.endpoint,
        "bucket":   conn.config.bucket,
        "region":   conn.config.region,
        "access_key": conn.credentials.access_key,
        "secret_key": conn.credentials.secret_key,
    }
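The dict returned above uses different key names from the S3Store keyword arguments shown in the next section. A minimal sketch of the mapping follows; s3_store_kwargs is a hypothetical helper, and the target names (access_key_id, secret_access_key, and so on) are taken from the make_store example on this page.

```python
def s3_store_kwargs(creds: dict) -> dict:
    """Map the dict from get_storage_creds onto S3Store keyword names."""
    kwargs = {
        "bucket": creds["bucket"],
        "access_key_id": creds["access_key"],
        "secret_access_key": creds["secret_key"],
    }
    # endpoint and region are optional: leave them out when the connection
    # does not set them instead of passing None through.
    for name in ("endpoint", "region"):
        if creds.get(name):
            kwargs[name] = creds[name]
    return kwargs


example = s3_store_kwargs({
    "endpoint": None,
    "bucket": "demo-artifacts",
    "region": "us-east-1",
    "access_key": "AKIAEXAMPLE",
    "secret_key": "example-secret",
})
print(sorted(example))
# ['access_key_id', 'bucket', 'region', 'secret_access_key']
```

The result can then be splatted into the store constructor, for example S3Store(**s3_store_kwargs(creds)).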

Code patterns

The obstore Python package gives you a single API across providers. Install with uv add obstore.

Initialize a client from env

packages/api/src/api/storage.py
import os
from obstore.store import S3Store, GCSStore, AzureStore, LocalStore

def make_store():
    backend = os.environ.get("OBSTORE_BACKEND", "s3")
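    if backend == "s3":
        endpoint = os.environ.get("OBSTORE_ENDPOINT")  # MinIO sets this; AWS doesn't
        return S3Store(
            bucket=os.environ["OBSTORE_BUCKET"],
            endpoint=endpoint,
            access_key_id=os.environ["OBSTORE_ACCESS_KEY"],
            secret_access_key=os.environ["OBSTORE_SECRET_KEY"],
            region=os.environ.get("OBSTORE_REGION", "us-east-1"),
            # Path-style addressing is the inverse of virtual-hosted-style.
            virtual_hosted_style_request=os.environ.get("OBSTORE_PATH_STYLE", "false") != "true",
            # Plain-HTTP endpoints such as local MinIO are refused unless allow_http is set.
            client_options={"allow_http": bool(endpoint and endpoint.startswith("http://"))},
        )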
    if backend == "gcs":
        return GCSStore(bucket=os.environ["OBSTORE_BUCKET"])  # Workload Identity
    if backend == "azure":
        return AzureStore(container=os.environ["OBSTORE_BUCKET"])
    if backend == "local":
        return LocalStore(prefix=os.environ.get("OBSTORE_LOCAL_PREFIX", "/tmp/obstore"))
    raise ValueError(f"Unknown backend: {backend}")

Put / get / list / delete

import obstore as obs

store = make_store()

# PUT (small): bytes in, put result (ETag and version) out
await obs.put_async(store, "acme-corp/proj-42/talk2data/conversations/abc123.json", b'{"hi":"there"}')

# GET (small)
data = await obs.get_async(store, "acme-corp/proj-42/talk2data/conversations/abc123.json")
body = await data.bytes_async()  # read the whole body without blocking the event loop
print(bytes(body))  # b'{"hi":"there"}'

# LIST (paginated): obs.list returns a stream that supports async iteration
async for batch in obs.list(store, prefix="acme-corp/proj-42/"):
    for entry in batch:
        print(entry["path"], entry["size"])  # entries are dict-like object metadata

# DELETE
await obs.delete_async(store, "acme-corp/proj-42/talk2data/conversations/abc123.json")

Stream a large object (avoid loading into memory)

async def stream_to_disk(store, key: str, dest: str) -> None:
    response = await obs.get_async(store, key)
    with open(dest, "wb") as f:
        async for chunk in response.stream(min_chunk_size=8 * 1024 * 1024):
            f.write(chunk)
For multi-GB objects, prefer streaming over bytes().
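The same concern applies in the upload direction. The sketch below shows one way to read a local file in fixed-size chunks instead of loading it whole; iter_chunks is a hypothetical helper. It assumes that obstore's put functions accept an iterable of bytes-like chunks, which is worth confirming against the obstore version you have installed.

```python
import io
from typing import BinaryIO, Iterator


def iter_chunks(f: BinaryIO, chunk_size: int = 8 * 1024 * 1024) -> Iterator[bytes]:
    """Yield successive chunks of at most chunk_size bytes until EOF."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    while True:
        chunk = f.read(chunk_size)
        if not chunk:  # empty read means end of file
            return
        yield chunk


# Demonstration on an in-memory buffer: 10 bytes in chunks of 4.
sizes = [len(c) for c in iter_chunks(io.BytesIO(b"a" * 10), chunk_size=4)]
print(sizes)  # [4, 4, 2]
```

Under that assumption, an upload would look like passing iter_chunks(open(path, "rb")) as the data argument of obs.put_async, so that only one chunk is held in memory at a time.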

Per-tenant isolation

X-Project-ID is mandatory on every storage operation. Build the prefix from request context:
from fastapi import Depends, HTTPException
from typing import Annotated
# org_id_from_issuer is assumed to live alongside the other auth helpers.
from .auth import current_user, org_id_from_issuer, project_id

def storage_prefix(
    user: Annotated[dict, Depends(current_user)],
    proj: Annotated[str, Depends(project_id)],
    solution: str = "hello-solution",
) -> str:
    org = user.get("org_id") or org_id_from_issuer(user["iss"])
    if not org or not proj:
        raise HTTPException(400, "Missing org_id or project_id")
    return f"{org}/{proj}/{solution}"
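Every read or write that builds its key from storage_prefix is now scoped to the caller's org and project. The guarantee holds only if every call site uses the helper and no route passes a user-controlled key to the store unprefixed. See Platform › Multi-tenancy for the org/project model.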
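When part of the key comes from the request (a filename, for example), the prefix alone is not enough, because the caller-supplied part also has to be checked. A minimal sketch follows, assuming a hypothetical scoped_key helper that joins the tenant prefix with a relative key and rejects traversal-style segments:

```python
def scoped_key(prefix: str, user_key: str) -> str:
    """Join a tenant prefix with a caller-supplied relative key.

    Rejects empty keys, leading or trailing slashes, doubled slashes,
    backslashes, and "." or ".." segments.
    """
    if "\\" in user_key:
        raise ValueError(f"unsafe key: {user_key!r}")
    segments = user_key.split("/")
    # An empty segment covers "", "/abs", "a//b", and "dir/".
    if any(seg in ("", ".", "..") for seg in segments):
        raise ValueError(f"unsafe key: {user_key!r}")
    return f"{prefix.rstrip('/')}/{user_key}"


print(scoped_key("acme-corp/proj-42/hello-solution", "reports/2024.json"))
# acme-corp/proj-42/hello-solution/reports/2024.json
```

A route would call scoped_key(storage_prefix(...), requested_name) and translate the ValueError into a 400 response before touching the store.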

Local development with MinIO

The Local Development page shows the docker-compose snippet. Pre-create the bucket once:
# The minio/mc image uses mc as its entrypoint, so override it to run a shell.
docker run --rm --network host --entrypoint sh minio/mc \
  -c "mc alias set local http://localhost:9000 minioadmin minioadmin && \
      mc mb -p local/<solution>-artifacts"
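Then set OBSTORE_BACKEND=s3, OBSTORE_ENDPOINT=http://localhost:9000, OBSTORE_PATH_STYLE=true, and OBSTORE_BUCKET=<solution>-artifacts, plus OBSTORE_ACCESS_KEY and OBSTORE_SECRET_KEY (minioadmin / minioadmin with the command above). The same code that talks to AWS S3 in prod talks to MinIO in dev.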
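The make_store example reads the bucket and both keys with os.environ[...], so an unset variable surfaces as a bare KeyError. A small startup check gives a clearer message. This is a sketch only: missing_s3_env is a hypothetical helper, and the variable names are the ones used in make_store above.

```python
from typing import Mapping

# Variables that make_store reads without a default when the backend is s3.
REQUIRED_S3_VARS = ("OBSTORE_BUCKET", "OBSTORE_ACCESS_KEY", "OBSTORE_SECRET_KEY")


def missing_s3_env(env: Mapping[str, str]) -> list:
    """Return the required variables that are unset or empty, in a stable order."""
    return [name for name in REQUIRED_S3_VARS if not env.get(name)]


dev_env = {
    "OBSTORE_BACKEND": "s3",
    "OBSTORE_ENDPOINT": "http://localhost:9000",
    "OBSTORE_PATH_STYLE": "true",
    "OBSTORE_BUCKET": "demo-artifacts",
}
print(missing_s3_env(dev_env))
# ['OBSTORE_ACCESS_KEY', 'OBSTORE_SECRET_KEY']
```

In a service you would call missing_s3_env(os.environ) at startup and fail fast with the list of missing names.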

Common errors

Requests to MinIO fail on bucket addressing: set OBSTORE_PATH_STYLE=true. MinIO doesn’t support virtual-hosted-style addressing without DNS configuration.
Access is denied on read or write: check that the bucket policy or the Workload Identity service account grants the required action. For GCS Workload Identity, verify the K8s ServiceAccount is annotated with iam.gke.io/gcp-service-account=<gsa> and the GSA has roles/storage.objectAdmin on the bucket.
The client complains about a missing region: AWS S3 SDKs require a region; set OBSTORE_REGION even if you’re targeting a non-AWS endpoint via OBSTORE_ENDPOINT.
A request can reach another tenant's objects: your storage_prefix helper isn’t being used everywhere, or a route accepts a user-controlled key without prefixing it. Audit by grepping for obs.get_async\|obs.put_async\|obs.delete_async in your service and confirming every call site composes the path through storage_prefix(...).

Verification

# Round-trip test (local with MinIO)
PUT_RESULT=$(curl -s -X PUT \
  -H "Authorization: Bearer $TOKEN" -H "X-Project-ID: $PROJECT_ID" \
  http://localhost:8000/storage/test.txt --data-binary "hello")
GET_RESULT=$(curl -s \
  -H "Authorization: Bearer $TOKEN" -H "X-Project-ID: $PROJECT_ID" \
  http://localhost:8000/storage/test.txt)
[ "$GET_RESULT" = "hello" ] && echo "round-trip OK" || echo "FAIL"

Next steps

Data connections: how customers register storage and DB connections.
Multi-tenancy: org/project isolation, and what enforces it where.
Manage secrets: storage credentials follow the same secret-injection pattern.
Troubleshooting: more storage error scenarios.