# Patch summary. Lines starting with `#` above the first `diff --git` header are
# reviewer commentary, not patch content.
#
# Purpose: add an opt-in `reuseStorage` flag to `OutputPort` and consult it when
# the region scheduler provisions an output port's result and state documents.
#
# Files touched:
#   * workflow.proto
#       New field `bool reuseStorage = 5` on message `OutputPort`.
#   * DocumentFactory.scala
#       New `createOrReuseDocument(uri, schema, reuseExisting, exists, open, create)`.
#       It returns `open(uri)` when `reuseExisting && exists(uri)` holds and
#       `create(uri, schema)` otherwise. Because of the short-circuit `&&`,
#       `exists` is not called when `reuseExisting` is false. The three function
#       parameters default to `documentExists`, `openDocument(_)._1` and
#       `createDocument`.
#   * RegionExecutionCoordinator.scala
#       The two unconditional `DocumentFactory.createDocument` calls (result URI
#       with the port schema, state URI with `State.schema`) are replaced by
#       `createOrReuseDocument`, driven by the port's `reuseStorage` value.
#       A `require(!reuseStorage, ...)` guard runs first, so with this patch
#       applied a port that sets the flag fails at this call site instead of
#       reaching the reuse branch.
#   * DocumentFactorySpec.scala (new file)
#       Four cases pinning the open-vs-create decision using injected
#       `exists` / `open` / `create` stubs.
#   * OutputPortReuseFlagSpec.scala (new file)
#       Asserts `reuseStorage` is false on every output port of every operator
#       class in `OperatorMetadataGenerator.operatorTypeMap`.
#
# NOTE(review): the reuse branch calls `open(uri)` and never looks at `schema`.
#   Presumably an existing document always has the schema the caller expects;
#   confirm before the `require` guard is relaxed.
# NOTE(review): in OutputPortReuseFlagSpec, `opClass.getConstructor().newInstance()`
#   runs outside `withClue`, so a failure while constructing an operator would
#   not carry the operator name in its message -- TODO confirm whether that matters.
# NOTE(review): in this copy the hunks are run together on a few long lines, so
#   per-line indentation and blank context lines are not visible. Regenerate the
#   patch from the branch rather than hand-editing the text below.
#
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 5a9df11b589..4497d7c4ae1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -576,8 +576,33 @@ class RegionExecutionCoordinator( region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) - DocumentFactory.createDocument(resultURI, schema) - DocumentFactory.createDocument(stateURI, State.schema) + // An output port whose storage accumulates across region re-executions + // (e.g. a LoopEnd port, whose output builds up over the iterations of + // its own loop) sets `reuseStorage`. When set, the port's existing + // document is kept and reopened on each re-run; when unset, a fresh one + // is created. Read per output port -- storage behavior is port-specific. + // (The inner LoopEnd of a nested loop additionally drops its output + // once per outer iteration on the Python worker side in + // MainLoop._process_state_frame, which is orthogonal to this.) + val reuseStorage = + region + .getOperator(outputPortId.opId) + .outputPorts(outputPortId.portId) + ._1 + .reuseStorage + // Guard: no operator enables reuseStorage in production yet -- it + // activates with the loop operators, which aren't on main. Until then + // this fails loudly so the dormant reuse path is never silently + // exercised. Remove/relax this guard when introducing the loop operators. + require( + !reuseStorage, + s"Output port $outputPortId set reuseStorage, which is not " + + "supported in production yet (it activates with the loop operators)." 
+ ) + Seq((resultURI, schema), (stateURI, State.schema)).foreach { + case (uri, sch) => + DocumentFactory.createOrReuseDocument(uri, sch, reuseStorage) + } if (!isRestart) { val (_, eid, _, _) = decodeURI(resultURI) WorkflowExecutionsResource.insertOperatorPortResultUri( diff --git a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto index c5b2cb248ff..b3b72ed9248 100644 --- a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto +++ b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto @@ -62,6 +62,12 @@ message OutputPort { string displayName = 2; bool blocking = 3; OutputMode mode = 4; + // Whether storage at this port persists across the owning operator's region + // re-executions: when set, the existing document is kept and appended to on + // each re-run; when unset, it is recreated. Set e.g. on a LoopEnd port whose + // output accumulates across the iterations of its own loop. The region + // scheduler reads this when provisioning the port's output document. + bool reuseStorage = 5; } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index d3fcae868f6..a26340e79cc 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -133,6 +133,28 @@ object DocumentFactory { } } + /** + * Return the document at `uri`: when `reuseExisting` is set and a document + * already exists there, open and return the existing one -- so a caller whose + * output accumulates across re-runs (e.g. 
a LoopEnd port whose region + * re-executes once per loop iteration) keeps the already-populated document + * instead of clobbering it, since `createDocument` overrides any existing + * document. Otherwise create it. + * + * `exists` / `open` / `create` default to this object's own `documentExists` + * / `openDocument` / `createDocument`; they are parameterized only so the + * create-or-reuse decision can be unit-tested without an iceberg backend. + */ + def createOrReuseDocument( + uri: URI, + schema: Schema, + reuseExisting: Boolean, + exists: URI => Boolean = documentExists, + open: URI => VirtualDocument[_] = (u: URI) => openDocument(u)._1, + create: (URI, Schema) => VirtualDocument[_] = createDocument + ): VirtualDocument[_] = + if (reuseExisting && exists(uri)) open(uri) else create(uri, schema) + /** * Open a document specified by the uri. * If the document is storing structural data, the schema will also be returned diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala new file mode 100644 index 00000000000..d67b90f3637 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.core.storage + +import org.apache.texera.amber.core.storage.model.VirtualDocument +import org.apache.texera.amber.core.tuple.{Schema, Tuple} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.net.URI + +/** + * Unit tests for `DocumentFactory.createOrReuseDocument`, the create-or-reuse + * decision behind output-port storage provisioning. It always returns the + * document (opened when reused, created otherwise) so the call site doesn't + * branch. + * + * `exists` / `open` / `create` are injected so the decision can be pinned with + * trivial document stubs -- no iceberg backend, no live region. + */ +class DocumentFactorySpec extends AnyFlatSpec with Matchers { + + private val uri = new URI("vfs:///wf/result/loop-end") + private val schema = Schema() + + private def stubDoc: VirtualDocument[Tuple] = + new VirtualDocument[Tuple] { + override def getURI: URI = uri + override def clear(): Unit = () + } + private val opened: VirtualDocument[_] = stubDoc + private val created: VirtualDocument[_] = stubDoc + + /** Run with spies; return (document handed back, which path was taken). 
*/ + private def run(reuseExisting: Boolean, exists: Boolean): (VirtualDocument[_], String) = { + var path = "" + val doc = DocumentFactory.createOrReuseDocument( + uri, + schema, + reuseExisting, + _ => exists, + _ => { path = "open"; opened }, + (_, _) => { path = "create"; created } + ) + (doc, path) + } + + "createOrReuseDocument" should + "open and return the existing document when the port reuses storage and one exists" in { + val (doc, path) = run(reuseExisting = true, exists = true) + path shouldBe "open" + doc should be theSameInstanceAs opened + } + + it should "create when the port reuses storage but none exists yet" in { + val (doc, path) = run(reuseExisting = true, exists = false) + path shouldBe "create" + doc should be theSameInstanceAs created + } + + it should "always create when the port does not reuse storage, even if one exists" in { + val (doc, path) = run(reuseExisting = false, exists = true) + path shouldBe "create" + doc should be theSameInstanceAs created + } + + it should "not probe existence when the port does not reuse storage" in { + var probed = false + DocumentFactory.createOrReuseDocument( + uri, + schema, + reuseExisting = false, + _ => { probed = true; true }, + _ => opened, + (_, _) => created + ) + probed shouldBe false + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala new file mode 100644 index 00000000000..7850d2b98f1 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.metadata + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +/** + * Guard for the `OutputPort.reuseStorage` flag. + * + * The flag tells the region scheduler to reuse (append to) a port's storage + * across region re-executions instead of recreating it. Only an operator whose + * output accumulates across re-executions should set it -- today that is no + * operator on `main` (the only one that will, Loop End, is not yet merged). + * + * This pins the flag off for every registered operator so it can't be turned + * on unexpectedly. When the loop operators land, update this to allow Loop + * End's output port (and only it). + */ +class OutputPortReuseFlagSpec extends AnyFlatSpec with Matchers { + + "No registered operator" should "enable OutputPort.reuseStorage on any of its output ports" in { + OperatorMetadataGenerator.operatorTypeMap.keys.foreach { opClass => + opClass.getConstructor().newInstance().operatorInfo.outputPorts.foreach { port => + withClue(s"${opClass.getSimpleName} / output port ${port.id}: ") { + port.reuseStorage shouldBe false + } + } + } + } +}