[fix][broker] Prevent duplicate geo-replicated messages after target topic reload#25860

Open
void-ptr974 wants to merge 5 commits into apache:master from void-ptr974:geo-dedup-core-fix

Conversation

@void-ptr974 commented May 23, 2026

Contributor

Fixes #25861

Motivation

This fixes correctness issues in persistent geo-replication V2 deduplication. These issues are in the core replication data path and can produce duplicate messages on the target cluster during normal recovery/failover paths.

Geo-replication V2 deduplication uses the source topic position as the target-side dedup watermark. For a replicated message, the source replicator adds __MSG_PROP_REPL_SOURCE_POSITION=<source-ledger-id>:<source-entry-id>, and the target broker stores the latest replicated source position as the dedup watermark for that replicator producer.

This state is not normal producer sequence state. It is the target-side checkpoint used to identify whether a replayed source entry has already been persisted.
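
As an illustration of the watermark format described above, here is a minimal sketch of parsing a `<source-ledger-id>:<source-entry-id>` value and comparing it with a stored watermark. The `SourcePosition` type and its `isCoveredBy` method are hypothetical names for this example, not the broker's classes, and the sketch assumes positions are ordered by ledger id first and entry id second.

```java
import java.util.Optional;

/** Hypothetical stand-in for the source position carried by a replicated message. */
record SourcePosition(long ledgerId, long entryId) implements Comparable<SourcePosition> {

    /** Parses "<ledger-id>:<entry-id>"; returns empty when the value is missing or malformed. */
    static Optional<SourcePosition> parse(String value) {
        if (value == null) {
            return Optional.empty();
        }
        String[] parts = value.split(":");
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            return Optional.of(new SourcePosition(Long.parseLong(parts[0]), Long.parseLong(parts[1])));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** Assumed ordering: ledger id first, then entry id within the same ledger. */
    @Override
    public int compareTo(SourcePosition other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }

    /** A replayed entry counts as a duplicate when it is at or before the watermark. */
    boolean isCoveredBy(SourcePosition watermark) {
        return compareTo(watermark) <= 0;
    }
}
```

With a watermark of `12:34`, a replayed `12:30` is covered and would be treated as a duplicate, while `12:35` or `13:0` would be accepted.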

There are four related problems in the current handling of this watermark:

  1. The geo V2 watermark is not recovered from replayed target entries

    When deduplication is enabled or a target topic is loaded, the broker restores dedup state from the pulsar.dedup cursor snapshot and then replays entries after that snapshot.

    The existing replay path restores normal producer sequence ids from message metadata, but it does not rebuild geo V2 watermark state from replicated messages. If the target topic is unloaded after replicated messages are persisted but before the latest dedup snapshot includes their source positions, the in-memory geo watermark is lost.

    If the source replicator later reconnects and replays from its replication cursor, the target broker can fail to identify those source entries as duplicates and persist them again.

  2. The geo V2 watermark must be handled atomically

    A geo V2 watermark is a pair: source ledger id and source entry id. Treating the two values as independent mutable states can expose a mixed pair to duplicate checks or snapshots, for example a new ledger id combined with an old entry id.

    Such a mixed pair does not represent a real source position. If it is snapshotted and later recovered, the target can either move the watermark too far forward and drop valid replayed source entries, or move it backward and allow duplicates.

  3. Replay recovery must never move the watermark backward

    Source-position replay is expected during reconnects and failovers. When rebuilding the watermark from target entries, older source positions can appear after newer positions in recovery windows. The recovered watermark must therefore keep the maximum source position seen so far instead of blindly overwriting the current value.

  4. The geo V2 watermark can be removed by normal producer inactivity cleanup

    Normal producer dedup state can be purged after producer inactivity. Geo V2 watermark state has a different lifecycle: it is a source-position checkpoint for replication, not the lifecycle state of the replicator producer.

    The source replicator may disconnect, reconnect, or fail over and replay the same source entries from its replication cursor. If the target has purged the geo watermark because the replicator producer was inactive, those replayed source entries can be accepted and written again.
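
The lifecycle difference in problem 4 can be sketched as follows. This is a simplified model with hypothetical names (`InactivityCleanupSketch`, `purgeInactive`), not the broker's cleanup code: the inactivity purge removes normal producer sequence state but leaves the replication watermark map untouched.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model: normal producer state is purged on inactivity, replication watermarks are not. */
final class InactivityCleanupSketch {
    final Map<String, Long> sequenceByProducer = new HashMap<>();
    final Map<String, Long> lastActiveMillis = new HashMap<>();
    /** Replication watermark per replicator producer, stored as {ledgerId, entryId}. */
    final Map<String, long[]> replWatermarkByProducer = new HashMap<>();

    void recordNormalPublish(String producer, long sequenceId, long nowMillis) {
        sequenceByProducer.put(producer, sequenceId);
        lastActiveMillis.put(producer, nowMillis);
    }

    void recordReplicatedPublish(String producer, long ledgerId, long entryId) {
        replWatermarkByProducer.put(producer, new long[] {ledgerId, entryId});
    }

    /** Drops sequence state of producers inactive for at least maxInactiveMillis. */
    void purgeInactive(long nowMillis, long maxInactiveMillis) {
        lastActiveMillis.entrySet().removeIf(e -> {
            boolean inactive = nowMillis - e.getValue() >= maxInactiveMillis;
            if (inactive) {
                sequenceByProducer.remove(e.getKey());
            }
            return inactive;
        });
        // replWatermarkByProducer is intentionally left alone: it is a source-position
        // checkpoint, so a replicator that reconnects later can still be deduplicated.
    }
}
```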

A concrete failure window is:

  1. Source replicator sends messages to the target.
  2. Target persists the messages and updates the in-memory geo V2 dedup watermark.
  3. The target topic is unloaded before the latest dedup snapshot includes the watermark.
  4. The source replicator has not durably advanced its replication cursor yet.
  5. Source reconnects and replays the same source entries.
  6. Target reloads without the geo watermark and can write duplicates.

This does not require client misuse or corrupted input. Source cursor replay is an expected recovery behavior, so target-side geo dedup state must survive topic reload and producer inactivity cleanup.
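
The failure window can be reproduced with a small in-memory model. This is a sketch under stated assumptions, not broker code: `TargetTopicSketch` and its methods are hypothetical, the "snapshot" is just a saved watermark plus an entry count, and `reload(true)` stands for the replay recovery this PR adds.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a target topic with an in-memory watermark and a lagging snapshot. */
final class TargetTopicSketch {
    /** Persisted entries, each holding the source position {ledgerId, entryId}. */
    final List<long[]> persisted = new ArrayList<>();
    private int snapshotEntryCount = 0;
    private long[] snapshotWatermark = null;
    private long[] watermark = null;

    /** Returns true when the entry is persisted, false when it is rejected as a duplicate. */
    boolean publish(long ledgerId, long entryId) {
        if (watermark != null && !isAfter(ledgerId, entryId, watermark)) {
            return false;
        }
        persisted.add(new long[] {ledgerId, entryId});
        watermark = new long[] {ledgerId, entryId};
        return true;
    }

    void takeSnapshot() {
        snapshotEntryCount = persisted.size();
        snapshotWatermark = watermark;
    }

    /** Simulates unload and reload: in-memory state is rebuilt from the snapshot. */
    void reload(boolean recoverFromReplay) {
        watermark = snapshotWatermark;
        if (recoverFromReplay) {
            // Replay entries written after the snapshot and take their source positions.
            for (int i = snapshotEntryCount; i < persisted.size(); i++) {
                watermark = persisted.get(i);
            }
        }
    }

    private static boolean isAfter(long ledgerId, long entryId, long[] pos) {
        return ledgerId > pos[0] || (ledgerId == pos[0] && entryId > pos[1]);
    }
}
```

In this model, publishing `5:0`, taking a snapshot, publishing `5:1` and `5:2`, and then calling `reload(false)` lets a replayed `5:1` be persisted a second time. With `reload(true)` the same replay is rejected.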

Modifications

  • Recover replication V2 dedup watermarks while replaying the dedup cursor by reading replicatedFrom and __MSG_PROP_REPL_SOURCE_POSITION from persisted replicated messages.
  • Track geo V2 watermarks in memory as an immutable source-position pair instead of two independently updated _LID / _EID map entries.
  • Use per-producer ConcurrentHashMap.compute for the geo V2 pushed watermark so duplicate checks and watermark advancement are atomic per replicator producer without taking a global lock.
  • Merge replayed and persisted watermarks by max source position so recovery and delayed callbacks cannot move the watermark backward.
  • Recover shadow-topic replication watermarks under the actual shadow producer name instead of deriving a geo-replication producer name from replicatedFrom.
  • Skip marker messages while rebuilding replication watermarks from replayed entries, matching the normal persist path.
  • Store replication watermarks as complete _LID / _EID pairs in dedup snapshots. Replication watermarks are expected to be small in number, so the snapshot keeps them before applying the normal-producer snapshot limit.
  • Keep geo-replication watermark state during producer inactivity cleanup, since it represents source position rather than producer lifecycle state.
  • Stabilize the target-unload regression test by waiting for the current source replicator connection before forcing reconnect.
  • Add unit coverage for geo-replication watermark cleanup, shadow producer-name recovery, marker replay handling, complete snapshot persistence, and replay recovery that only advances the watermark.
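
To illustrate the snapshot layout described in the list above, here is a minimal sketch of building a snapshot map in which every replication watermark is written as a complete `_LID` / `_EID` pair before the normal-producer limit is applied. The class `DedupSnapshotSketch`, the method `build`, and the `maxNormalProducers` parameter are hypothetical; only the suffix constant names and values come from this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a snapshot that always contains complete replication watermark pairs. */
final class DedupSnapshotSketch {
    static final String REPL_LEDGER_ID_SUFFIX = "_LID";
    static final String REPL_ENTRY_ID_SUFFIX = "_EID";

    /**
     * @param replWatermarks     replicator producer name to {ledgerId, entryId}
     * @param sequenceIds        normal producer name to highest sequence id
     * @param maxNormalProducers hypothetical cap applied to normal producers only
     */
    static Map<String, Long> build(Map<String, long[]> replWatermarks,
                                   Map<String, Long> sequenceIds,
                                   int maxNormalProducers) {
        Map<String, Long> snapshot = new LinkedHashMap<>();
        // Replication watermarks first: both halves of each pair, never one without the other.
        for (Map.Entry<String, long[]> e : replWatermarks.entrySet()) {
            snapshot.put(e.getKey() + REPL_LEDGER_ID_SUFFIX, e.getValue()[0]);
            snapshot.put(e.getKey() + REPL_ENTRY_ID_SUFFIX, e.getValue()[1]);
        }
        // Normal producers afterwards, counted with a plain int.
        int normalProducerCount = 0;
        for (Map.Entry<String, Long> e : sequenceIds.entrySet()) {
            if (normalProducerCount >= maxNormalProducers) {
                break;
            }
            snapshot.put(e.getKey(), e.getValue());
            normalProducerCount++;
        }
        return snapshot;
    }
}
```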

Performance impact

The geo V2 publish path now allocates one small immutable source-position object for replicated messages and uses ConcurrentHashMap.compute scoped to the current replicator producer. This avoids a global lock and only serializes updates for the same replication producer, which is the state that must be ordered for correctness.
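
A minimal sketch of the per-producer check-and-advance described above, using `ConcurrentHashMap.compute`. The names `ReplWatermarkTracker`, `Position`, and `tryAdvance` are hypothetical and the real `isDuplicateReplV2` logic differs; the point is only that the duplicate check and the watermark update happen inside one atomic remapping call for a given key.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: atomic duplicate check and watermark advance, scoped to one producer key. */
final class ReplWatermarkTracker {
    /** Immutable source-position pair, so readers never observe a mixed ledger/entry value. */
    record Position(long ledgerId, long entryId) {
        boolean isAfter(Position other) {
            return ledgerId > other.ledgerId
                    || (ledgerId == other.ledgerId && entryId > other.entryId);
        }
    }

    private final ConcurrentHashMap<String, Position> highestPushed = new ConcurrentHashMap<>();

    /** Returns true when the position advanced the watermark, false when it is a duplicate. */
    boolean tryAdvance(String producerName, long ledgerId, long entryId) {
        Position candidate = new Position(ledgerId, entryId);
        Position result = highestPushed.compute(producerName,
                (name, last) -> (last != null && !candidate.isAfter(last)) ? last : candidate);
        // The candidate instance is stored only when it was accepted.
        return result == candidate;
    }
}
```

Updates for different producer names do not wait on each other in this sketch, while concurrent calls for the same producer name are applied one at a time.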

Snapshot cost remains proportional to the number of active dedup states. Replication watermarks are expected to be bounded by the number of geo/shadow replication producers, not by application producer count, so storing all complete replication watermark pairs should have negligible overhead compared with normal producer dedup state.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

./gradlew :pulsar-broker:test --no-configuration-cache \
  --tests org.apache.pulsar.broker.service.OneWayReplicatorDeduplicationTest.testGeoReplDedupAfterTargetUnloadBeforeSnapshot \
  --tests org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.testInactiveGeoReplicationProducerKeepsDedupState \
  --tests org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.testSnapshotStoresCompleteReplicationWatermarkPairs \
  --tests org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.testReplayRecoverShadowReplicationWatermarkUsesShadowProducerName \
  --tests org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.testReplayRecoverGeoReplicationWatermarkOnlyAdvances \
  --tests org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.testReplayRecoverGeoReplicationWatermarkSkipsMarkers

Additional local checks:

./gradlew :pulsar-broker:checkstyleMain --no-configuration-cache
./gradlew :pulsar-broker:checkstyleTest --no-configuration-cache
git diff --check

Result: BUILD SUCCESSFUL.

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@lhotari lhotari requested a review from poorbarcode May 25, 2026 07:20
@void-ptr974

Contributor Author

All CI checks pass in my fork: https://github.com/void-ptr974/pulsar/actions/runs/26407472425

@void-ptr974

Contributor Author

Hi @poorbarcode, this PR touches geo-replication V2 dedup watermark recovery and cleanup behavior. Since it is close to the geo-replicator lifecycle work you have been looking at, could you help take a look when you have time? Thanks!

@poorbarcode poorbarcode left a comment

Contributor

The geo V2 watermark is not recovered from replayed target entries
When deduplication is enabled or a target topic is loaded, the broker restores dedup state from the pulsar.dedup cursor snapshot and then replays entries after that snapshot.
The existing replay path restores normal producer sequence ids from message metadata, but it does not rebuild geo V2 watermark state from replicated messages. If the target topic is unloaded after replicated messages are persisted but before the latest dedup snapshot includes their source positions, the in-memory geo watermark is lost.
If the source replicator later reconnects and replays from its replication cursor, the target broker can fail to identify those source entries as duplicates and persist them again.

This is a bug, and the current PR fixes it.

The geo V2 watermark must be handled atomically
A geo V2 watermark is a pair: source ledger id and source entry id. Treating the two values as independent mutable states can expose a mixed pair to duplicate checks or snapshots, for example a new ledger id combined with an old entry id.
Such a mixed pair does not represent a real source position. If it is snapshotted and later recovered, the target can either move the watermark too far forward and drop valid replayed source entries, or move it backward and allow duplicates.

This situation can never happen, and the current PR does not fix any such issue.

Replay recovery must never move the watermark backward
Source-position replay is expected during reconnects and failovers. When rebuilding the watermark from target entries, older source positions can appear after newer positions in recovery windows. The recovered watermark must therefore keep the maximum source position seen so far instead of blindly overwriting the current value.

This improvement is meaningless. If such an issue occurs, we need to fix the place that causes "older source positions can appear after newer positions", rather than ordering positions while executing replayCursor.

The geo V2 watermark can be removed by normal producer inactivity cleanup
Normal producer dedup state can be purged after producer inactivity. Geo V2 watermark state has a different lifecycle: it is a source-position checkpoint for replication, not the lifecycle state of the replicator producer.
The source replicator may disconnect, reconnect, or fail over and replay the same source entries from its replication cursor. If the target has purged the geo watermark because the replicator producer was inactive, those replayed source entries can be accepted and written again.

This is a real issue, and the current PR fixes it:

  • The replication deduplication state is no longer limited by the brokerDeduplicationMaxNumberOfProducers configuration.
  • The replication deduplication state is no longer cleared when inactive producers are purged.

publishContext.setProperty(MSG_PROP_REPL_SOURCE_POSITION, positionPair);
break;
} catch (NumberFormatException e) {
// Log below.

Contributor

The log below seems to be equivalent to logging here. Why not log here?

Contributor Author

Done. The NumberFormatException branch now logs immediately and returns, avoiding the duplicate fall-through log.

ReplSourcePosition position = new ReplSourcePosition(ledgerId, entryId);
highestReplPositionPushed.put(producerName, position);
highestReplPositionPersisted.put(producerName, position);
}

Contributor

A missing entryId is an unexpected situation. We need to print an error log.

Contributor Author

Done. Added error logging for incomplete geo-replication watermark snapshot pairs when either _LID or _EID is missing.
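
A minimal sketch of what such a restore check could look like, assuming a snapshot map keyed by producer name plus `_LID` / `_EID` suffixes. `SnapshotRestoreSketch` is a hypothetical class, errors are collected in a list instead of going through the broker's logger, and the suffix matching is simplified (a normal producer whose name ends in `_LID` would be misclassified here).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: restore dedup state from a snapshot and report incomplete watermark pairs. */
final class SnapshotRestoreSketch {
    private static final String LID = "_LID";
    private static final String EID = "_EID";

    final Map<String, long[]> replWatermarks = new HashMap<>();
    final Map<String, Long> sequenceIds = new HashMap<>();
    /** Stand-in for error logging. */
    final List<String> errors = new ArrayList<>();

    void restore(Map<String, Long> snapshot) {
        for (Map.Entry<String, Long> e : snapshot.entrySet()) {
            String key = e.getKey();
            if (key.endsWith(LID)) {
                String producer = key.substring(0, key.length() - LID.length());
                Long entryId = snapshot.get(producer + EID);
                if (entryId == null) {
                    errors.add("Incomplete replication watermark for " + producer + ": missing " + EID);
                    continue;
                }
                replWatermarks.put(producer, new long[] {e.getValue(), entryId});
            } else if (key.endsWith(EID)) {
                String producer = key.substring(0, key.length() - EID.length());
                if (!snapshot.containsKey(producer + LID)) {
                    errors.add("Incomplete replication watermark for " + producer + ": missing " + LID);
                }
            } else {
                sequenceIds.put(key, e.getValue());
            }
        }
    }
}
```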

final var producerName = metadata.getProducerName();
// Rebuild replication watermarks from entries written after the last dedup snapshot.
recoverReplWatermarkFromMetadata(metadata);
final var sequenceId = Math.max(metadata.getHighestSequenceId(), metadata.getSequenceId());

Contributor

Since the new collections highestReplPositionPushed and highestReplPositionPersisted already track the deduplication key, we should no longer put it into highestSequencedPushed and highestSequencedPersisted.

  • Is a marker: it should be tracked by highestSequencedPushed and highestSequencedPersisted.
  • Otherwise: it should be tracked by highestReplPositionPushed and highestReplPositionPersisted.

Contributor Author

Done. Replay now restores marker/normal messages into highestSequenced* and replicated geo/shadow messages into highestReplPosition*.
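
The split can be sketched roughly as below. This is a simplified model, not the actual replay code: `ReplayRoutingSketch` and its `Metadata` record are hypothetical, a non-null `sourcePosition` stands for a message that carries a parsed source position, and shadow-topic producer naming is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: route each replayed entry into exactly one of the two state maps. */
final class ReplayRoutingSketch {
    /** Hypothetical, reduced view of message metadata; sourcePosition is {ledgerId, entryId} or null. */
    record Metadata(String producerName, long sequenceId, long highestSequenceId,
                    boolean marker, long[] sourcePosition) {}

    final Map<String, Long> highestSequenced = new HashMap<>();
    final Map<String, long[]> highestReplPosition = new HashMap<>();

    void replay(Metadata m) {
        if (!m.marker() && m.sourcePosition() != null) {
            // Replicated message: only the replication watermark is rebuilt.
            highestReplPosition.put(m.producerName(), m.sourcePosition());
            return;
        }
        // Marker or normal message: restore the producer sequence id as before.
        long sequenceId = Math.max(m.highestSequenceId(), m.sequenceId());
        highestSequenced.put(m.producerName(), sequenceId);
    }
}
```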

snapshot.put(producerName + REPL_LEDGER_ID_SUFFIX, replSourcePosition.ledgerId());
snapshot.put(producerName + REPL_ENTRY_ID_SUFFIX, replSourcePosition.entryId());
});
int[] normalProducerCount = new int[1];

Contributor

A plain int would be better than int[1] here.

Contributor Author

Done. Changed this to a plain int with a normal loop.

}

private void recordReplWatermarkPersisted(String producerName, ReplSourcePosition replSourcePosition) {
highestReplPositionPersisted.merge(producerName, replSourcePosition, ReplSourcePosition::max);

Contributor

If ordering is required here, it indicates a problem elsewhere in the code.

Contributor Author

Done. Removed the watermark ordering/max merge path and now records the recovered watermark directly.

}

@VisibleForTesting
record ReplSourcePosition(long ledgerId, long entryId) implements Comparable<ReplSourcePosition> {

Contributor

Using this object improves the readability and maintainability of the code, but it also increases GC pressure. I feel that this class is unnecessary.

Contributor Author

Adjusted. The hot path now uses long[] for parsed source positions; ReplSourcePosition remains only as the map value for the persisted watermark pair.

return MessageDupStatus.Unknown;
}
private void recoverReplWatermark(String producerName, ReplSourcePosition replSourcePosition) {
highestReplPositionPushed.merge(producerName, replSourcePosition, ReplSourcePosition::max);

Contributor

If ordering is required here, it indicates a problem elsewhere in the code.

Contributor Author

Done. Removed the max/ordering merge logic and now stores the recovered watermark directly.


String producerName = publishContext.getProducerName();
ReplDedupCheck check = new ReplDedupCheck();
highestReplPositionPushed.compute(producerName, (__, lastPushed) -> {

Contributor

Almost all the changes were made around the producer's name:

Before

  • Cached {producerName} + suffix
  • Stored {producerName} + suffix

After

  • Cached {producerName}
  • Stored {producerName} + suffix

I consider the new solution better, since it improves performance.

Contributor Author

Done. The in-memory watermark cache now uses the base producer name, while snapshot keys keep the _LID / _EID suffixes.

// Reconnect the source replicator. It replays from the old replication cursor and target must deduplicate.
stuckSendReceipt.set(false);
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
assertNotNull(replicator.producer);

Contributor

Since you called topic unload, the replicator object will be rebuilt, so the verification here is meaningless.

Contributor Author

Done. The test now re-fetches the current source topic and replicator after reload before checking the connection.

replicatorClientCnx.get().ctx().channel().close();

Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
assertEquals(replicator.getCursor().getNumberOfEntriesInBacklog(true), 0);

Contributor

Since you called topic unload, the replicator object will be rebuilt, so the verification here is meaningless.

Contributor Author

Done. The backlog check now re-fetches the current source topic and replicator before asserting.

@void-ptr974

Contributor Author

Pushed e823f0a01e8 with the review updates:

  • Split geo/shadow replication watermark recovery from normal producer sequence recovery during replay.
  • Added error logging for incomplete geo-replication watermark snapshot pairs.
  • Removed the extra ReplDedupCheck helper and simplified isDuplicateReplV2 local state handling.
  • Removed the watermark ordering/max merge logic.
  • Changed snapshot normal-producer counting to use a plain int loop.
  • Updated inactive geo-replication cleanup logging to match the actual behavior.
  • Updated the unload-before-snapshot test to re-fetch the current source topic/replicator and wait for target deduplication to become Enabled.
  • Avoided adding new reflection in the new unit tests by using testing accessors.


Development

Successfully merging this pull request may close these issues.

[Bug] Geo-replication V2 dedup watermark can be lost after target topic reload

4 participants