DBZ-2915 Use Testcontainers to setup Pulsar
This commit is contained in:
parent
68882fec1b
commit
7a4d8d990e
@ -129,64 +129,6 @@
|
||||
</systemProperties>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>io.fabric8</groupId>
|
||||
<artifactId>docker-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<watchInterval>500</watchInterval>
|
||||
<logDate>default</logDate>
|
||||
<verbose>true</verbose>
|
||||
<images>
|
||||
<image>
|
||||
<!-- A Docker image using the all in one Pulsar server -->
|
||||
<name>apachepulsar/pulsar:${version.pulsar}</name>
|
||||
<run>
|
||||
<namingStrategy>none</namingStrategy>
|
||||
<cmd>bin/pulsar standalone</cmd>
|
||||
<ports>
|
||||
<port>${pulsar.port.native}:6650</port>
|
||||
<port>${pulsar.port.http}:8080</port>
|
||||
</ports>
|
||||
<log>
|
||||
<prefix>pulsar</prefix>
|
||||
<enabled>true</enabled>
|
||||
<color>yellow</color>
|
||||
</log>
|
||||
<wait>
|
||||
<time>90000</time> <!-- 90 seconds max -->
|
||||
<log>(?s)org.apache.pulsar.broker.PulsarService - messaging service is ready</log>
|
||||
</wait>
|
||||
<volumes>
|
||||
<bind>
|
||||
<volume>docker/conf:/pulsar/conf</volume>
|
||||
</bind>
|
||||
</volumes>
|
||||
</run>
|
||||
</image>
|
||||
</images>
|
||||
</configuration>
|
||||
<!--
|
||||
Connect this plugin to the maven lifecycle around the integration-test phase:
|
||||
start the container in pre-integration-test and stop it in post-integration-test.
|
||||
-->
|
||||
<executions>
|
||||
<execution>
|
||||
<id>start</id>
|
||||
<phase>pre-integration-test</phase>
|
||||
<goals>
|
||||
<goal>build</goal>
|
||||
<goal>start</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>stop</id>
|
||||
<phase>post-integration-test</phase>
|
||||
<goals>
|
||||
<goal>stop</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<resources>
|
||||
<!-- Apply the properties set in the POM to the resource files -->
|
||||
|
@ -12,7 +12,6 @@
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.enterprise.event.Observes;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
import org.apache.pulsar.client.api.Message;
|
||||
@ -21,7 +20,6 @@
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import io.debezium.server.DebeziumServer;
|
||||
import io.debezium.server.TestConfigSource;
|
||||
import io.debezium.server.events.ConnectorCompletedEvent;
|
||||
import io.debezium.server.events.ConnectorStartedEvent;
|
||||
@ -37,14 +35,13 @@
|
||||
*/
|
||||
@QuarkusTest
|
||||
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
|
||||
@QuarkusTestResource(PulsarTestResourceLifecycleManager.class)
|
||||
public class PulsarIT {
|
||||
|
||||
private static final int MESSAGE_COUNT = 4;
|
||||
private static final String TOPIC_NAME = "testc.inventory.customers";
|
||||
|
||||
protected static PulsarClient pulsarClient;
|
||||
@Inject
|
||||
DebeziumServer server;
|
||||
|
||||
{
|
||||
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
|
||||
@ -55,7 +52,7 @@ void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException
|
||||
Testing.Print.enable();
|
||||
|
||||
pulsarClient = PulsarClient.builder()
|
||||
.serviceUrl(PulsarTestConfigSource.getServiceUrl())
|
||||
.serviceUrl(PulsarTestResourceLifecycleManager.getPulsarServiceUrl())
|
||||
.build();
|
||||
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ public PulsarTestConfigSource() {
|
||||
Map<String, String> pulsarTest = new HashMap<>();
|
||||
|
||||
pulsarTest.put("debezium.sink.type", "pulsar");
|
||||
pulsarTest.put("debezium.sink.pulsar.client.serviceUrl", getServiceUrl());
|
||||
pulsarTest.put("debezium.sink.pulsar.client.serviceUrl", "pulsar://localhost:" + PulsarTestResourceLifecycleManager.PULSAR_PORT);
|
||||
pulsarTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
|
||||
pulsarTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
|
||||
OFFSET_STORE_PATH.toAbsolutePath().toString());
|
||||
@ -29,8 +29,4 @@ public PulsarTestConfigSource() {
|
||||
|
||||
config = pulsarTest;
|
||||
}
|
||||
|
||||
public static String getServiceUrl() {
|
||||
return "pulsar://localhost:" + System.getProperty("pulsar.port.native", "6650");
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
|
||||
package io.debezium.server.pulsar;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.testcontainers.containers.BindMode;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
|
||||
import org.testcontainers.utility.MountableFile;
|
||||
|
||||
public class PulsarTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
|
||||
|
||||
private static final String PULSAR_VERSION = "2.5.2";
|
||||
public static final int PULSAR_PORT = 6650;
|
||||
public static final int PULSAR_HTTP_PORT = 8080;
|
||||
public static final String PULSAR_IMAGE = "apachepulsar/pulsar:" + PULSAR_VERSION;
|
||||
|
||||
private static final GenericContainer<?> container = new GenericContainer<>(PULSAR_IMAGE)
|
||||
.withStartupTimeout(Duration.ofSeconds(90))
|
||||
.waitingFor(Wait.forLogMessage(".*messaging service is ready.*", 1))
|
||||
.withCommand("/pulsar/bin/pulsar", "standalone")
|
||||
.withCopyFileToContainer(MountableFile.forHostPath("/docker/conf/"), "/pulsar/conf")
|
||||
// .withClasspathResourceMapping("/docker/conf/", "/pulsar/conf", BindMode.READ_ONLY)
|
||||
.withExposedPorts(PULSAR_PORT, PULSAR_HTTP_PORT);
|
||||
|
||||
@Override
|
||||
public Map<String, String> start() {
|
||||
container.start();
|
||||
|
||||
Map<String, String> params = new ConcurrentHashMap<>();
|
||||
params.put("debezium.sink.pulsar.client.serviceUrl", getPulsarServiceUrl());
|
||||
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
try {
|
||||
if (container != null) {
|
||||
container.stop();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
|
||||
public static String getPulsarServiceUrl() {
|
||||
return "pulsar://localhost:" + container.getExposedPorts().get(0);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user