From 0f36aaa920e314e3f011e63ec3247e5884c14b30 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 11 Jun 2026 12:07:52 -0700 Subject: [PATCH 1/4] fix: include port in computing unit pod URI `KubernetesClient.generatePodURI` builds the in-cluster address that is stored as the computing unit's `uri` (via `setUri` in `ComputingUnitManagingResource` and returned to clients as `nodeAddresses`). The pod's container listens on `KubernetesConfig.computeUnitPortNumber` (set via `withContainerPort`), but the generated URI omitted the port, so the stored address pointed at the default port instead of the one the computing unit actually serves on. Append the configured port so the persisted URI is directly connectable. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../scala/org/apache/texera/service/util/KubernetesClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala index cfc01b83b64..5078e73f019 100644 --- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala +++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala @@ -35,7 +35,7 @@ object KubernetesClient { private val podNamePrefix = "computing-unit" def generatePodURI(cuid: Int): String = { - s"${generatePodName(cuid)}.${KubernetesConfig.computeUnitServiceName}.$namespace.svc.cluster.local" + s"${generatePodName(cuid)}.${KubernetesConfig.computeUnitServiceName}.$namespace.svc.cluster.local:${KubernetesConfig.computeUnitPortNumber}" } def generatePodName(cuid: Int): String = s"$podNamePrefix-$cuid" From c4e79078b53346082e817a3d3abda4a3f255cdf5 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 11 Jun 2026 13:46:23 -0700 Subject: [PATCH 2/4] fix: route computing units to their recorded pod URI The access-control service rebuilt the computing unit's in-cluster address from `KubernetesConfig` on every authorization request, which duplicates the address-construction logic already in `KubernetesClient.generatePodURI` and can drift from it (e.g. service name vs. pool-name conventions). Read the URI persisted for the unit (written by the managing service when the pod is created) and route to it directly, so the routing target comes from a single source of truth. Fall back to the previously constructed in-cluster address when no URI has been recorded for the unit yet. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../resource/AccessControlResource.scala | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala b/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala index 0c90a6ce31f..def90704466 100644 --- a/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala +++ b/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala @@ -28,7 +28,9 @@ import org.apache.texera.auth.JwtParser.parseToken import org.apache.texera.auth.SessionUser import org.apache.texera.auth.util.{ComputingUnitAccess, HeaderField} import org.apache.texera.config.{GuiConfig, KubernetesConfig, LLMConfig} +import org.apache.texera.dao.SqlServer import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum +import org.apache.texera.dao.jooq.generated.tables.daos.WorkflowComputingUnitDao import java.net.URLDecoder import java.nio.charset.StandardCharsets @@ -136,12 +138,27 @@ object AccessControlResource extends LazyLogging { } // Dynamic Routing Logic - val workflowComputingUnitPoolName = KubernetesConfig.computeUnitPoolName - val workflowComputingUnitPoolNamespace = KubernetesConfig.computeUnitPoolNamespace - val workflowComputingUnitPoolPort = KubernetesConfig.computeUnitPortNumber - - val targetHost = - s"computing-unit-$cuidInt.$workflowComputingUnitPoolName-svc.$workflowComputingUnitPoolNamespace.svc.cluster.local:$workflowComputingUnitPoolPort" + // Use the URI persisted for the computing unit (written by the managing + // service when the pod is created) as the routing target, so the address is + // resolved from a single source of truth instead of being reconstructed + // here. Fall back to the conventional in-cluster address if no URI has been + // recorded for the unit yet. + val cuDao = new WorkflowComputingUnitDao( + SqlServer.getInstance().createDSLContext().configuration() + ) + val unit = cuDao.fetchOneByCuid(cuidInt) + val recordedUri = Option(unit).flatMap(u => Option(u.getUri)).map(_.trim).filter(_.nonEmpty) + + val targetHost = recordedUri match { + case Some(uri) => + logger.info(s"Routing CU $cuidInt to recorded host: $uri") + uri + case None => + val workflowComputingUnitPoolName = KubernetesConfig.computeUnitPoolName + val workflowComputingUnitPoolNamespace = KubernetesConfig.computeUnitPoolNamespace + val workflowComputingUnitPoolPort = KubernetesConfig.computeUnitPortNumber + s"computing-unit-$cuidInt.$workflowComputingUnitPoolName-svc.$workflowComputingUnitPoolNamespace.svc.cluster.local:$workflowComputingUnitPoolPort" + } Response .ok() From 97ea514c455dc7bb8341a25016ce1ae51f58f97b Mon Sep 17 00:00:00 2001 From: ali Date: Sat, 13 Jun 2026 13:19:18 -0700 Subject: [PATCH 3/4] fix: refuse CU connection when no URI is recorded Per review, drop the in-cluster address fallback in the access-control service. A computing unit is routed only to the URI recorded for it; if no URI has been recorded the unit is not routable, so the authorization request is refused (403) instead of falling back to a reconstructed in-cluster address. Also drops the now-unused KubernetesConfig import. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../resource/AccessControlResource.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala b/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala index def90704466..119206fe67d 100644 --- a/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala +++ b/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala @@ -27,7 +27,7 @@ import jakarta.ws.rs.{Consumes, DELETE, GET, POST, Path, Produces} import org.apache.texera.auth.JwtParser.parseToken import org.apache.texera.auth.SessionUser import org.apache.texera.auth.util.{ComputingUnitAccess, HeaderField} -import org.apache.texera.config.{GuiConfig, KubernetesConfig, LLMConfig} +import org.apache.texera.config.{GuiConfig, LLMConfig} import org.apache.texera.dao.SqlServer import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum import org.apache.texera.dao.jooq.generated.tables.daos.WorkflowComputingUnitDao @@ -138,11 +138,11 @@ object AccessControlResource extends LazyLogging { } // Dynamic Routing Logic - // Use the URI persisted for the computing unit (written by the managing - // service when the pod is created) as the routing target, so the address is - // resolved from a single source of truth instead of being reconstructed - // here. Fall back to the conventional in-cluster address if no URI has been - // recorded for the unit yet. + // Route to the URI recorded for the computing unit (written by the managing + // service when the pod is created). This recorded URI is the single source + // of truth for where the unit is reachable, allowing units to live anywhere + // the gateway can route to. If no URI has been recorded, the unit is not + // routable and the connection is refused. val cuDao = new WorkflowComputingUnitDao( SqlServer.getInstance().createDSLContext().configuration() ) @@ -154,10 +154,8 @@ object AccessControlResource extends LazyLogging { logger.info(s"Routing CU $cuidInt to recorded host: $uri") uri case None => - val workflowComputingUnitPoolName = KubernetesConfig.computeUnitPoolName - val workflowComputingUnitPoolNamespace = KubernetesConfig.computeUnitPoolNamespace - val workflowComputingUnitPoolPort = KubernetesConfig.computeUnitPortNumber - s"computing-unit-$cuidInt.$workflowComputingUnitPoolName-svc.$workflowComputingUnitPoolNamespace.svc.cluster.local:$workflowComputingUnitPoolPort" + logger.warn(s"Refusing CU $cuidInt: no URI recorded for the computing unit") + return Response.status(Response.Status.FORBIDDEN).build() } Response From e103ad93645a6da0a35311b4d883cc04feeb1cc7 Mon Sep 17 00:00:00 2001 From: ali Date: Mon, 15 Jun 2026 14:47:25 -0700 Subject: [PATCH 4/4] test(access-control): cover refuse-when-no-URI routing Add coverage for the dynamic routing logic in AccessControlResource: record a URI on the existing test computing unit and assert the rewritten Host header carries it, and add two computing units (no URI, blank URI) that the user can access but which are refused with FORBIDDEN. This also fixes the existing OK-path tests, which previously failed under the refuse-when-no-URI behavior because the test unit had no recorded URI. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../texera/AccessControlResourceSpec.scala | 65 +++++++++++++++++-- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala b/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala index 75f3bacb107..e1dbf89724c 100644 --- a/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala +++ b/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala @@ -55,6 +55,11 @@ class AccessControlResourceSpec private val testURI: String = "http://localhost:8080/" private val testPath: String = "/api/executions/1/stats/1" + // The host:port the managing service records for a computing unit when it + // creates the pod. The access-control-service routes to this recorded URI. + private val testRecordedUri: String = + "computing-unit-2.compute-unit-svc.default.svc.cluster.local:8888" + private val testUser1: User = { val user = new User() user.setUid(1) @@ -81,6 +86,31 @@ class AccessControlResourceSpec cu.setType(WorkflowComputingUnitTypeEnum.kubernetes) cu.setCuid(2) cu.setName("test-cu") + cu.setUri(testRecordedUri) + cu + } + + // A computing unit the user can access but for which no URI was ever recorded + // (e.g. the pod was never created). Such a unit is not routable and must be + // refused. + private val testCUNoUri: WorkflowComputingUnit = { + val cu = new WorkflowComputingUnit() + cu.setUid(2) + cu.setType(WorkflowComputingUnitTypeEnum.kubernetes) + cu.setCuid(3) + cu.setName("test-cu-no-uri") + cu + } + + // A computing unit whose recorded URI is blank/whitespace-only — also treated + // as "no URI recorded" and refused. + private val testCUBlankUri: WorkflowComputingUnit = { + val cu = new WorkflowComputingUnit() + cu.setUid(2) + cu.setType(WorkflowComputingUnitTypeEnum.kubernetes) + cu.setCuid(4) + cu.setName("test-cu-blank-uri") + cu.setUri(" ") cu } @@ -96,12 +126,18 @@ class AccessControlResourceSpec userDao.insert(testUser1) userDao.insert(testUser2) computingUnitDao.insert(testCU) - - val cuAccess = new ComputingUnitUserAccess() - cuAccess.setUid(testUser1.getUid) - cuAccess.setCuid(testCU.getCuid) - cuAccess.setPrivilege(PrivilegeEnum.WRITE) - computingUnitOfUserDao.insert(cuAccess) + computingUnitDao.insert(testCUNoUri) + computingUnitDao.insert(testCUBlankUri) + + // Grant testUser1 WRITE access to every test computing unit so the routing + // logic (not the access check) is what each routing test exercises. + Seq(testCU, testCUNoUri, testCUBlankUri).foreach { cu => + val cuAccess = new ComputingUnitUserAccess() + cuAccess.setUid(testUser1.getUid) + cuAccess.setCuid(cu.getCuid) + cuAccess.setPrivilege(PrivilegeEnum.WRITE) + computingUnitOfUserDao.insert(cuAccess) + } val claims = JwtAuth.jwtClaims(testUser1, 1) token = JwtAuth.jwtToken(claims) @@ -232,6 +268,23 @@ class AccessControlResourceSpec response.getHeaderString(HeaderField.UserId) shouldBe testUser1.getUid.toString response.getHeaderString(HeaderField.UserName) shouldBe testUser1.getName response.getHeaderString(HeaderField.UserEmail) shouldBe testUser1.getEmail + // Envoy routes by the rewritten Host header, which must be the URI recorded + // for the computing unit. + response.getHeaderString("Host") shouldBe testRecordedUri + } + + it should "refuse the connection when no URI is recorded for the computing unit" in { + val (uri, headers) = mockRequest(testPath, Some(testCUNoUri.getCuid.toString)) + val response = new AccessControlResource().authorizeGet(uri, headers) + + response.getStatus shouldBe Response.Status.FORBIDDEN.getStatusCode + } + + it should "refuse the connection when the recorded URI is blank" in { + val (uri, headers) = mockRequest(testPath, Some(testCUBlankUri.getCuid.toString)) + val response = new AccessControlResource().authorizeGet(uri, headers) + + response.getStatus shouldBe Response.Status.FORBIDDEN.getStatusCode } private def mockRequest(