Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,9 @@ lazyCursorRecovery=false
managedLedgerMetadataOperationsTimeoutSeconds=60

# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=0
# Keep this enabled to prevent stuck read operations from blocking managed cursor operations such as reset cursor.
# Set to 0 to disable it.
managedLedgerReadEntryTimeoutSeconds=120

# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=0
Expand Down
4 changes: 3 additions & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,9 @@ autoSkipNonRecoverableData=false
managedLedgerMetadataOperationsTimeoutSeconds=60

# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=0
# Keep this enabled to prevent stuck read operations from blocking managed cursor operations such as reset cursor.
# Set to 0 to disable it.
managedLedgerReadEntryTimeoutSeconds=120

# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=0
Expand Down
4 changes: 3 additions & 1 deletion deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,9 @@ lazyCursorRecovery=false
managedLedgerMetadataOperationsTimeoutSeconds=60

# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=0
# Keep this enabled to prevent stuck read operations from blocking managed cursor operations such as reset cursor.
# Set to 0 to disable it.
managedLedgerReadEntryTimeoutSeconds=120

# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2632,9 +2632,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Read entries timeout when broker tries to read messages from bookkeeper "
+ "(0 to disable it)"
+ "(0 to disable it). It is recommended to keep this enabled to prevent stuck read operations "
+ "from blocking managed cursor operations such as reset cursor."
)
private long managedLedgerReadEntryTimeoutSeconds = 0;
private long managedLedgerReadEntryTimeoutSeconds = 120;

@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2833,12 +2833,13 @@ protected void handleSeek(CommandSeek seek) {
.log("Reset subscription to message id");
commandSender.sendSuccessResponse(requestId);
}).exceptionally(ex -> {
Throwable cause = unwrapCompletionExceptionOrReturnOriginal(ex);
log.warn()
.attr("subscription", subscription)
.exception(ex)
.exception(cause)
.log("Failed to reset subscription");
commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
"Error when resetting subscription: " + ex.getCause().getMessage());
"Error when resetting subscription: " + getExceptionMessage(cause));
return null;
});
} else if (consumerCreated && seek.hasMessagePublishTime()){
Expand All @@ -2854,19 +2855,33 @@ protected void handleSeek(CommandSeek seek) {
.log("Reset subscription to publish time");
commandSender.sendSuccessResponse(requestId);
}).exceptionally(ex -> {
Throwable cause = unwrapCompletionExceptionOrReturnOriginal(ex);
log.warn()
.attr("subscription", subscription)
.exception(ex)
.exception(cause)
.log("Failed to reset subscription");
commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
"Reset subscription to publish time error: " + ex.getCause().getMessage());
"Reset subscription to publish time error: " + getExceptionMessage(cause));
return null;
});
} else {
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
}
}

private static Throwable unwrapCompletionExceptionOrReturnOriginal(Throwable throwable) {
Throwable cause = FutureUtil.unwrapCompletionException(throwable);
return cause != null ? cause : throwable;
}

private static String getExceptionMessage(Throwable throwable) {
if (throwable == null) {
return "unknown error";
}
String message = throwable.getMessage();
return message != null ? message : throwable.getClass().getName();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -74,6 +75,7 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
Expand All @@ -97,6 +99,16 @@
public class PersistentSubscription extends AbstractSubscription {

private static final Logger LOG = Logger.get(PersistentSubscription.class);
private static final String RESET_CURSOR_RESULT_SUCCESS = "success";
private static final String RESET_CURSOR_RESULT_FAILURE = "failure";
private static final Summary RESET_CURSOR_LATENCY = Summary
.build("pulsar_subscription_reset_cursor_latency_ms", "-")
.labelNames("topic", "subscription", "result")
.quantile(0.5)
.quantile(0.95)
.quantile(0.99)
.quantile(0.999)
.register();
protected final Logger log;

protected final PersistentTopic topic;
Expand Down Expand Up @@ -870,7 +882,9 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {

@Override
public CompletableFuture<Void> resetCursor(long timestamp) {
long startTimeNs = System.nanoTime();
if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
recordResetCursorLatency(startTimeNs, RESET_CURSOR_RESULT_FAILURE);
return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription"));
}

Expand Down Expand Up @@ -928,13 +942,24 @@ public void findEntryFailed(ManagedLedgerException exception,
}
});

future.whenComplete((__, ex) -> recordResetCursorLatency(startTimeNs,
ex == null ? RESET_CURSOR_RESULT_SUCCESS : RESET_CURSOR_RESULT_FAILURE));
return future;
}

@Override
public CompletableFuture<Void> resetCursor(Position finalPosition) {
long startTimeNs = System.nanoTime();
final CompletableFuture<Void> future = new CompletableFuture<>();
return resetCursorInternal(finalPosition, future, false);
CompletableFuture<Void> resetCursorFuture = resetCursorInternal(finalPosition, future, false);
resetCursorFuture.whenComplete((__, ex) -> recordResetCursorLatency(startTimeNs,
ex == null ? RESET_CURSOR_RESULT_SUCCESS : RESET_CURSOR_RESULT_FAILURE));
return resetCursorFuture;
}

private void recordResetCursorLatency(long startTimeNs, String result) {
RESET_CURSOR_LATENCY.labels(topicName, subName, result)
.observe(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
}

private CompletableFuture<Void> resetCursorInternal(Position finalPosition, CompletableFuture<Void> future,
Expand Down
Loading