From bcaed05852301c801d08e17696359c48276a3224 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 13 Jun 2026 18:57:07 -0700 Subject: [PATCH 1/6] feat(scheduling): reuse output storage across region re-executions Add a `reusesOutputStorageOnReExecution` flag to `PhysicalOp` (default false) plus a `withReusesOutputStorageOnReExecution` builder. When set, `RegionExecutionCoordinator` reuses an operator's existing iceberg output and state documents on a region re-run instead of recreating them, via a new pure `provisionOutputDocument` decision function unit-tested by `RegionOutputProvisioningSpec`. The flag is named for the behavior the scheduler checks, not the operator that sets it, so any future operator needing output accumulated across region re-executions can opt in. No operator sets it yet, so the path is dormant and behavior-preserving. Split out of the loop-operators PR (apache/texera#5700) to keep that PR reviewable; the loop feature will set the flag on Loop End. --- .../RegionExecutionCoordinator.scala | 58 +++++++++- .../RegionOutputProvisioningSpec.scala | 105 ++++++++++++++++++ .../amber/core/workflow/PhysicalOp.scala | 17 +++ 3 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 5a9df11b589..1f3e563bca9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -60,10 +60,43 @@ import org.apache.texera.web.SessionState import org.apache.texera.web.model.websocket.event.RegionStateEvent import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource +import java.net.URI import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration.{Duration => ScalaDuration} +object RegionExecutionCoordinator { + + /** + * Decide whether to (re)create the output document at `uri`, then act. + * + * When `reuseExistingStorage` is set and the document already exists, the + * existing document is kept untouched -- this is how an operator whose + * region re-executes (e.g. LoopEnd, which accumulates output across loop + * iterations) avoids clobbering output an earlier run produced, since + * `createDocument` overrides any existing document. Otherwise the document + * is created. + * + * `documentExists` / `createDocument` are injected so the create-or-reuse + * decision can be unit-tested without an iceberg backend or a live region. + * + * @return true iff `createDocument` was invoked. + */ + def provisionOutputDocument( + uri: URI, + reuseExistingStorage: Boolean, + documentExists: URI => Boolean, + createDocument: URI => Unit + ): Boolean = { + if (reuseExistingStorage && documentExists(uri)) { + false + } else { + createDocument(uri) + true + } + } +} + /** * The executor of a region. * @@ -576,8 +609,29 @@ class RegionExecutionCoordinator( region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) - DocumentFactory.createDocument(resultURI, schema) - DocumentFactory.createDocument(stateURI, State.schema) + // Operators that reuse their output storage across region re-runs + // (e.g. LoopEnd, whose output accumulates across the iterations of its + // own loop) already have their result/state documents from a prior + // run; on re-execution `createDocument` (overrideIfExists=true) would + // clobber them, so reuse the existing document when it is already + // there. (The inner LoopEnd of a nested loop additionally drops its + // output once per outer iteration -- on the Python worker side in + // MainLoop._process_state_frame -- which is orthogonal to this + // region-provisioning reuse.) + // Decided per the operator that OWNS this port, not region-wide: a + // region mixing a reuse op (LoopEnd) with others must still recreate + // the others' documents on re-execution. + val reusesOutputStorage = + region.getOperator(outputPortId.opId).reusesOutputStorageOnReExecution + Seq((resultURI, schema), (stateURI, State.schema)).foreach { + case (uri, sch) => + RegionExecutionCoordinator.provisionOutputDocument( + uri, + reusesOutputStorage, + DocumentFactory.documentExists, + u => DocumentFactory.createDocument(u, sch) + ) + } if (!isRestart) { val (_, eid, _, _) = decodeURI(resultURI) WorkflowExecutionsResource.insertOperatorPortResultUri( diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala new file mode 100644 index 00000000000..8fda6ffcd4c --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.scheduling + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.net.URI +import scala.collection.mutable + +/** + * Unit tests for `RegionExecutionCoordinator.provisionOutputDocument`, the + * create-or-reuse decision behind output-port storage provisioning. + * + * This is the branch that lets a re-executing region (a loop body) keep the + * output an earlier run accumulated instead of clobbering it: a LoopEnd's + * region runs once per iteration, and `DocumentFactory.createDocument` + * overrides any existing document, so on a re-run we must reuse the existing + * document rather than recreate it. + * + * The decision was pulled out of the private `createOutputPortStorageObjects` + * (which needs a live controller + iceberg backend) into a pure function with + * injected `documentExists` / `createDocument`, so the four cases can be + * pinned directly with a spy -- no iceberg, no actor system. + */ +class RegionOutputProvisioningSpec extends AnyFlatSpec with Matchers { + + private val uri = new URI("vfs:///wf/result/loop-end") + + /** Run provisionOutputDocument and return (created?, number of create calls). */ + private def provision( + reuseExistingStorage: Boolean, + exists: Boolean + ): (Boolean, Int) = { + val createCalls = mutable.ArrayBuffer.empty[URI] + val created = RegionExecutionCoordinator.provisionOutputDocument( + uri, + reuseExistingStorage, + _ => exists, + u => { createCalls += u; () } + ) + (created, createCalls.size) + } + + "provisionOutputDocument" should + "reuse (not recreate) an existing document when the operator reuses storage" in { + // The loop-iteration case: the document is already there from a prior + // region run, so createDocument must NOT be called -- otherwise the + // accumulated output would be clobbered. + val (created, createCalls) = provision(reuseExistingStorage = true, exists = true) + created shouldBe false + createCalls shouldBe 0 + } + + it should "create the document when the operator reuses storage but none exists yet" in { + // First iteration: nothing to reuse, so it must be created. + val (created, createCalls) = provision(reuseExistingStorage = true, exists = false) + created shouldBe true + createCalls shouldBe 1 + } + + it should "always (re)create when the operator does not reuse storage, even if a document exists" in { + // Non-loop operators get a fresh document every region execution; an + // existing one is intentionally overwritten. + val (created, createCalls) = provision(reuseExistingStorage = false, exists = true) + created shouldBe true + createCalls shouldBe 1 + } + + it should "create when the operator does not reuse storage and none exists" in { + val (created, createCalls) = provision(reuseExistingStorage = false, exists = false) + created shouldBe true + createCalls shouldBe 1 + } + + it should "not call documentExists when the operator does not reuse storage (create unconditionally)" in { + // Short-circuit: a non-reuse operator always recreates, so it must not + // even probe for existence. + var existsProbed = false + RegionExecutionCoordinator.provisionOutputDocument( + uri, + reuseExistingStorage = false, + _ => { existsProbed = true; true }, + _ => () + ) + existsProbed shouldBe false + } +} diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala index 44125045c97..ddaae82e48e 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala @@ -198,6 +198,12 @@ case class PhysicalOp( // schema propagation function propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), isOneToManyOp: Boolean = false, + // Whether to reuse this operator's existing output storage instead of + // recreating it when its region re-executes, so output accumulated by + // earlier runs (e.g. across loop iterations) survives. Named after the + // behavior the scheduler checks, not the operator that sets it, so any + // future operator needing the same treatment can reuse it. + reusesOutputStorageOnReExecution: Boolean = false, // hint for number of workers suggestedWorkerNum: Option[Int] = None, // name of the PVE to execute within @@ -316,6 +322,17 @@ case class PhysicalOp( def withIsOneToManyOp(isOneToManyOp: Boolean): PhysicalOp = this.copy(isOneToManyOp = isOneToManyOp) + /** + * Creates a copy specifying whether this operator's output storage is + * reused rather than recreated when its region re-executes (see the field + * doc). The region scheduler uses it to preserve iceberg output across + * loop iterations instead of overwriting it on every region invocation. + */ + def withReusesOutputStorageOnReExecution( + reusesOutputStorageOnReExecution: Boolean + ): PhysicalOp = + this.copy(reusesOutputStorageOnReExecution = reusesOutputStorageOnReExecution) + /** * Creates a copy of the PhysicalOp with the schema of a specified input port updated. * The schema can either be a successful schema definition or an error represented as a Throwable. From 8f734ec383df635dbf154df90146b50e07917a0d Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 13 Jun 2026 20:21:10 -0700 Subject: [PATCH 2/6] test(workflow-core): cover withReusesOutputStorageOnReExecution builder Add a PhysicalOp builder test alongside the existing withParallelizable case, exercising the previously-uncovered `this.copy(...)` line that Codecov flagged on apache/texera#5707 (patch coverage 85.71%, 1 missing line). Asserts the default false, the flipped value, and immutability of the original instance. --- .../amber/core/workflow/WorkflowCoreTypesSpec.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala index 11f73013bf8..20fe4a9b872 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala @@ -142,6 +142,14 @@ class WorkflowCoreTypesSpec extends AnyFlatSpec { assert(op.parallelizable, "the original instance is immutable") } + "PhysicalOp.withReusesOutputStorageOnReExecution" should "default to false and round-trip through copy" in { + val op = newPhysicalOp("a") + assert(!op.reusesOutputStorageOnReExecution, "defaults to false") + val flipped = op.withReusesOutputStorageOnReExecution(true) + assert(flipped.reusesOutputStorageOnReExecution) + assert(!op.reusesOutputStorageOnReExecution, "the original instance is immutable") + } + "PhysicalOp.withSuggestedWorkerNum" should "set the suggested worker count" in { val op = newPhysicalOp("a").withSuggestedWorkerNum(7) assert(op.suggestedWorkerNum.contains(7)) From 45fc47577bc4908b23c3d674d91458fde7859f9d Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 14 Jun 2026 16:54:27 -0700 Subject: [PATCH 3/6] refactor(scheduling): move reuse decision to storage layer, flag to OutputPort Address @Yicong-Huang's review on apache/texera#5707: - Move the create-or-reuse decision out of RegionExecutionCoordinator into DocumentFactory.createOrReuseDocument -- it is storage-layer logic, not scheduling. - Move the reuse flag off PhysicalOp onto the OutputPort proto, alongside the existing per-port `blocking`/`mode`; storage behavior is port-specific. The coordinator now reads it per output port and maintains no reuse state itself. - Relocate the unit test to DocumentFactorySpec. Per-port differentiation is still required (answering the "why not reuse for all?" question): the loop back-edge re-executes LoopStart and every loop-body operator on the same event as LoopEnd, but only LoopEnd accumulates -- the others must recreate a fresh document each iteration. --- .../RegionExecutionCoordinator.scala | 65 +++-------- .../RegionOutputProvisioningSpec.scala | 105 ------------------ .../apache/texera/amber/core/workflow.proto | 5 + .../amber/core/storage/DocumentFactory.scala | 29 +++++ .../amber/core/workflow/PhysicalOp.scala | 17 --- .../core/storage/DocumentFactorySpec.scala | 104 +++++++++++++++++ .../core/workflow/WorkflowCoreTypesSpec.scala | 8 -- 7 files changed, 152 insertions(+), 181 deletions(-) delete mode 100644 amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala create mode 100644 common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 1f3e563bca9..a8368c2421a 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -65,38 +65,6 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration.{Duration => ScalaDuration} -object RegionExecutionCoordinator { - - /** - * Decide whether to (re)create the output document at `uri`, then act. - * - * When `reuseExistingStorage` is set and the document already exists, the - * existing document is kept untouched -- this is how an operator whose - * region re-executes (e.g. LoopEnd, which accumulates output across loop - * iterations) avoids clobbering output an earlier run produced, since - * `createDocument` overrides any existing document. Otherwise the document - * is created. - * - * `documentExists` / `createDocument` are injected so the create-or-reuse - * decision can be unit-tested without an iceberg backend or a live region. - * - * @return true iff `createDocument` was invoked. - */ - def provisionOutputDocument( - uri: URI, - reuseExistingStorage: Boolean, - documentExists: URI => Boolean, - createDocument: URI => Unit - ): Boolean = { - if (reuseExistingStorage && documentExists(uri)) { - false - } else { - createDocument(uri) - true - } - } -} - /** * The executor of a region. * @@ -609,28 +577,23 @@ class RegionExecutionCoordinator( region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) - // Operators that reuse their output storage across region re-runs - // (e.g. LoopEnd, whose output accumulates across the iterations of its - // own loop) already have their result/state documents from a prior - // run; on re-execution `createDocument` (overrideIfExists=true) would - // clobber them, so reuse the existing document when it is already - // there. (The inner LoopEnd of a nested loop additionally drops its - // output once per outer iteration -- on the Python worker side in - // MainLoop._process_state_frame -- which is orthogonal to this - // region-provisioning reuse.) - // Decided per the operator that OWNS this port, not region-wide: a - // region mixing a reuse op (LoopEnd) with others must still recreate - // the others' documents on re-execution. + // An output port whose storage accumulates across region re-executions + // (e.g. a LoopEnd port, whose output builds up over the iterations of + // its own loop) sets `reusesOutputStorage`, so the existing document is + // preserved rather than clobbered by `createDocument` (overrideIfExists + // = true). Read per output port -- storage behavior is port-specific. + // (The inner LoopEnd of a nested loop additionally drops its output + // once per outer iteration on the Python worker side in + // MainLoop._process_state_frame, which is orthogonal to this.) val reusesOutputStorage = - region.getOperator(outputPortId.opId).reusesOutputStorageOnReExecution + region + .getOperator(outputPortId.opId) + .outputPorts(outputPortId.portId) + ._1 + .reusesOutputStorage Seq((resultURI, schema), (stateURI, State.schema)).foreach { case (uri, sch) => - RegionExecutionCoordinator.provisionOutputDocument( - uri, - reusesOutputStorage, - DocumentFactory.documentExists, - u => DocumentFactory.createDocument(u, sch) - ) + DocumentFactory.createOrReuseDocument(uri, sch, reusesOutputStorage) } if (!isRestart) { val (_, eid, _, _) = decodeURI(resultURI) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala deleted file mode 100644 index 8fda6ffcd4c..00000000000 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.texera.amber.engine.architecture.scheduling - -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -import java.net.URI -import scala.collection.mutable - -/** - * Unit tests for `RegionExecutionCoordinator.provisionOutputDocument`, the - * create-or-reuse decision behind output-port storage provisioning. - * - * This is the branch that lets a re-executing region (a loop body) keep the - * output an earlier run accumulated instead of clobbering it: a LoopEnd's - * region runs once per iteration, and `DocumentFactory.createDocument` - * overrides any existing document, so on a re-run we must reuse the existing - * document rather than recreate it. - * - * The decision was pulled out of the private `createOutputPortStorageObjects` - * (which needs a live controller + iceberg backend) into a pure function with - * injected `documentExists` / `createDocument`, so the four cases can be - * pinned directly with a spy -- no iceberg, no actor system. - */ -class RegionOutputProvisioningSpec extends AnyFlatSpec with Matchers { - - private val uri = new URI("vfs:///wf/result/loop-end") - - /** Run provisionOutputDocument and return (created?, number of create calls). */ - private def provision( - reuseExistingStorage: Boolean, - exists: Boolean - ): (Boolean, Int) = { - val createCalls = mutable.ArrayBuffer.empty[URI] - val created = RegionExecutionCoordinator.provisionOutputDocument( - uri, - reuseExistingStorage, - _ => exists, - u => { createCalls += u; () } - ) - (created, createCalls.size) - } - - "provisionOutputDocument" should - "reuse (not recreate) an existing document when the operator reuses storage" in { - // The loop-iteration case: the document is already there from a prior - // region run, so createDocument must NOT be called -- otherwise the - // accumulated output would be clobbered. - val (created, createCalls) = provision(reuseExistingStorage = true, exists = true) - created shouldBe false - createCalls shouldBe 0 - } - - it should "create the document when the operator reuses storage but none exists yet" in { - // First iteration: nothing to reuse, so it must be created. - val (created, createCalls) = provision(reuseExistingStorage = true, exists = false) - created shouldBe true - createCalls shouldBe 1 - } - - it should "always (re)create when the operator does not reuse storage, even if a document exists" in { - // Non-loop operators get a fresh document every region execution; an - // existing one is intentionally overwritten. - val (created, createCalls) = provision(reuseExistingStorage = false, exists = true) - created shouldBe true - createCalls shouldBe 1 - } - - it should "create when the operator does not reuse storage and none exists" in { - val (created, createCalls) = provision(reuseExistingStorage = false, exists = false) - created shouldBe true - createCalls shouldBe 1 - } - - it should "not call documentExists when the operator does not reuse storage (create unconditionally)" in { - // Short-circuit: a non-reuse operator always recreates, so it must not - // even probe for existence. - var existsProbed = false - RegionExecutionCoordinator.provisionOutputDocument( - uri, - reuseExistingStorage = false, - _ => { existsProbed = true; true }, - _ => () - ) - existsProbed shouldBe false - } -} diff --git a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto index c5b2cb248ff..e5245fd43cc 100644 --- a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto +++ b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto @@ -62,6 +62,11 @@ message OutputPort { string displayName = 2; bool blocking = 3; OutputMode mode = 4; + // Whether storage at this port is reused (kept and appended to) rather than + // recreated when the owning operator's region re-executes -- e.g. a LoopEnd + // port whose output accumulates across the iterations of its own loop. The + // region scheduler reads this when provisioning the port's output document. + bool reusesOutputStorage = 5; } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index d3fcae868f6..e7626c2baa7 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -133,6 +133,35 @@ object DocumentFactory { } } + /** + * Create the document at `uri`, unless `reuseExisting` is set and a document + * already exists there -- in which case the existing document is kept + * untouched. This lets a caller whose output accumulates across re-runs + * (e.g. a LoopEnd port whose region re-executes once per loop iteration) + * preserve the already-populated document instead of clobbering it, since + * `createDocument` overrides any existing document. + * + * `exists` / `create` default to this object's own `documentExists` / + * `createDocument`; they are parameterized only so the create-or-reuse + * decision can be unit-tested without an iceberg backend. + * + * @return true iff a document was (re)created. + */ + def createOrReuseDocument( + uri: URI, + schema: Schema, + reuseExisting: Boolean, + exists: URI => Boolean = documentExists, + create: (URI, Schema) => Unit = (u, s) => { createDocument(u, s); () } + ): Boolean = { + if (reuseExisting && exists(uri)) { + false + } else { + create(uri, schema) + true + } + } + /** * Open a document specified by the uri. * If the document is storing structural data, the schema will also be returned diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala index ddaae82e48e..44125045c97 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala @@ -198,12 +198,6 @@ case class PhysicalOp( // schema propagation function propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), isOneToManyOp: Boolean = false, - // Whether to reuse this operator's existing output storage instead of - // recreating it when its region re-executes, so output accumulated by - // earlier runs (e.g. across loop iterations) survives. Named after the - // behavior the scheduler checks, not the operator that sets it, so any - // future operator needing the same treatment can reuse it. - reusesOutputStorageOnReExecution: Boolean = false, // hint for number of workers suggestedWorkerNum: Option[Int] = None, // name of the PVE to execute within @@ -322,17 +316,6 @@ case class PhysicalOp( def withIsOneToManyOp(isOneToManyOp: Boolean): PhysicalOp = this.copy(isOneToManyOp = isOneToManyOp) - /** - * Creates a copy specifying whether this operator's output storage is - * reused rather than recreated when its region re-executes (see the field - * doc). The region scheduler uses it to preserve iceberg output across - * loop iterations instead of overwriting it on every region invocation. - */ - def withReusesOutputStorageOnReExecution( - reusesOutputStorageOnReExecution: Boolean - ): PhysicalOp = - this.copy(reusesOutputStorageOnReExecution = reusesOutputStorageOnReExecution) - /** * Creates a copy of the PhysicalOp with the schema of a specified input port updated. * The schema can either be a successful schema definition or an error represented as a Throwable. diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala new file mode 100644 index 00000000000..e522a9b9750 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.core.storage + +import org.apache.texera.amber.core.tuple.Schema +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.net.URI +import scala.collection.mutable + +/** + * Unit tests for `DocumentFactory.createOrReuseDocument`, the create-or-reuse + * decision behind output-port storage provisioning. + * + * This is the branch that lets a re-executing region (a loop body) keep the + * output an earlier run accumulated instead of clobbering it: a LoopEnd port's + * region runs once per iteration, and `createDocument` overrides any existing + * document, so on a re-run the existing document must be reused rather than + * recreated. + * + * `exists` / `create` are injected so the four cases can be pinned directly + * with a spy -- no iceberg backend, no live region. + */ +class DocumentFactorySpec extends AnyFlatSpec with Matchers { + + private val uri = new URI("vfs:///wf/result/loop-end") + private val schema = Schema() + + /** Run createOrReuseDocument with a spy and return (created?, #create calls). */ + private def provision(reuseExisting: Boolean, exists: Boolean): (Boolean, Int) = { + val createCalls = mutable.ArrayBuffer.empty[URI] + val created = DocumentFactory.createOrReuseDocument( + uri, + schema, + reuseExisting, + _ => exists, + (u, _) => { createCalls += u; () } + ) + (created, createCalls.size) + } + + "createOrReuseDocument" should + "reuse (not recreate) an existing document when the port reuses storage" in { + // The loop-iteration case: the document is already there from a prior + // region run, so it must NOT be recreated -- otherwise the accumulated + // output would be clobbered. + val (created, createCalls) = provision(reuseExisting = true, exists = true) + created shouldBe false + createCalls shouldBe 0 + } + + it should "create the document when the port reuses storage but none exists yet" in { + // First iteration: nothing to reuse, so it must be created. + val (created, createCalls) = provision(reuseExisting = true, exists = false) + created shouldBe true + createCalls shouldBe 1 + } + + it should "always (re)create when the port does not reuse storage, even if a document exists" in { + // Non-reuse ports get a fresh document every region execution; an existing + // one is intentionally overwritten. + val (created, createCalls) = provision(reuseExisting = false, exists = true) + created shouldBe true + createCalls shouldBe 1 + } + + it should "create when the port does not reuse storage and none exists" in { + val (created, createCalls) = provision(reuseExisting = false, exists = false) + created shouldBe true + createCalls shouldBe 1 + } + + it should "not probe existence when the port does not reuse storage (create unconditionally)" in { + // Short-circuit: a non-reuse port always recreates, so it must not even + // probe for existence. + var existsProbed = false + DocumentFactory.createOrReuseDocument( + uri, + schema, + reuseExisting = false, + _ => { existsProbed = true; true }, + (_, _) => () + ) + existsProbed shouldBe false + } +} diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala index 20fe4a9b872..11f73013bf8 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala @@ -142,14 +142,6 @@ class WorkflowCoreTypesSpec extends AnyFlatSpec { assert(op.parallelizable, "the original instance is immutable") } - "PhysicalOp.withReusesOutputStorageOnReExecution" should "default to false and round-trip through copy" in { - val op = newPhysicalOp("a") - assert(!op.reusesOutputStorageOnReExecution, "defaults to false") - val flipped = op.withReusesOutputStorageOnReExecution(true) - assert(flipped.reusesOutputStorageOnReExecution) - assert(!op.reusesOutputStorageOnReExecution, "the original instance is immutable") - } - "PhysicalOp.withSuggestedWorkerNum" should "set the suggested worker count" in { val op = newPhysicalOp("a").withSuggestedWorkerNum(7) assert(op.suggestedWorkerNum.contains(7)) From 3933fad3d3d524fa647bbabbb28622fb768aba52 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 14 Jun 2026 17:16:07 -0700 Subject: [PATCH 4/6] style(scheduling): drop now-unused java.net.URI import Moving the create-or-reuse decision out of RegionExecutionCoordinator (into DocumentFactory) removed the only use of `java.net.URI` here; scalafix RemoveUnused flagged the leftover import in CI. Drop it. --- .../architecture/scheduling/RegionExecutionCoordinator.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index a8368c2421a..5f700cb5a3d 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -60,7 +60,6 @@ import org.apache.texera.web.SessionState import org.apache.texera.web.model.websocket.event.RegionStateEvent import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource -import java.net.URI import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration.{Duration => ScalaDuration} From 6f85e6bed0c98f195051152460b5a5bc2d9dedf2 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 14 Jun 2026 20:47:21 -0700 Subject: [PATCH 5/6] test(workflow-operator): guard that no operator enables reusesOutputStorage Per @Yicong-Huang's review on apache/texera#5707: add a sanity check that iterates every registered operator (OperatorMetadataGenerator.operatorTypeMap) and asserts no output port sets reusesOutputStorage. No operator needs it yet -- Loop End, the only one that will, is not on main -- so this catches an unexpected/accidental enablement. To be updated to allow Loop End's output port (and only it) when the loop operators land. --- .../metadata/OutputPortReuseFlagSpec.scala | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala new file mode 100644 index 00000000000..491b5782529 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.metadata + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +/** + * Guard for the `OutputPort.reusesOutputStorage` flag. + * + * The flag tells the region scheduler to reuse (append to) a port's storage + * across region re-executions instead of recreating it. Only an operator whose + * output accumulates across re-executions should set it -- today that is no + * operator on `main` (the only one that will, Loop End, is not yet merged). + * + * This pins the flag off for every registered operator so it can't be turned + * on unexpectedly. When the loop operators land, update this to allow Loop + * End's output port (and only it). + */ +class OutputPortReuseFlagSpec extends AnyFlatSpec with Matchers { + + "No registered operator" should "enable OutputPort.reusesOutputStorage on any of its output ports" in { + OperatorMetadataGenerator.operatorTypeMap.keys.foreach { opClass => + opClass.getConstructor().newInstance().operatorInfo.outputPorts.foreach { port => + withClue(s"${opClass.getSimpleName} / output port ${port.id}: ") { + port.reusesOutputStorage shouldBe false + } + } + } + } +} From 5690bd5148bcbb029480cf39f012d80718599cf3 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 14 Jun 2026 22:37:12 -0700 Subject: [PATCH 6/6] refactor(storage): createOrReuseDocument returns the document + guard the flag Address @Yicong-Huang's review on apache/texera#5707: - DocumentFactory.createOrReuseDocument now returns the VirtualDocument (opened when reused, created otherwise) instead of a Boolean, so the call site need not branch on create-vs-reuse. - RegionExecutionCoordinator adds a runtime require() that no output port sets reusesOutputStorage -- a production guard, since the flag only activates with the loop operators (not on main). Remove/relax it when introducing them. --- .../RegionExecutionCoordinator.scala | 9 ++ .../amber/core/storage/DocumentFactory.scala | 34 +++---- .../core/storage/DocumentFactorySpec.scala | 88 +++++++++---------- 3 files changed, 63 insertions(+), 68 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 5f700cb5a3d..f78194bcd7f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -590,6 +590,15 @@ class RegionExecutionCoordinator( .outputPorts(outputPortId.portId) ._1 .reusesOutputStorage + // Guard: no operator enables reusesOutputStorage in production yet -- it + // activates with the loop operators, which aren't on main. Fail loudly + // if one does rather than silently exercising the dormant reuse path. + // Remove/relax this guard when introducing the loop operators. + require( + !reusesOutputStorage, + s"Output port $outputPortId set reusesOutputStorage, which is not " + + "supported in production yet (it activates with the loop operators)." + ) Seq((resultURI, schema), (stateURI, State.schema)).foreach { case (uri, sch) => DocumentFactory.createOrReuseDocument(uri, sch, reusesOutputStorage) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index e7626c2baa7..c879f56c36e 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -134,33 +134,27 @@ object DocumentFactory { } /** - * Create the document at `uri`, unless `reuseExisting` is set and a document - * already exists there -- in which case the existing document is kept - * untouched. This lets a caller whose output accumulates across re-runs - * (e.g. a LoopEnd port whose region re-executes once per loop iteration) - * preserve the already-populated document instead of clobbering it, since - * `createDocument` overrides any existing document. + * Return the document at `uri`: when `reuseExisting` is set and a document + * already exists there, open and return the existing one -- so a caller whose + * output accumulates across re-runs (e.g. a LoopEnd port whose region + * re-executes once per loop iteration) keeps the already-populated document + * instead of clobbering it, since `createDocument` overrides any existing + * document. Otherwise create it. Either way the caller gets the document, so + * the call site need not branch on create-vs-reuse. * - * `exists` / `create` default to this object's own `documentExists` / - * `createDocument`; they are parameterized only so the create-or-reuse - * decision can be unit-tested without an iceberg backend. - * - * @return true iff a document was (re)created. + * `exists` / `open` / `create` default to this object's own `documentExists` + * / `openDocument` / `createDocument`; they are parameterized only so the + * create-or-reuse decision can be unit-tested without an iceberg backend. */ def createOrReuseDocument( uri: URI, schema: Schema, reuseExisting: Boolean, exists: URI => Boolean = documentExists, - create: (URI, Schema) => Unit = (u, s) => { createDocument(u, s); () } - ): Boolean = { - if (reuseExisting && exists(uri)) { - false - } else { - create(uri, schema) - true - } - } + open: URI => VirtualDocument[_] = (u: URI) => openDocument(u)._1, + create: (URI, Schema) => VirtualDocument[_] = createDocument + ): VirtualDocument[_] = + if (reuseExisting && exists(uri)) open(uri) else create(uri, schema) /** * Open a document specified by the uri. diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala index e522a9b9750..d67b90f3637 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala @@ -19,86 +19,78 @@ package org.apache.texera.amber.core.storage -import org.apache.texera.amber.core.tuple.Schema +import org.apache.texera.amber.core.storage.model.VirtualDocument +import org.apache.texera.amber.core.tuple.{Schema, Tuple} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import java.net.URI -import scala.collection.mutable /** * Unit tests for `DocumentFactory.createOrReuseDocument`, the create-or-reuse - * decision behind output-port storage provisioning. + * decision behind output-port storage provisioning. It always returns the + * document (opened when reused, created otherwise) so the call site doesn't + * branch. * - * This is the branch that lets a re-executing region (a loop body) keep the - * output an earlier run accumulated instead of clobbering it: a LoopEnd port's - * region runs once per iteration, and `createDocument` overrides any existing - * document, so on a re-run the existing document must be reused rather than - * recreated. - * - * `exists` / `create` are injected so the four cases can be pinned directly - * with a spy -- no iceberg backend, no live region. + * `exists` / `open` / `create` are injected so the decision can be pinned with + * trivial document stubs -- no iceberg backend, no live region. */ class DocumentFactorySpec extends AnyFlatSpec with Matchers { private val uri = new URI("vfs:///wf/result/loop-end") private val schema = Schema() - /** Run createOrReuseDocument with a spy and return (created?, #create calls). */ - private def provision(reuseExisting: Boolean, exists: Boolean): (Boolean, Int) = { - val createCalls = mutable.ArrayBuffer.empty[URI] - val created = DocumentFactory.createOrReuseDocument( + private def stubDoc: VirtualDocument[Tuple] = + new VirtualDocument[Tuple] { + override def getURI: URI = uri + override def clear(): Unit = () + } + private val opened: VirtualDocument[_] = stubDoc + private val created: VirtualDocument[_] = stubDoc + + /** Run with spies; return (document handed back, which path was taken). */ + private def run(reuseExisting: Boolean, exists: Boolean): (VirtualDocument[_], String) = { + var path = "" + val doc = DocumentFactory.createOrReuseDocument( uri, schema, reuseExisting, _ => exists, - (u, _) => { createCalls += u; () } + _ => { path = "open"; opened }, + (_, _) => { path = "create"; created } ) - (created, createCalls.size) + (doc, path) } "createOrReuseDocument" should - "reuse (not recreate) an existing document when the port reuses storage" in { - // The loop-iteration case: the document is already there from a prior - // region run, so it must NOT be recreated -- otherwise the accumulated - // output would be clobbered. - val (created, createCalls) = provision(reuseExisting = true, exists = true) - created shouldBe false - createCalls shouldBe 0 - } - - it should "create the document when the port reuses storage but none exists yet" in { - // First iteration: nothing to reuse, so it must be created. - val (created, createCalls) = provision(reuseExisting = true, exists = false) - created shouldBe true - createCalls shouldBe 1 + "open and return the existing document when the port reuses storage and one exists" in { + val (doc, path) = run(reuseExisting = true, exists = true) + path shouldBe "open" + doc should be theSameInstanceAs opened } - it should "always (re)create when the port does not reuse storage, even if a document exists" in { - // Non-reuse ports get a fresh document every region execution; an existing - // one is intentionally overwritten. - val (created, createCalls) = provision(reuseExisting = false, exists = true) - created shouldBe true - createCalls shouldBe 1 + it should "create when the port reuses storage but none exists yet" in { + val (doc, path) = run(reuseExisting = true, exists = false) + path shouldBe "create" + doc should be theSameInstanceAs created } - it should "create when the port does not reuse storage and none exists" in { - val (created, createCalls) = provision(reuseExisting = false, exists = false) - created shouldBe true - createCalls shouldBe 1 + it should "always create when the port does not reuse storage, even if one exists" in { + val (doc, path) = run(reuseExisting = false, exists = true) + path shouldBe "create" + doc should be theSameInstanceAs created } - it should "not probe existence when the port does not reuse storage (create unconditionally)" in { - // Short-circuit: a non-reuse port always recreates, so it must not even - // probe for existence. - var existsProbed = false + it should "not probe existence when the port does not reuse storage" in { + var probed = false DocumentFactory.createOrReuseDocument( uri, schema, reuseExisting = false, - _ => { existsProbed = true; true }, - (_, _) => () + _ => { probed = true; true }, + _ => opened, + (_, _) => created ) - existsProbed shouldBe false + probed shouldBe false } }