Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3302,6 +3302,16 @@ private Response generateResponseWithEntry(Entry entry, PersistentTopic persiste
if (metadata.hasTxnidMostBits()) {
responseBuilder.header("X-Pulsar-txnid-most-bits", metadata.getTxnidMostBits());
}
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, entry.getPosition());
responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
boolean isTxnUncommitted = persistentTopic.isTxnOngoing(txnID);
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);
}
boolean isTxnConsumable = entry.getPosition()
.compareTo(persistentTopic.getMaxReadPosition()) <= 0;
responseBuilder.header("X-Pulsar-txn-consumable", isTxnConsumable);
if (metadata.hasHighestSequenceId()) {
responseBuilder.header("X-Pulsar-highest-sequence-id", metadata.getHighestSequenceId());
}
Expand All @@ -3320,14 +3330,6 @@ private Response generateResponseWithEntry(Entry entry, PersistentTopic persiste
if (metadata.hasNullPartitionKey()) {
responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
}
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, entry.getPosition());
responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
}
boolean isTxnUncommitted = (entry.getPosition())
.compareTo(persistentTopic.getMaxReadPosition()) > 0;
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);

// Decode if needed
CompressionCodec codec = CompressionCodecProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5037,6 +5037,10 @@ public boolean isTxnAborted(TxnID txnID, Position readPosition) {
return this.transactionBuffer.isTxnAborted(txnID, readPosition);
}

public boolean isTxnOngoing(TxnID txnID) {
return this.transactionBuffer.isTxnOngoing(txnID);
}

public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return this.transactionBuffer.getTransactionInBufferStats(txnID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ public interface TransactionBuffer {
*/
boolean isTxnAborted(TxnID txnID, Position readPosition);

/**
* Check if the transaction is still ongoing (not committed and not aborted).
*
* @param txnID {@link TxnID} the transaction id.
* @return whether the txn is ongoing (uncommitted).
*/
boolean isTxnOngoing(TxnID txnID);

/**
* Sync max read position for normal publish.
* @param position {@link Position} the position to sync.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,16 @@ public boolean isTxnAborted(TxnID txnID, Position readPosition) {
return false;
}

@Override
public boolean isTxnOngoing(TxnID txnID) {
TxnBuffer txnBuffer = buffers.get(txnID);
if (txnBuffer == null) {
return false;
}
TxnStatus status = txnBuffer.status();
return status == TxnStatus.OPEN || status == TxnStatus.COMMITTING || status == TxnStatus.ABORTING;
}

@Override
public void syncMaxReadPositionForNormalPublish(Position position, boolean isMarkerMessage) {
if (!isMarkerMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,15 @@ public boolean isTxnAborted(TxnID txnID, Position readPosition) {
}
}

@Override
public boolean isTxnOngoing(TxnID txnID) {
String key = TxnIds.toKey(txnID);
synchronized (lock) {
TxnEntry entry = txns.get(key);
return entry != null && entry.state == TxnState.OPEN;
}
}

@Override
public void syncMaxReadPositionForNormalPublish(Position position, boolean isMarkerMessage) {
if (isMarkerMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,11 @@ public synchronized boolean isTxnAborted(TxnID txnID, Position readPosition) {
return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
}

@Override
public synchronized boolean isTxnOngoing(TxnID txnID) {
return ongoingTxns.containsKey(txnID);
}

/**
* Sync max read position for normal publish.
* @param position {@link Position} the position to sync.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public boolean isTxnAborted(TxnID txnID, Position readPosition) {
return false;
}

@Override
public boolean isTxnOngoing(TxnID txnID) {
return false;
}

@Override
public void syncMaxReadPositionForNormalPublish(Position position, boolean isMarkerMessage) {
if (!isMarkerMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,174 @@ public void testAbortTransaction() throws Exception {
}
}

@Test
public void testPeekMessageTxnHeaderAccuracy() throws Exception {
initTransaction(1);

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_txn_header_accuracy");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();

// Start transaction T1, send a message, keep it uncommitted
Transaction txn1 = pulsarClient.newTransaction().build().get();
TxnID txnID1 = ((TransactionImpl) txn1).getTxnID();
producer.newMessage(txn1).value("msg-uncommitted").send();

// Start transaction T2, send a message, commit it
Transaction txn2 = pulsarClient.newTransaction().build().get();
TxnID txnID2 = ((TransactionImpl) txn2).getTxnID();
producer.newMessage(txn2).value("msg-committed").send();
txn2.commit().get();

// Send a normal (non-transactional) message
producer.newMessage().value("msg-normal").send();

// Peek all messages with READ_UNCOMMITTED to get both messages regardless of txn state
List<Message<byte[]>> peekMsgs = admin.topics().peekMessages(topic, "t-sub", 10,
false, TransactionIsolationLevel.READ_UNCOMMITTED);

boolean foundUncommitted = false;
boolean foundCommitted = false;
boolean foundNormal = false;
for (Message<byte[]> peekMsg : peekMsgs) {
String value = new String(peekMsg.getValue());
MessageMetadata metadata = ((MessageImpl<?>) peekMsg).getMessageBuilder();
if ("msg-uncommitted".equals(value)) {
foundUncommitted = true;
// T1 is ongoing, so X-Pulsar-txn-uncommitted should be true
assertTrue(peekMsg.hasProperty("X-Pulsar-txn-uncommitted"));
assertEquals(peekMsg.getProperty("X-Pulsar-txn-uncommitted"), "true");
// T1 is not aborted
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-aborted"),
"T1 is not aborted, X-Pulsar-txn-aborted should be false");
// Blocked by itself (uncommitted), so X-Pulsar-txn-consumable should be false
assertTrue(peekMsg.hasProperty("X-Pulsar-txn-consumable"));
assertEquals(peekMsg.getProperty("X-Pulsar-txn-consumable"), "false");
// TxnID checks
assertTrue(metadata.hasTxnidMostBits());
assertEquals(metadata.getTxnidMostBits(), txnID1.getMostSigBits());
assertTrue(metadata.hasTxnidLeastBits());
assertEquals(metadata.getTxnidLeastBits(), txnID1.getLeastSigBits());
} else if ("msg-committed".equals(value)) {
foundCommitted = true;
// T2 is committed, so X-Pulsar-txn-uncommitted should be false (not present in properties)
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-uncommitted"),
"T2 is committed, X-Pulsar-txn-uncommitted should be false");
// T2 is not aborted
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-aborted"),
"T2 is not aborted, X-Pulsar-txn-aborted should be false");
// Blocked by T1 before it, so X-Pulsar-txn-consumable should be false
assertTrue(peekMsg.hasProperty("X-Pulsar-txn-consumable"));
assertEquals(peekMsg.getProperty("X-Pulsar-txn-consumable"), "false",
"T2 is blocked by uncommitted T1 before it");
// TxnID checks
assertTrue(metadata.hasTxnidMostBits());
assertEquals(metadata.getTxnidMostBits(), txnID2.getMostSigBits());
assertTrue(metadata.hasTxnidLeastBits());
assertEquals(metadata.getTxnidLeastBits(), txnID2.getLeastSigBits());
} else if ("msg-normal".equals(value)) {
foundNormal = true;
// Normal message has no txnid, so X-Pulsar-txn-uncommitted should not be set
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-uncommitted"),
"Normal message should not have X-Pulsar-txn-uncommitted");
// Normal message has no txnid, so X-Pulsar-txn-aborted should not be set
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-aborted"),
"Normal message should not have X-Pulsar-txn-aborted");
// Blocked by T1, so X-Pulsar-txn-consumable should be false
assertTrue(peekMsg.hasProperty("X-Pulsar-txn-consumable"));
assertEquals(peekMsg.getProperty("X-Pulsar-txn-consumable"), "false",
"Normal message is blocked by uncommitted T1");
// Normal message has no txnid
assertFalse(metadata.hasTxnidMostBits());
assertFalse(metadata.hasTxnidLeastBits());
}
}
assertTrue(foundUncommitted, "Should have found the uncommitted message");
assertTrue(foundCommitted, "Should have found the committed message");
assertTrue(foundNormal, "Should have found the normal message");

// Verify READ_COMMITTED filtering: all messages after an uncommitted txn should be filtered
List<Message<byte[]>> committedPeek = admin.topics().peekMessages(topic, "t-sub-2", 10,
false, TransactionIsolationLevel.READ_COMMITTED);
assertEquals(committedPeek.size(), 0,
"READ_COMMITTED should filter all messages after an uncommitted transaction");

// Abort T1 and verify headers update correctly
txn1.abort().get();

List<Message<byte[]>> peekAfterAbort = admin.topics().peekMessages(topic, "t-sub-3", 10,
false, TransactionIsolationLevel.READ_UNCOMMITTED);

boolean foundAborted = false;
boolean foundCommittedAfterAbort = false;
boolean foundNormalAfterAbort = false;
for (Message<byte[]> peekMsg : peekAfterAbort) {
String value = new String(peekMsg.getValue());
MessageMetadata metadata = ((MessageImpl<?>) peekMsg).getMessageBuilder();
if ("msg-uncommitted".equals(value)) {
foundAborted = true;
// T1 is now aborted, so X-Pulsar-txn-aborted should be true
assertTrue(peekMsg.hasProperty("X-Pulsar-txn-aborted"),
"T1 is aborted, X-Pulsar-txn-aborted should be true");
assertEquals(peekMsg.getProperty("X-Pulsar-txn-aborted"), "true");
// T1 is no longer ongoing
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-uncommitted"),
"T1 is aborted, X-Pulsar-txn-uncommitted should be false");
// maxReadPosition advanced past T1, so consumable should be true
assertTrue(peekMsg.hasProperty("X-Pulsar-txn-consumable"));
assertEquals(peekMsg.getProperty("X-Pulsar-txn-consumable"), "true",
"T1 is resolved, message should be consumable");
// TxnID unchanged
assertTrue(metadata.hasTxnidMostBits());
assertEquals(metadata.getTxnidMostBits(), txnID1.getMostSigBits());
assertTrue(metadata.hasTxnidLeastBits());
assertEquals(metadata.getTxnidLeastBits(), txnID1.getLeastSigBits());
} else if ("msg-committed".equals(value)) {
foundCommittedAfterAbort = true;
// T2 is still committed, not aborted
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-uncommitted"),
"T2 is committed, X-Pulsar-txn-uncommitted should be false");
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-aborted"),
"T2 is not aborted, X-Pulsar-txn-aborted should be false");
// maxReadPosition advanced past T1, so consumable should be true
assertTrue(peekMsg.hasProperty("X-Pulsar-txn-consumable"));
assertEquals(peekMsg.getProperty("X-Pulsar-txn-consumable"), "true",
"No open txn before T2, message should be consumable");
// TxnID unchanged
assertTrue(metadata.hasTxnidMostBits());
assertEquals(metadata.getTxnidMostBits(), txnID2.getMostSigBits());
assertTrue(metadata.hasTxnidLeastBits());
assertEquals(metadata.getTxnidLeastBits(), txnID2.getLeastSigBits());
} else if ("msg-normal".equals(value)) {
foundNormalAfterAbort = true;
// Normal message has no txnid, so no txn headers
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-uncommitted"),
"Normal message should not have X-Pulsar-txn-uncommitted");
assertFalse(peekMsg.hasProperty("X-Pulsar-txn-aborted"),
"Normal message should not have X-Pulsar-txn-aborted");
// No open txn before it, so consumable should be true
assertTrue(peekMsg.hasProperty("X-Pulsar-txn-consumable"));
assertEquals(peekMsg.getProperty("X-Pulsar-txn-consumable"), "true",
"No open txn, normal message should be consumable");
// No txnid
assertFalse(metadata.hasTxnidMostBits());
assertFalse(metadata.hasTxnidLeastBits());
}
}
assertTrue(foundAborted, "Should have found the aborted message after abort");
assertTrue(foundCommittedAfterAbort, "Should have found the committed message after abort");
assertTrue(foundNormalAfterAbort, "Should have found the normal message after abort");

// READ_COMMITTED should show committed + normal message (aborted is filtered out)
List<Message<byte[]>> committedPeekAfterAbort = admin.topics().peekMessages(topic, "t-sub-4", 10,
false, TransactionIsolationLevel.READ_COMMITTED);
assertEquals(committedPeekAfterAbort.size(), 2,
"READ_COMMITTED should show committed and normal messages after abort");
assertEquals(new String(committedPeekAfterAbort.get(0).getValue()), "msg-committed");
assertEquals(new String(committedPeekAfterAbort.get(1).getValue()), "msg-normal");
}

@Test
public void testPeekMessageForSkipTxnMarker() throws Exception {
initTransaction(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class TopicsImpl extends BaseResource implements Topics {
private static final String ENCRYPTION_KEYS = "X-Pulsar-Base64-encryption-keys";
public static final String TXN_ABORTED = "X-Pulsar-txn-aborted";
public static final String TXN_UNCOMMITTED = "X-Pulsar-txn-uncommitted";
public static final String TXN_CONSUMABLE = "X-Pulsar-txn-consumable";
// CHECKSTYLE.ON: MemberName

public static final String PROPERTY_SHADOW_SOURCE_KEY = "PULSAR.SHADOW_SOURCE";
Expand Down Expand Up @@ -1342,6 +1343,15 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(
}
}

tmp = headers.getFirst(TXN_CONSUMABLE);
if (tmp != null) {
properties.put(TXN_CONSUMABLE, tmp.toString());
if (!Boolean.parseBoolean(tmp.toString())
&& transactionIsolationLevel == TransactionIsolationLevel.READ_COMMITTED) {
return new ArrayList<>();
}
}

tmp = headers.getFirst(PUBLISH_TIME);
if (tmp != null) {
messageMetadata.setPublishTime(DateFormatter.parse(tmp.toString()));
Expand Down
Loading