Context
In #66782 we wire up server-side consumption of task-emitted partition keys. The resulting `AssetEvent.partition_key` depends on whether the task touched `outlet_events[asset]`:

| Task does... | `AssetEvent.partition_key` |
|---|---|
| nothing | inherits `DagRun.partition_key` |
| sets `extra` only, no `add_partitions()` | `None` |
| `add_partitions("us", "eu")` | `"us"`, `"eu"` |

Rows 1 and 2 look the same from the Dag author's side ("I didn't say anything about partitions") but produce different events. Before we lock this in for the 3.2 release, we should pin down what each field means.
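
The divergence between rows 1 and 2 can be reproduced with a small model. This is only a sketch of the behaviour described in the table above, not the code in `taskinstance.py`; `Emission` and `event_keys_today` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Emission:
    """Hypothetical stand-in for one per-emission payload on outlet_events[asset]."""

    partition_key: Optional[str] = None
    extra: Optional[dict] = None


def event_keys_today(
    dag_run_pk: Optional[str], emissions: list[Emission]
) -> list[Optional[str]]:
    """Model of the table: one entry per resulting AssetEvent."""
    if not emissions:
        # Row 1: the task never touched outlet_events[asset], so the event
        # falls back to the run's partition key.
        return [dag_run_pk]
    # Rows 2 and 3: per-emission keys are kept as-is, including None.
    return [emission.partition_key for emission in emissions]


print(event_keys_today("us", []))                                # row 1
print(event_keys_today("us", [Emission(extra={"rows": 10})]))    # row 2
print(event_keys_today("us", [Emission("us"), Emission("eu")]))  # row 3
```

In this model the only difference between rows 1 and 2 is whether the emissions list is empty, which is not something a Dag author who only set `extra` would expect to matter.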
Proposed framing
`DagRun.partition_key` = provenance. The partition this run was triggered on. Not a default for events the run produces.
`AssetEvent.partition_key` = routing pointer. The key downstream partition mappers consume to decide which downstream DagRun to queue.
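
To make the provenance/routing split concrete, here is a minimal sketch of how one routing pointer could yield different downstream run keys. The mapper functions, the Dag ids, and the date-shaped key format are all assumptions made for illustration; they are not Airflow's mapper API.

```python
from typing import Callable

# Hypothetical mapper type: turns a source-space key (AssetEvent.partition_key)
# into a target-space key (the downstream DagRun.partition_key).
Mapper = Callable[[str], str]


def identity(key: str) -> str:
    """Downstream uses the same key space as the producer."""
    return key


def day_to_month(key: str) -> str:
    """Downstream is partitioned by month: '2024-05-17' -> '2024-05'."""
    return key[:7]


# Hypothetical downstream Dags, each with its own mapper.
DOWNSTREAM_MAPPERS: dict[str, Mapper] = {
    "daily_report": identity,
    "monthly_rollup": day_to_month,
}


def route(event_partition_key: str) -> dict[str, str]:
    """Map one routing pointer to the partition key of each downstream run."""
    return {
        dag_id: mapper(event_partition_key)
        for dag_id, mapper in DOWNSTREAM_MAPPERS.items()
    }


print(route("2024-05-17"))
```

Here a single event key fans out to two different target keys, and the event keys `"2024-05-17"` and `"2024-05-18"` both map to `"2024-05"` for `monthly_rollup`, so the downstream run key and the consumed event key are not expected to match.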
What that implies
- Silent task → event inherits `DagRun.partition_key`. "I didn't say anything" is Identity passthrough of the run's own routing key.
- Row 2 is a bug under this framing: the producer was on a partition slice but left no routing pointer, so downstream partitioned consumers can never trigger. Normalise to row 1.
- `add_partitions("us", "eu")` is explicit fan-out: one event per routing pointer.
- `outlet_events[alias].add_partitions(...)` → no-op + warning. Aliases expand at queue time and have no routing of their own.
- Manual `partition_key` on a non-partitioned Dag → reject.
- Runtime (`PartitionAtRuntime`) silent task → event is `None`. No provenance to inherit.
- Runtime back-fill of `DagRun.partition_key` from emitted keys is wrong under provenance framing: the run wasn't triggered on those keys, it discovered them. Drop it.
- n-1: downstream `DagRun.partition_key` (provenance, target space) and consumed `AssetEvent.partition_key` (routing, source space) are different by design. The mapping stays in `PartitionedAssetKeyLog`.
- 1-n: same routing pointer, different target per downstream via each downstream's mapper. Upstream reads the source key off the event; downstream reads the target key off the triggered DagRun.
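
The event-side rules in this list can be summarised in a few lines. This is a sketch of the proposed semantics only, assuming a runtime-partitioned run carries `DagRun.partition_key = None` once the back-fill is dropped; `event_keys_proposed` is a hypothetical helper, not the actual `_register` code.

```python
from typing import Optional


def event_keys_proposed(
    dag_run_pk: Optional[str],
    emitted_keys: list[Optional[str]],
) -> list[Optional[str]]:
    """Return the partition_key of each resulting AssetEvent.

    emitted_keys holds one entry per emission on outlet_events[asset];
    None means the emission set extra only. An empty list is a silent task.
    """
    if not emitted_keys:
        # Silent task: Identity passthrough of the run's key. For a
        # runtime-partitioned run dag_run_pk is None, so the event is None too.
        return [dag_run_pk]
    # Extra-only emissions (None) are normalised to the run's key;
    # explicit keys fan out, one event per routing pointer.
    return [dag_run_pk if key is None else key for key in emitted_keys]


print(event_keys_proposed("us", []))            # silent task
print(event_keys_proposed("us", [None]))        # extra only, normalised
print(event_keys_proposed("us", ["us", "eu"]))  # explicit fan-out
print(event_keys_proposed(None, []))            # runtime-partitioned, silent
```

Under these rules the silent and extra-only cases produce the same event, and the runtime silent case needs no special branch.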
Today vs. proposal

| Rule | Today | Change if accepted |
|---|---|---|
| Silent task inherits run pk | `_register` fallback in `taskinstance.py` already does this | no change |
| Row 2 normalised | per-emission `partition_key=None` is preserved, event ends up `None` | in `_register`, when `payload.partition_key is None`, substitute `dag_run_partition_key` |
| `add_partitions(...)` fan-out | works | no change |
| Alias `add_partitions` | silently dropped in `_serialize_outlet_events`; only the manager comment hints at it | warn (and ignore) in `OutletEventAccessor.add_partitions` when key is `AssetAliasUniqueKey` |
| Manual pk on non-partitioned Dag | accepted, stored, ignored | reject at REST/CLI trigger layer based on `timetable.partitioned` / `partitioned_at_runtime` |
| Runtime silent task → `None` | depends on whether back-fill has fired first this turn | falls out for free once back-fill is dropped |
| Runtime `DagRun.partition_key` back-fill | `taskinstance.py:1519-1525` back-fills when exactly 1 distinct emitted pk | drop the block |
| n-1 / 1-n key spaces | implemented via `PartitionedAssetKeyLog` | doc-only: make the source/target distinction explicit in `assets.rst` |

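
Two of the behavioural changes in the table, the alias warning and the trigger-time rejection, might look roughly like the following. Everything here is a stand-in written for this sketch: the classes only mimic the names used above, `validate_manual_partition_key` is hypothetical, and treating either timetable flag as "partitioned" is an assumption about how the check would be wired.

```python
import warnings
from typing import Optional


class AssetAliasUniqueKey:
    """Stand-in for the real key type; only its identity matters here."""

    def __init__(self, name: str) -> None:
        self.name = name


class OutletEventAccessorSketch:
    """Stand-in accessor that records partition keys for one outlet."""

    def __init__(self, key: object) -> None:
        self.key = key
        self.partition_keys: list[str] = []

    def add_partitions(self, *keys: str) -> None:
        if isinstance(self.key, AssetAliasUniqueKey):
            # Aliases have no routing of their own: warn and ignore.
            warnings.warn(
                "add_partitions() on an asset alias has no effect",
                UserWarning,
                stacklevel=2,
            )
            return
        self.partition_keys.extend(keys)


def validate_manual_partition_key(
    partition_key: Optional[str],
    partitioned: bool,
    partitioned_at_runtime: bool,
) -> None:
    """Reject a manually supplied key when the Dag is not partitioned at all."""
    if partition_key is not None and not (partitioned or partitioned_at_runtime):
        raise ValueError("partition_key given for a non-partitioned Dag")


alias_accessor = OutletEventAccessorSketch(AssetAliasUniqueKey("my_alias"))
alias_accessor.add_partitions("us", "eu")  # emits a UserWarning, records nothing
print(alias_accessor.partition_keys)
```

Raising at the trigger layer rather than storing and ignoring the key means the caller finds out immediately that the key would never be used.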
All of this is unreleased (3.2.0 ships AIP-76), so changes can land in
place — no compat shim needed.
Related: #59295, #59050, #67239, #66782.