From 1511055bec5991a1462c5b6f6d3cc2474ea98ce8 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 25 Jun 2026 16:52:18 +0800 Subject: [PATCH 1/2] Add reset cursor latency metric Record reset-cursor latency labelled by topic, subscription and result; change the default managedLedgerReadEntryTimeoutSeconds from 0 (disabled) to 120 seconds; and report the unwrapped cause when a seek fails. --- conf/broker.conf | 4 ++- conf/standalone.conf | 4 ++- .../terraform-ansible/templates/broker.conf | 4 ++- .../pulsar/broker/ServiceConfiguration.java | 5 ++-- .../pulsar/broker/service/ServerCnx.java | 18 ++++++++++--- .../persistent/PersistentSubscription.java | 27 ++++++++++++++++++- 6 files changed, 52 insertions(+), 10 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 4826bbeb5c78a..903c33b68a7fe 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1558,7 +1558,9 @@ lazyCursorRecovery=false managedLedgerMetadataOperationsTimeoutSeconds=60 # Read entries timeout when broker tries to read messages from bookkeeper. -managedLedgerReadEntryTimeoutSeconds=0 +# Keep this enabled to prevent stuck read operations from blocking managed cursor operations such as reset cursor. +# Set to 0 to disable it. +managedLedgerReadEntryTimeoutSeconds=120 # Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). managedLedgerAddEntryTimeoutSeconds=0 diff --git a/conf/standalone.conf b/conf/standalone.conf index 2dba365a5a7a2..a524dfc609955 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1033,7 +1033,9 @@ autoSkipNonRecoverableData=false managedLedgerMetadataOperationsTimeoutSeconds=60 # Read entries timeout when broker tries to read messages from bookkeeper. -managedLedgerReadEntryTimeoutSeconds=0 +# Keep this enabled to prevent stuck read operations from blocking managed cursor operations such as reset cursor. +# Set to 0 to disable it. +managedLedgerReadEntryTimeoutSeconds=120 # Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). 
managedLedgerAddEntryTimeoutSeconds=0 diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index 1ccfbacecb04a..ca047ee6ef11a 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -845,7 +845,9 @@ lazyCursorRecovery=false managedLedgerMetadataOperationsTimeoutSeconds=60 # Read entries timeout when broker tries to read messages from bookkeeper. -managedLedgerReadEntryTimeoutSeconds=0 +# Keep this enabled to prevent stuck read operations from blocking managed cursor operations such as reset cursor. +# Set to 0 to disable it. +managedLedgerReadEntryTimeoutSeconds=120 # Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it). managedLedgerAddEntryTimeoutSeconds=0 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 48be38ad80941..03d540fab4379 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2632,9 +2632,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Read entries timeout when broker tries to read messages from bookkeeper " - + "(0 to disable it)" + + "(0 to disable it). It is recommended to keep this enabled to prevent stuck read operations " + + "from blocking managed cursor operations such as reset cursor." 
) - private long managedLedgerReadEntryTimeoutSeconds = 0; + private long managedLedgerReadEntryTimeoutSeconds = 120; @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 7dff4260d2333..c9a22b148b831 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2833,12 +2833,13 @@ protected void handleSeek(CommandSeek seek) { .log("Reset subscription to message id"); commandSender.sendSuccessResponse(requestId); }).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); log.warn() .attr("subscription", subscription) - .exception(ex) + .exception(cause) .log("Failed to reset subscription"); commandSender.sendErrorResponse(requestId, ServerError.UnknownError, - "Error when resetting subscription: " + ex.getCause().getMessage()); + "Error when resetting subscription: " + getExceptionMessage(cause)); return null; }); } else if (consumerCreated && seek.hasMessagePublishTime()){ @@ -2854,12 +2855,13 @@ protected void handleSeek(CommandSeek seek) { .log("Reset subscription to publish time"); commandSender.sendSuccessResponse(requestId); }).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); log.warn() .attr("subscription", subscription) - .exception(ex) + .exception(cause) .log("Failed to reset subscription"); commandSender.sendErrorResponse(requestId, ServerError.UnknownError, - "Reset subscription to publish time error: " + ex.getCause().getMessage()); + "Reset subscription to publish time error: " + getExceptionMessage(cause)); return null; }); } else { @@ -2867,6 +2869,14 @@ protected void handleSeek(CommandSeek seek) { } } + private static 
String getExceptionMessage(Throwable throwable) { + if (throwable == null) { + return "unknown error"; + } + String message = throwable.getMessage(); + return message != null ? message : throwable.getClass().getName(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 236a55162347b..37c63c6be07f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -32,6 +32,7 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -74,6 +75,7 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; @@ -97,6 +99,16 @@ public class PersistentSubscription extends AbstractSubscription { private static final Logger LOG = Logger.get(PersistentSubscription.class); + private static final String RESET_CURSOR_RESULT_SUCCESS = "success"; + private static final String RESET_CURSOR_RESULT_FAILURE = "failure"; + private static final Summary RESET_CURSOR_LATENCY = Summary + .build("pulsar_subscription_reset_cursor_latency_ms", "-") + 
.labelNames("topic", "subscription", "result") + .quantile(0.5) + .quantile(0.95) + .quantile(0.99) + .quantile(0.999) + .register(); protected final Logger log; protected final PersistentTopic topic; @@ -870,7 +882,9 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) { @Override public CompletableFuture resetCursor(long timestamp) { + long startTimeNs = System.nanoTime(); if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { + recordResetCursorLatency(startTimeNs, RESET_CURSOR_RESULT_FAILURE); return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription")); } @@ -928,13 +942,24 @@ public void findEntryFailed(ManagedLedgerException exception, } }); + future.whenComplete((__, ex) -> recordResetCursorLatency(startTimeNs, + ex == null ? RESET_CURSOR_RESULT_SUCCESS : RESET_CURSOR_RESULT_FAILURE)); return future; } @Override public CompletableFuture resetCursor(Position finalPosition) { + long startTimeNs = System.nanoTime(); final CompletableFuture future = new CompletableFuture<>(); - return resetCursorInternal(finalPosition, future, false); + CompletableFuture resetCursorFuture = resetCursorInternal(finalPosition, future, false); + resetCursorFuture.whenComplete((__, ex) -> recordResetCursorLatency(startTimeNs, + ex == null ? 
RESET_CURSOR_RESULT_SUCCESS : RESET_CURSOR_RESULT_FAILURE)); + return resetCursorFuture; + } + + private void recordResetCursorLatency(long startTimeNs, String result) { + RESET_CURSOR_LATENCY.labels(topicName, subName, result) + .observe(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); } private CompletableFuture resetCursorInternal(Position finalPosition, CompletableFuture future, From d3c6e5c05d1a8526bf52bdfe1dcfa0ac442d08d9 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 25 Jun 2026 18:08:41 +0800 Subject: [PATCH 2/2] Handle null unwrapped seek errors --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c9a22b148b831..76107589db0d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2833,7 +2833,7 @@ protected void handleSeek(CommandSeek seek) { .log("Reset subscription to message id"); commandSender.sendSuccessResponse(requestId); }).exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); + Throwable cause = unwrapCompletionExceptionOrReturnOriginal(ex); log.warn() .attr("subscription", subscription) .exception(cause) @@ -2855,7 +2855,7 @@ protected void handleSeek(CommandSeek seek) { .log("Reset subscription to publish time"); commandSender.sendSuccessResponse(requestId); }).exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); + Throwable cause = unwrapCompletionExceptionOrReturnOriginal(ex); log.warn() .attr("subscription", subscription) .exception(cause) @@ -2869,6 +2869,11 @@ protected void handleSeek(CommandSeek seek) { } } + private static Throwable unwrapCompletionExceptionOrReturnOriginal(Throwable throwable) { + Throwable 
cause = FutureUtil.unwrapCompletionException(throwable); + return cause != null ? cause : throwable; + } + private static String getExceptionMessage(Throwable throwable) { if (throwable == null) { return "unknown error";