From 868fb98e5b78878c987287888fa236e9dcca25ef Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Thu, 25 Jun 2026 22:41:35 +0800 Subject: [PATCH] [fix][test] Stabilize non-persistent geo-replication test --- .../messaging/GeoReplicationTest.java | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java index d28466531e08a..121227b931ca5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java @@ -30,6 +30,8 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; import org.apache.pulsar.tests.integration.topologies.PulsarGeoClusterTestBase; import org.awaitility.Awaitility; @@ -44,6 +46,9 @@ @CustomLog public class GeoReplicationTest extends PulsarGeoClusterTestBase { + private static final int PARTITION_COUNT = 10; + private static final int MESSAGES = 10; + @BeforeClass(alwaysRun = true) public final void setupBeforeClass() throws Exception { setup(); @@ -68,7 +73,7 @@ public final void tearDownAfterClass() throws Exception { cleanup(); } - @Test(timeOut = 1000 * 30, dataProvider = "TopicDomain") + @Test(timeOut = 1000 * 60, dataProvider = "TopicDomain") public void testTopicReplication(String domain) throws Exception { String cluster1 = getGeoCluster().getClusters()[0].getClusterName(); String cluster2 = getGeoCluster().getClusters()[1].getClusterName(); @@ -82,12 +87,12 @@ public void testTopicReplication(String domain) throws Exception { String topic = domain + "://public/default/testTopicReplication-" + UUID.randomUUID(); Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { try { - admin.topics().createPartitionedTopic(topic, 10); + admin.topics().createPartitionedTopic(topic, PARTITION_COUNT); } catch (Exception e) { log.error().attr("topic", topic).exception(e).log("Failed to create partitioned topic ."); Assert.fail("Failed to create partitioned topic " + topic); } - Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 10); + Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, PARTITION_COUNT); }); log.info().attr("topic", topic).log("Test geo-replication produce and consume for topic ."); @@ -120,7 +125,11 @@ public void testTopicReplication(String domain) throws Exception { .attr("topic", topic) .log("Successfully create consumer in cluster for topic ."); - for (int i = 0; i < 10; i++) { + if ("non-persistent".equals(domain)) { + waitForNonPersistentReplicators(admin, topic, cluster2); + } + + for (int i = 0; i < MESSAGES; i++) { p.send(String.format("Message [%d]", i).getBytes(StandardCharsets.UTF_8)); } log.info() @@ -128,7 +137,7 @@ public void testTopicReplication(String domain) throws Exception { .attr("topic", topic) .log("Successfully produce message to cluster for topic ."); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < MESSAGES; i++) { Message message = c.receive(10, TimeUnit.SECONDS); Assert.assertNotNull(message); } @@ -137,4 +146,20 @@ public void testTopicReplication(String domain) throws Exception { .attr("topic", topic) .log("Successfully consume message from cluster for topic ."); } + + private void waitForNonPersistentReplicators(PulsarAdmin admin, String topic, String remoteCluster) + throws Exception { + TopicName topicName = TopicName.get(topic); + Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + for (int i = 0; i < PARTITION_COUNT; i++) { + String partitionName = topicName.getPartition(i).toString(); + ReplicatorStats replicatorStats = admin.topics() + .getStats(partitionName) + .getReplication() + .get(remoteCluster); + Assert.assertNotNull(replicatorStats); + Assert.assertTrue(replicatorStats.isConnected()); + } + }); + } }