The Data Governance solution uses Prefect for orchestrating data profiling, metadata enrichment, and data quality (DQ) rule generation workflows. Prefect provides task scheduling, concurrency control, observability, and failure handling.
All Prefect services are deployed alongside the Data Governance API via Docker Compose (local dev) or Helm (Kubernetes).
Flow definitions are managed as code in scripts/prefect/deploy.py — the single source of truth for all deployments. This file registers flows, configures schedules, and sets job variables.
prefect.yaml is deprecated; deploy.py replaces it. Secrets are read from pod environment variables (injected via Helm secretKeyRef) and passed as plain strings to Prefect job_variables.
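As a rough illustration of that secret handling, the sketch below reads secrets from the environment and returns them as plain strings in a job_variables-shaped dict. The helper name build_job_variables, the variable names in SECRET_ENV_VARS, and the nesting under an "env" key are assumptions made for this example; the actual names and structure are whatever deploy.py defines.

```python
import os

# Hypothetical secret names; the real list is defined in deploy.py.
SECRET_ENV_VARS = ("DG_DB_PASSWORD", "DG_API_TOKEN")


def build_job_variables(environ=None):
    """Collect secrets from the environment as plain strings.

    Returns a dict that could be passed as job_variables when
    registering a deployment. Fails early if a secret is missing,
    so a misconfigured pod is caught at deploy time.
    """
    environ = os.environ if environ is None else environ
    missing = [name for name in SECRET_ENV_VARS if name not in environ]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    # Assumption: secrets are forwarded to the flow run as environment variables.
    return {"env": {name: environ[name] for name in SECRET_ENV_VARS}}
```

The returned dict would then be supplied as the job_variables argument when the deployment is registered. Failing fast on a missing variable keeps a broken secretKeyRef from surfacing later as an obscure flow-run error.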
The profiling workflow analyzes connected data sources to produce statistical profiles:
| Setting | Value |
| --- | --- |
| Flow name | profile-database |
| Concurrency | ConcurrentTaskRunner(max_workers=10) |
| Input | Data connection ID; schema or table targets (database-level targets are not supported); sampling config |
| Output | Column-level and table-level profiling results |
Profiling, Metadata Assessment, and DQ Assessment workflows require schema-level or table-level targets. Database-level targets (an entire database) are not supported; specify the individual schemas or tables to process.
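The target rule can be pictured as a small validation step that runs before a workflow starts. The Target class and validate_target function below are hypothetical names for this sketch; the platform's real target representation may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Target:
    """Hypothetical target shape: a database plus optional schema and table."""
    database: str
    schema: Optional[str] = None
    table: Optional[str] = None


def validate_target(target: Target) -> str:
    """Return the target level ('schema' or 'table').

    Raises ValueError for database-level targets, which the
    workflows do not support.
    """
    if target.schema is None:
        raise ValueError(
            f"Target for database '{target.database}' must name a schema "
            "(database-level targets are not supported)"
        )
    return "table" if target.table is not None else "schema"
```

For example, validate_target(Target("sales", "public")) returns "schema", while validate_target(Target("sales")) raises ValueError.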
DQ rule generation does not require a reference pack. Earlier versions passed an industry context / reference pack into the workflow; that input was removed because it caused crashes under Prefect 3.x. Generation runs purely from schema context, profiling results, and metadata.
When you approve (“Save”) a profiling or enrichment workflow run, the platform applies two changes atomically:
1. Tombstone affected metric snapshots. Affected metric_snapshots rows are written with numerator=0, denominator=0. This invalidates the prior score so the scorecard does not display stale numbers while the next assessment is pending.
2. Resolve all matching OPEN violations. Every OPEN violation tied to an invalidated metric is closed in the same transaction, not just the most recent. Previously the resolve step closed only the latest violation, leaving older OPEN duplicates on the dashboard.
Net effect for users: scorecards update in real time. After Save, expect to see the affected scorecard cells flip to N/A (“pending next assessment”) and any related violation entries disappear from the open-violations view.
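The Save behavior described above can be sketched with SQLite standing in for the platform database. Only the metric_snapshots table name, its numerator and denominator columns, and the OPEN status come from this page. The violations table, the metric_id columns, the RESOLVED status, and the choice to append a zeroed row rather than update one in place are all assumptions made for illustration.

```python
import sqlite3

# Hypothetical minimal schema for the sketch.
SCHEMA = """
CREATE TABLE metric_snapshots (
    id INTEGER PRIMARY KEY,
    metric_id INTEGER NOT NULL,
    numerator INTEGER NOT NULL,
    denominator INTEGER NOT NULL
);
CREATE TABLE violations (
    id INTEGER PRIMARY KEY,
    metric_id INTEGER NOT NULL,
    status TEXT NOT NULL
);
"""


def apply_save(conn, metric_ids):
    """Tombstone snapshots and resolve every OPEN violation atomically."""
    # The connection context manager commits on success and rolls back
    # on any exception, so both changes land together or not at all.
    with conn:
        for metric_id in metric_ids:
            # Tombstone: a zeroed snapshot invalidates the prior score.
            conn.execute(
                "INSERT INTO metric_snapshots (metric_id, numerator, denominator) "
                "VALUES (?, 0, 0)",
                (metric_id,),
            )
            # Close all OPEN violations for the metric, not just the latest.
            conn.execute(
                "UPDATE violations SET status = 'RESOLVED' "
                "WHERE metric_id = ? AND status = 'OPEN'",
                (metric_id,),
            )


def display_score(numerator, denominator):
    """Render a scorecard cell; a zero denominator marks a tombstone."""
    return "N/A" if denominator == 0 else f"{numerator / denominator:.0%}"
```

Because the UPDATE filters on status rather than on the newest row, older duplicate violations are closed along with the most recent one.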
Violations are filtered by asset_fqn using an exact-or-child boundary: a filter for db.schema.card matches db.schema.card itself and any column-level child like db.schema.card.amount, but does not match sibling tables with prefix collisions like db.schema.cardtransaction. Earlier versions used a plain startswith, which produced false matches on sibling tables sharing a name prefix.
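A minimal sketch of the exact-or-child check, assuming the FQN segments are separated by a dot as in the examples above. The function name matches_asset_filter is hypothetical.

```python
def matches_asset_filter(asset_fqn: str, filter_fqn: str) -> bool:
    """Return True if asset_fqn is the filter itself or one of its children."""
    # Appending the separator to the prefix is what rules out siblings
    # such as 'cardtransaction' when filtering on 'card'.
    return asset_fqn == filter_fqn or asset_fqn.startswith(filter_fqn + ".")


assert matches_asset_filter("db.schema.card", "db.schema.card")
assert matches_asset_filter("db.schema.card.amount", "db.schema.card")
assert not matches_asset_filter("db.schema.cardtransaction", "db.schema.card")
```

A plain startswith on the filter alone would have returned True for the third case, which is the false match the earlier versions produced.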
When an ingestion run is triggered by an asset.created event (or kicked off manually with no schemas listed in config), the workflow ingests all schemas in the connection, scoped by config.database. Earlier versions only ingested the first schema returned by the connection, which led to silent data gaps when a connection exposed multiple schemas.
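The schema selection can be sketched as follows. The function schemas_to_ingest and the list_schemas callable are hypothetical; list_schemas stands in for whatever the connection uses to enumerate the schemas of a database.

```python
from typing import Callable, Optional, Sequence


def schemas_to_ingest(
    database: str,
    configured: Optional[Sequence[str]],
    list_schemas: Callable[[str], Sequence[str]],
) -> list:
    """Pick the schemas an ingestion run should cover.

    If the config lists schemas, use exactly those. Otherwise ingest
    every schema the connection exposes for the configured database,
    rather than only the first one returned.
    """
    if configured:
        return list(configured)
    return list(list_schemas(database))
```

With a connection that exposes the schemas raw, staging, and mart for a database, an empty config yields all three, while a config listing only mart yields just that one.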
The ConcurrentTaskRunner controls how many table-level tasks run simultaneously:
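from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

# profile_table is the table-level @task, defined elsewhere.

@flow(task_runner=ConcurrentTaskRunner(max_workers=10))
async def profile_database(tables: list[str]):
    # Submit one task per table; up to 10 tables are profiled concurrently.
    futures = [profile_table.submit(table) for table in tables]
    # Under Prefect 3.x, future.result() blocks and returns the value
    # directly, so the results are collected without asyncio.gather.
    return [future.result() for future in futures]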