Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.topologies.PulsarGeoClusterTestBase;
import org.awaitility.Awaitility;
Expand All @@ -44,6 +46,9 @@
@CustomLog
public class GeoReplicationTest extends PulsarGeoClusterTestBase {

private static final int PARTITION_COUNT = 10;
private static final int MESSAGES = 10;

@BeforeClass(alwaysRun = true)
public final void setupBeforeClass() throws Exception {
setup();
Expand All @@ -68,7 +73,7 @@ public final void tearDownAfterClass() throws Exception {
cleanup();
}

@Test(timeOut = 1000 * 30, dataProvider = "TopicDomain")
@Test(timeOut = 1000 * 60, dataProvider = "TopicDomain")
public void testTopicReplication(String domain) throws Exception {
String cluster1 = getGeoCluster().getClusters()[0].getClusterName();
String cluster2 = getGeoCluster().getClusters()[1].getClusterName();
Expand All @@ -82,12 +87,12 @@ public void testTopicReplication(String domain) throws Exception {
String topic = domain + "://public/default/testTopicReplication-" + UUID.randomUUID();
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
try {
admin.topics().createPartitionedTopic(topic, 10);
admin.topics().createPartitionedTopic(topic, PARTITION_COUNT);
} catch (Exception e) {
log.error().attr("topic", topic).exception(e).log("Failed to create partitioned topic .");
Assert.fail("Failed to create partitioned topic " + topic);
}
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 10);
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, PARTITION_COUNT);
});
log.info().attr("topic", topic).log("Test geo-replication produce and consume for topic .");

Expand Down Expand Up @@ -120,15 +125,19 @@ public void testTopicReplication(String domain) throws Exception {
.attr("topic", topic)
.log("Successfully create consumer in cluster for topic .");

for (int i = 0; i < 10; i++) {
if ("non-persistent".equals(domain)) {
waitForNonPersistentReplicators(admin, topic, cluster2);
}

for (int i = 0; i < MESSAGES; i++) {
p.send(String.format("Message [%d]", i).getBytes(StandardCharsets.UTF_8));
}
log.info()
.attr("cluster", cluster1)
.attr("topic", topic)
.log("Successfully produce message to cluster for topic .");

for (int i = 0; i < 10; i++) {
for (int i = 0; i < MESSAGES; i++) {
Message<byte[]> message = c.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(message);
}
Expand All @@ -137,4 +146,20 @@ public void testTopicReplication(String domain) throws Exception {
.attr("topic", topic)
.log("Successfully consume message from cluster for topic .");
}

private void waitForNonPersistentReplicators(PulsarAdmin admin, String topic, String remoteCluster)
throws Exception {
TopicName topicName = TopicName.get(topic);
Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
for (int i = 0; i < PARTITION_COUNT; i++) {
String partitionName = topicName.getPartition(i).toString();
ReplicatorStats replicatorStats = admin.topics()
.getStats(partitionName)
.getReplication()
.get(remoteCluster);
Assert.assertNotNull(replicatorStats);
Assert.assertTrue(replicatorStats.isConnected());
}
});
}
}
Loading