feat(scheduling): reuse output storage across region re-executions#5707
feat(scheduling): reuse output storage across region re-executions#5707aglinxinyuan wants to merge 11 commits into
Conversation
Add a `reusesOutputStorageOnReExecution` flag to `PhysicalOp` (default false) plus a `withReusesOutputStorageOnReExecution` builder. When set, `RegionExecutionCoordinator` reuses an operator's existing iceberg output and state documents on a region re-run instead of recreating them, via a new pure `provisionOutputDocument` decision function unit-tested by `RegionOutputProvisioningSpec`. The flag is named for the behavior the scheduler checks, not the operator that sets it, so any future operator needing output accumulated across region re-executions can opt in. No operator sets it yet, so the path is dormant and behavior-preserving. Split out of the loop-operators PR (apache#5700) to keep that PR reviewable; the loop feature will set the flag on Loop End.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5707 +/- ##
============================================
- Coverage 52.93% 52.74% -0.19%
+ Complexity 2630 2628 -2
============================================
Files 1090 1090
Lines 42210 42203 -7
Branches 4534 4535 +1
============================================
- Hits 22345 22262 -83
- Misses 18556 18640 +84
+ Partials 1309 1301 -8
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR introduces an opt-in mechanism for operators to preserve their output/state storage across region re-executions (e.g., loop iterations), by adding a flag on PhysicalOp and updating the region scheduler’s output-document provisioning logic to conditionally reuse existing documents. It also adds focused unit tests for the new create-vs-reuse decision function.
Changes:
- Add
reusesOutputStorageOnReExecution: Booleanand awithReusesOutputStorageOnReExecutionbuilder toPhysicalOp. - Extract output document provisioning decision into
RegionExecutionCoordinator.provisionOutputDocumentand use it when provisioning per-output-port result/state documents. - Add
RegionOutputProvisioningSpecunit tests covering the reuse×exists matrix and the non-reuse short-circuit.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala | Adds a new operator-level flag + builder to signal output-storage reuse across region re-executions. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Adds a pure provisioning decision function and uses it to create-or-reuse result/state documents per output port based on the owning operator’s flag. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala | Introduces unit tests validating the provisioning decision logic without needing an Iceberg backend. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 381 | 0.233 | 26,874/34,618/34,618 us | 🔴 +31.3% / 🔴 +13.0% |
| ⚪ | bs=100 sw=10 sl=64 | 833 | 0.508 | 118,908/128,746/128,746 us | ⚪ within ±5% / 🟢 -7.9% |
| ⚪ | bs=1000 sw=10 sl=64 | 919 | 0.561 | 1,091,803/1,130,434/1,130,434 us | ⚪ within ±5% / 🔴 +12.2% |
Baseline details
Latest main 3dab771 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 381 tuples/sec | 420 tuples/sec | 410.82 tuples/sec | -9.3% | -7.3% |
| bs=10 sw=10 sl=64 | MB/s | 0.233 MB/s | 0.256 MB/s | 0.251 MB/s | -9.0% | -7.1% |
| bs=10 sw=10 sl=64 | p50 | 26,874 us | 20,466 us | 23,785 us | +31.3% | +13.0% |
| bs=10 sw=10 sl=64 | p95 | 34,618 us | 36,517 us | 34,980 us | -5.2% | -1.0% |
| bs=10 sw=10 sl=64 | p99 | 34,618 us | 36,517 us | 34,980 us | -5.2% | -1.0% |
| bs=100 sw=10 sl=64 | throughput | 833 tuples/sec | 839 tuples/sec | 891.94 tuples/sec | -0.7% | -6.6% |
| bs=100 sw=10 sl=64 | MB/s | 0.508 MB/s | 0.512 MB/s | 0.544 MB/s | -0.8% | -6.7% |
| bs=100 sw=10 sl=64 | p50 | 118,908 us | 119,810 us | 112,277 us | -0.8% | +5.9% |
| bs=100 sw=10 sl=64 | p95 | 128,746 us | 131,987 us | 139,802 us | -2.5% | -7.9% |
| bs=100 sw=10 sl=64 | p99 | 128,746 us | 131,987 us | 139,802 us | -2.5% | -7.9% |
| bs=1000 sw=10 sl=64 | throughput | 919 tuples/sec | 946 tuples/sec | 1,041 tuples/sec | -2.9% | -11.7% |
| bs=1000 sw=10 sl=64 | MB/s | 0.561 MB/s | 0.578 MB/s | 0.635 MB/s | -2.9% | -11.7% |
| bs=1000 sw=10 sl=64 | p50 | 1,091,803 us | 1,050,997 us | 972,714 us | +3.9% | +12.2% |
| bs=1000 sw=10 sl=64 | p95 | 1,130,434 us | 1,088,541 us | 1,023,057 us | +3.8% | +10.5% |
| bs=1000 sw=10 sl=64 | p99 | 1,130,434 us | 1,088,541 us | 1,023,057 us | +3.8% | +10.5% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,524.92,200,128000,381,0.233,26873.86,34618.29,34618.29
1,100,10,64,20,2401.78,2000,1280000,833,0.508,118908.09,128745.83,128745.83
2,1000,10,64,20,21759.70,20000,12800000,919,0.561,1091802.74,1130433.99,1130433.99Add a PhysicalOp builder test alongside the existing withParallelizable case, exercising the previously-uncovered `this.copy(...)` line that Codecov flagged on apache#5707 (patch coverage 85.71%, 1 missing line). Asserts the default false, the flipped value, and immutability of the original instance.
Yicong-Huang
left a comment
There was a problem hiding this comment.
Left comments inline!
…utputPort Address @Yicong-Huang's review on apache#5707: - Move the create-or-reuse decision out of RegionExecutionCoordinator into DocumentFactory.createOrReuseDocument -- it is storage-layer logic, not scheduling. - Move the reuse flag off PhysicalOp onto the OutputPort proto, alongside the existing per-port `blocking`/`mode`; storage behavior is port-specific. The coordinator now reads it per output port and maintains no reuse state itself. - Relocate the unit test to DocumentFactorySpec. Per-port differentiation is still required (answering the "why not reuse for all?" question): the loop back-edge re-executes LoopStart and every loop-body operator on the same event as LoopEnd, but only LoopEnd accumulates -- the others must recreate a fresh document each iteration.
…ith apache#5707) apache#5707 redesigned the reuse mechanism per review: the flag moved from PhysicalOp.reusesOutputStorageOnReExecution onto OutputPort.reusesOutputStorage, and the create-or-reuse decision moved out of RegionExecutionCoordinator into DocumentFactory.createOrReuseDocument. Apply the same change here so loop-feb stays internally consistent and rebases cleanly once apache#5707 lands: - mechanism files (workflow.proto, DocumentFactory, RegionExecutionCoordinator, PhysicalOp, DocumentFactorySpec) brought in line with apache#5707; drop the old RegionOutputProvisioningSpec. - LoopOpDesc.getPhysicalOp now sets reusesOutputStorage on the operator's output port (true for Loop End) instead of the removed PhysicalOp builder. - LoopStart/EndOpDescSpec assert the port flag; comment references updated.
Moving the create-or-reuse decision out of RegionExecutionCoordinator (into DocumentFactory) removed the only use of `java.net.URI` here; scalafix RemoveUnused flagged the leftover import in CI. Drop it.
Same lint fix as apache#5707: moving the create-or-reuse decision into DocumentFactory removed the only use of `java.net.URI` in RegionExecutionCoordinator; scalafix RemoveUnused flagged the leftover import in the amber Lint CI step.
…torage Per @Yicong-Huang's review on apache#5707: add a sanity check that iterates every registered operator (OperatorMetadataGenerator.operatorTypeMap) and asserts no output port sets reusesOutputStorage. No operator needs it yet -- Loop End, the only one that will, is not on main -- so this catches an unexpected/accidental enablement. To be updated to allow Loop End's output port (and only it) when the loop operators land.
…guard Follow-up to the apache#5707 guard (@Yicong-Huang). Declare the reusesOutputStorage flag on LoopOpDesc's output port in operatorInfo (alongside where blocking/mode live) instead of mapping it in getPhysicalOp, so it is declarative and the cross-operator guard can see it. Add OutputPortReuseFlagSpec -- the LoopEnd-allowing form of the apache#5707 guard: only Loop End may enable the flag; every other operator's output ports must have it false.
Yicong-Huang
left a comment
There was a problem hiding this comment.
Left minor comments inline.
… the flag Address @Yicong-Huang's review on apache#5707: - DocumentFactory.createOrReuseDocument now returns the VirtualDocument (opened when reused, created otherwise) instead of a Boolean, so the call site need not branch on create-vs-reuse. - RegionExecutionCoordinator adds a runtime require() that no output port sets reusesOutputStorage -- a production guard, since the flag only activates with the loop operators (not on main). Remove/relax it when introducing them.
apache#5707 apache#5707 changed DocumentFactory.createOrReuseDocument to return the VirtualDocument (opened when reused, created otherwise) instead of a Boolean. Sync that here. loop-feb deliberately omits apache#5707's production require-guard on reusesOutputStorage, since the loop operators legitimately set the flag.
What changes were proposed in this PR?
Adds a mechanism for an operator to reuse its output storage when its region re-executes, instead of having the documents recreated each time:
PhysicalOpgains areusesOutputStorageOnReExecution: Boolean = falsefield + awithReusesOutputStorageOnReExecutionbuilder.RegionExecutionCoordinatorgains a pureprovisionOutputDocument(uri, reuseExistingStorage, documentExists, createDocument)decision function, used per output port to decide create-vs-reuse based on the owning operator's flag.RegionOutputProvisioningSpecunit-tests the decision function (the reuse×exists matrix plus the "no-reuse never probes existence" short-circuit).false(every operator today)true(set by Loop End in the loop PR)Named for the behavior the scheduler checks, not the operator that sets it, so any future operator needing output accumulated across region re-executions can opt in. Dormant and behavior-preserving — no operator sets the flag in this PR.
Any related issues, documentation, discussions?
Resolves #5709 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's review.
How was this PR tested?
sbt "WorkflowExecutionService/testOnly *RegionOutputProvisioningSpec"— 5 passing;WorkflowExecutionService/Test/compileclean; scalafmt clean on the changed projects (WorkflowCore,WorkflowExecutionService).Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.