DBZ-5962 Sink adapter for Apache RocketMQ
This commit is contained in:
parent
0f01851b05
commit
15a19fa8db
@ -19,7 +19,7 @@
|
||||
<version.pravega>0.9.1</version.pravega>
|
||||
<version.nats>2.16.3</version.nats>
|
||||
<version.stan>2.2.3</version.stan>
|
||||
|
||||
<version.rocketmq>4.9.4</version.rocketmq>
|
||||
<version.commons.logging>1.2</version.commons.logging>
|
||||
</properties>
|
||||
|
||||
@ -140,6 +140,11 @@
|
||||
<artifactId>debezium-server-infinispan</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-server-rocketmq</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-server-core</artifactId>
|
||||
@ -203,6 +208,22 @@
|
||||
<artifactId>pravega-client</artifactId>
|
||||
<version>${version.pravega}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
<version>${version.rocketmq}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-tools</artifactId>
|
||||
<version>${version.rocketmq}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.infinispan</groupId>
|
||||
<artifactId>infinispan-bom</artifactId>
|
||||
@ -210,6 +231,7 @@
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</project>
|
||||
|
@ -125,6 +125,10 @@
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-server-infinispan</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-server-rocketmq</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-logging-json</artifactId>
|
||||
|
177
debezium-server/debezium-server-rocketmq/pom.xml
Normal file
177
debezium-server/debezium-server-rocketmq/pom.xml
Normal file
@ -0,0 +1,177 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-server</artifactId>
|
||||
<version>2.2.0-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>debezium-server-rocketmq</artifactId>
|
||||
<name>Debezium Server RocketMQ Sink Adapter</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-server-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-tools</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Testing -->
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-junit5</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-testing-testcontainers</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-core</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-server-core</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-connect-avro-converter</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>testcontainers</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-connector-postgres</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-maven-plugin</artifactId>
|
||||
<version>${quarkus.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>build</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.jboss.jandex</groupId>
|
||||
<artifactId>jandex-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>make-index</id>
|
||||
<goals>
|
||||
<goal>jandex</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-failsafe-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>integration-test</id>
|
||||
<goals>
|
||||
<goal>integration-test</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>verify</id>
|
||||
<goals>
|
||||
<goal>verify</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<reuseForks>false</reuseForks>
|
||||
<skipTests>${skipITs}</skipTests>
|
||||
<enableAssertions>true</enableAssertions>
|
||||
<systemProperties>
|
||||
<test.type>IT</test.type>
|
||||
</systemProperties>
|
||||
<runOrder>${runOrder}</runOrder>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<resources>
|
||||
<!-- Apply the properties set in the POM to the resource files -->
|
||||
<resource>
|
||||
<filtering>true</filtering>
|
||||
<directory>src/main/resources</directory>
|
||||
<includes>
|
||||
<include>**/build.properties</include>
|
||||
</includes>
|
||||
</resource>
|
||||
</resources>
|
||||
</build>
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>quick</id>
|
||||
<activation>
|
||||
<activeByDefault>false</activeByDefault>
|
||||
<property>
|
||||
<name>quick</name>
|
||||
</property>
|
||||
</activation>
|
||||
<properties>
|
||||
<skipITs>true</skipITs>
|
||||
<docker.skip>true</docker.skip>
|
||||
</properties>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>skip-integration-tests</id>
|
||||
<activation>
|
||||
<activeByDefault>false</activeByDefault>
|
||||
<property>
|
||||
<name>skipITs</name>
|
||||
</property>
|
||||
</activation>
|
||||
<properties>
|
||||
<docker.skip>true</docker.skip>
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
</project>
|
@ -0,0 +1,163 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.server.rocketmq;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.enterprise.context.Dependent;
|
||||
import javax.enterprise.inject.Instance;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.remoting.RPCHook;
|
||||
import org.apache.rocketmq.remoting.protocol.LanguageCode;
|
||||
import org.eclipse.microprofile.config.Config;
|
||||
import org.eclipse.microprofile.config.ConfigProvider;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.DebeziumException;
|
||||
import io.debezium.engine.ChangeEvent;
|
||||
import io.debezium.engine.DebeziumEngine;
|
||||
import io.debezium.server.BaseChangeConsumer;
|
||||
import io.debezium.server.CustomConsumerBuilder;
|
||||
|
||||
/**
|
||||
* rocketmq change consumer
|
||||
*/
|
||||
@Named("rocketmq")
|
||||
@Dependent
|
||||
public class RocketMqChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqChangeConsumer.class);
|
||||
|
||||
private static final String PROP_PREFIX = "debezium.sink.rocketmq.";
|
||||
|
||||
private static final String PROP_PRODUCER_PREFIX = PROP_PREFIX + "producer.";
|
||||
|
||||
// acl config
|
||||
private static final String PROP_PRODUCER_ACL_ENABLE = PROP_PRODUCER_PREFIX + "acl.enabled";
|
||||
private static final String PROP_PRODUCER_ACCESS_KEY = PROP_PRODUCER_PREFIX + "access.key";
|
||||
private static final String PROP_PRODUCER_SECRET_KEY = PROP_PRODUCER_PREFIX + "secret.key";
|
||||
// common config
|
||||
private static final String PROP_PRODUCER_NAME_SRV_ADDR = PROP_PRODUCER_PREFIX + "name.srv.addr";
|
||||
private static final String PROP_PRODUCER_GROUP = PROP_PRODUCER_PREFIX + "group";
|
||||
private static final String PROP_PRODUCER_MAX_MESSAGE_SIZE = PROP_PRODUCER_PREFIX + "max.message.size";
|
||||
private static final String PROP_PRODUCER_SEND_MSG_TIMEOUT = PROP_PRODUCER_PREFIX + "send.msg.timeout";
|
||||
@Inject
|
||||
@CustomConsumerBuilder
|
||||
Instance<DefaultMQProducer> customRocketMqProducer;
|
||||
private DefaultMQProducer mqProducer;
|
||||
|
||||
@PostConstruct
|
||||
void connect() {
|
||||
if (customRocketMqProducer.isResolvable()) {
|
||||
mqProducer = customRocketMqProducer.get();
|
||||
startProducer();
|
||||
LOGGER.info("Obtained custom configured RocketMqProducer '{}'", mqProducer);
|
||||
return;
|
||||
}
|
||||
final Config config = ConfigProvider.getConfig();
|
||||
// init rocketmq producer
|
||||
RPCHook rpcHook = null;
|
||||
Optional<Boolean> aclEnable = config.getOptionalValue(PROP_PRODUCER_ACL_ENABLE, Boolean.class);
|
||||
if (aclEnable.isPresent() && aclEnable.get()) {
|
||||
if (config.getOptionalValue(PROP_PRODUCER_ACCESS_KEY, String.class).isEmpty()
|
||||
|| config.getOptionalValue(PROP_PRODUCER_SECRET_KEY, String.class).isEmpty()) {
|
||||
throw new DebeziumException("When acl.enabled is true, access key and secret key cannot be empty");
|
||||
}
|
||||
rpcHook = new AclClientRPCHook(
|
||||
new SessionCredentials(
|
||||
config.getValue(PROP_PRODUCER_ACCESS_KEY, String.class),
|
||||
config.getValue(PROP_PRODUCER_SECRET_KEY, String.class)));
|
||||
}
|
||||
this.mqProducer = new DefaultMQProducer(rpcHook);
|
||||
this.mqProducer.setNamesrvAddr(config.getValue(PROP_PRODUCER_NAME_SRV_ADDR, String.class));
|
||||
this.mqProducer.setInstanceName(createUniqInstance(config.getValue(PROP_PRODUCER_NAME_SRV_ADDR, String.class)));
|
||||
this.mqProducer.setProducerGroup(config.getValue(PROP_PRODUCER_GROUP, String.class));
|
||||
|
||||
if (config.getOptionalValue(PROP_PRODUCER_SEND_MSG_TIMEOUT, Integer.class).isPresent()) {
|
||||
this.mqProducer.setSendMsgTimeout(config.getValue(PROP_PRODUCER_SEND_MSG_TIMEOUT, Integer.class));
|
||||
}
|
||||
|
||||
if (config.getOptionalValue(PROP_PRODUCER_MAX_MESSAGE_SIZE, Integer.class).isPresent()) {
|
||||
this.mqProducer.setMaxMessageSize(config.getValue(PROP_PRODUCER_MAX_MESSAGE_SIZE, Integer.class));
|
||||
}
|
||||
|
||||
this.mqProducer.setLanguage(LanguageCode.JAVA);
|
||||
startProducer();
|
||||
}
|
||||
|
||||
private void startProducer() {
|
||||
try {
|
||||
this.mqProducer.start();
|
||||
LOGGER.info("Consumer started...");
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
throw new DebeziumException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String createUniqInstance(String prefix) {
|
||||
return prefix.concat("-").concat(UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
void close() {
|
||||
// Closed rocketmq producer
|
||||
LOGGER.info("Consumer destroy...");
|
||||
if (mqProducer != null) {
|
||||
mqProducer.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
|
||||
throws InterruptedException {
|
||||
final CountDownLatch latch = new CountDownLatch(records.size());
|
||||
for (ChangeEvent<Object, Object> record : records) {
|
||||
try {
|
||||
final String topicName = streamNameMapper.map(record.destination());
|
||||
String key = getString(record.key());
|
||||
mqProducer.send(new Message(topicName, null, key, getBytes(record.value())), new SelectMessageQueueByHash(), key, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
LOGGER.debug("Sent message with offset: {}", sendResult.getQueueOffset());
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
LOGGER.error("Failed to send record to {}:", record.destination(), throwable);
|
||||
throw new DebeziumException(throwable);
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new DebeziumException(e);
|
||||
}
|
||||
}
|
||||
latch.await();
|
||||
for (ChangeEvent<Object, Object> record : records) {
|
||||
committer.markProcessed(record);
|
||||
}
|
||||
committer.markBatchFinished();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.server.rocketmq;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import com.github.dockerjava.api.command.InspectContainerResponse;
|
||||
|
||||
/**
|
||||
* rocketmq container
|
||||
*/
|
||||
public class RocketMqContainer extends GenericContainer<RocketMqContainer> {
|
||||
|
||||
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apache/rocketmq:4.9.4");
|
||||
private static final int defaultBrokerPermission = 6;
|
||||
public static final int NAMESRV_PORT = 9876;
|
||||
public static final int BROKER_PORT = 10911;
|
||||
|
||||
public RocketMqContainer() {
|
||||
super(DEFAULT_IMAGE_NAME);
|
||||
withExposedPorts(NAMESRV_PORT, BROKER_PORT, BROKER_PORT - 2);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
String command = "#!/bin/bash\n";
|
||||
command += "./mqnamesrv &\n";
|
||||
command += "./mqbroker -n localhost:" + NAMESRV_PORT;
|
||||
withCommand("sh", "-c", command);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
protected void containerIsStarted(InspectContainerResponse containerInfo) {
|
||||
List<String> updateBrokerConfigCommands = new ArrayList<>();
|
||||
updateBrokerConfigCommands.add(updateBrokerConfig("brokerIP1", getHost()));
|
||||
updateBrokerConfigCommands.add(updateBrokerConfig("listenPort", getMappedPort(BROKER_PORT)));
|
||||
updateBrokerConfigCommands.add(updateBrokerConfig("brokerPermission", defaultBrokerPermission));
|
||||
final String command = String.join(" && ", updateBrokerConfigCommands);
|
||||
ExecResult result = null;
|
||||
try {
|
||||
result = execInContainer(
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
command);
|
||||
}
|
||||
catch (IOException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (result != null && result.getExitCode() != 0) {
|
||||
throw new IllegalStateException(result.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private String updateBrokerConfig(final String key, final Object val) {
|
||||
final String brokerAddr = "localhost:" + BROKER_PORT;
|
||||
return "./mqadmin updateBrokerConfig -b " + brokerAddr + " -k " + key + " -v " + val;
|
||||
}
|
||||
|
||||
public String getNamesrvAddr() {
|
||||
return String.format("%s:%s", getHost(), getMappedPort(NAMESRV_PORT));
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.server.rocketmq;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.enterprise.event.Observes;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import io.debezium.server.TestConfigSource;
|
||||
import io.debezium.server.events.ConnectorCompletedEvent;
|
||||
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
|
||||
import io.debezium.util.Testing;
|
||||
import io.quarkus.test.common.QuarkusTestResource;
|
||||
import io.quarkus.test.junit.QuarkusTest;
|
||||
|
||||
@QuarkusTest
|
||||
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
|
||||
@QuarkusTestResource(RocketMqTestResourceLifecycleManager.class)
|
||||
public class RocketMqIT {
|
||||
|
||||
private static final int MESSAGE_COUNT = 4;
|
||||
|
||||
private static DefaultLitePullConsumer consumer = null;
|
||||
|
||||
{
|
||||
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
|
||||
Testing.Files.createTestingFile(RocketMqTestConfigSource.OFFSET_STORE_PATH);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void stop() {
|
||||
if (consumer != null) {
|
||||
consumer.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
|
||||
if (!event.isSuccess()) {
|
||||
throw new RuntimeException(event.getError().get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRocketMQ() throws Exception {
|
||||
// start consumer
|
||||
consumer = new DefaultLitePullConsumer("consumer-group");
|
||||
consumer.setNamesrvAddr(RocketMqTestResourceLifecycleManager.getNameSrvAddr());
|
||||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
||||
consumer.subscribe(RocketMqTestConfigSource.TOPIC_NAME, "*");
|
||||
consumer.start();
|
||||
|
||||
// consume record
|
||||
final List<MessageExt> records = new ArrayList<>();
|
||||
Awaitility.await().atMost(Duration.ofSeconds(RocketMqTestConfigSource.waitForSeconds())).until(() -> {
|
||||
records.addAll(this.consumer.poll(5000));
|
||||
return records.size() >= MESSAGE_COUNT;
|
||||
});
|
||||
assertThat(records.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.server.rocketmq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
|
||||
|
||||
import io.debezium.server.TestConfigSource;
|
||||
|
||||
public class RocketMqTestConfigSource extends TestConfigSource {
|
||||
|
||||
public static final String TOPIC_NAME = "testc-inventory-customers";
|
||||
|
||||
public RocketMqTestConfigSource() {
|
||||
|
||||
final Map<String, String> rocketmqConfig = new HashMap<>();
|
||||
rocketmqConfig.put("debezium.sink.type", "rocketmq");
|
||||
rocketmqConfig.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
|
||||
rocketmqConfig.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
|
||||
rocketmqConfig.put("debezium.source.offset.flush.interval.ms", "0");
|
||||
rocketmqConfig.put("debezium.source.topic.prefix", "testc");
|
||||
rocketmqConfig.put("debezium.source.schema.include.list", "inventory");
|
||||
rocketmqConfig.put("debezium.source.table.include.list", "inventory.customers");
|
||||
rocketmqConfig.put("debezium.transforms", "Reroute");
|
||||
rocketmqConfig.put("debezium.transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
|
||||
rocketmqConfig.put("debezium.transforms.Reroute.topic.regex", "(.*)");
|
||||
rocketmqConfig.put("debezium.transforms.Reroute.topic.replacement", TOPIC_NAME);
|
||||
config = rocketmqConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrdinal() {
|
||||
// Configuration property precedence is based on ordinal values and since we override the
|
||||
// properties in TestConfigSource, we should give this a higher priority.
|
||||
return super.getOrdinal() + 1;
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.server.rocketmq;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
|
||||
|
||||
/**
|
||||
* Manages the lifecycle of a RocketMQ cluster test resource.
|
||||
*/
|
||||
public class RocketMqTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
|
||||
|
||||
public static RocketMqContainer container = new RocketMqContainer();
|
||||
private static final AtomicBoolean running = new AtomicBoolean(false);
|
||||
|
||||
private static synchronized void init() {
|
||||
if (!running.get()) {
|
||||
container.start();
|
||||
running.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> start() {
|
||||
init();
|
||||
Map<String, String> params = new ConcurrentHashMap<>();
|
||||
params.put("debezium.sink.rocketmq.producer.name.srv.addr", getNameSrvAddr());
|
||||
params.put("debezium.sink.rocketmq.producer.group", "producer-group");
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
try {
|
||||
if (container != null) {
|
||||
container.stop();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
|
||||
public static String getNameSrvAddr() {
|
||||
return container.getNamesrvAddr();
|
||||
}
|
||||
}
|
@ -0,0 +1 @@
|
||||
io.debezium.server.rocketmq.RocketMqTestConfigSource
|
@ -0,0 +1,39 @@
|
||||
<configuration>
|
||||
|
||||
<appender name="CONSOLE"
|
||||
class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="warn">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</root>
|
||||
|
||||
<!-- Set up the default logging to be INFO level, then override specific
|
||||
units -->
|
||||
<logger name="io.debezium" level="info" additivity="false">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</logger>
|
||||
<logger
|
||||
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig"
|
||||
level="warn" additivity="false">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</logger>
|
||||
<logger
|
||||
name="io.debezium.converters.CloudEventsConverterConfig"
|
||||
level="warn" additivity="false">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</logger>
|
||||
<logger
|
||||
name="org.apache.kafka.connect.json.JsonConverterConfig"
|
||||
level="warn" additivity="false">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</logger>
|
||||
<logger
|
||||
name="io.confluent"
|
||||
level="warn" additivity="false">
|
||||
<appender-ref ref="CONSOLE" />
|
||||
</logger>
|
||||
</configuration>
|
@ -26,6 +26,7 @@
|
||||
<module>debezium-server-nats-streaming</module>
|
||||
<module>debezium-server-nats-jetstream</module>
|
||||
<module>debezium-server-infinispan</module>
|
||||
<module>debezium-server-rocketmq</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
|
@ -1197,6 +1197,71 @@ When the alternative implementations are not available then the default ones are
|
||||
|Custom instance of https://docs.jboss.org/infinispan/14.0/apidocs/org/infinispan/client/hotrod/RemoteCache.html[Hot Rod cache] which will be used for connecting and sending events to the Infinspan cluster.
|
||||
|===
|
||||
|
||||
==== Apache RocketMQ
|
||||
|
||||
https://rocketmq.apache.org/[Apache RocketMQ] is a distributed messaging and streaming platform with low latency, high
|
||||
performance and reliability, trillion-level capacity and flexible scalability.Debezium server supports publishing
|
||||
captured change events to a configured RocketMQ.
|
||||
|
||||
[cols="35%a,10%a,55%a",options="header"]
|
||||
|===
|
||||
|Property
|
||||
|Default
|
||||
|Description
|
||||
|
||||
|[[rocketmq-type]]<<rocketmq-type, `debezium.sink.type`>>
|
||||
|
|
||||
|Must be set to `rocketmq`.
|
||||
|
||||
|[[rocketmq-namesrv-addr]]<<rocketmq-namesrv-addr, `debezium.sink.rocketmq.producer.name.srv.addr`>>
|
||||
|
|
||||
|Name server address of Apache RocketMQ .
|
||||
|
||||
|[[rocketmq-producer-group]]<<rocketmq-producer-group, `debezium.sink.rocketmq.producer.group`>>
|
||||
|
|
||||
|Producer group of Apache RocketMQ.
|
||||
|
||||
|[[rocketmq-max-message-size]]<<rocketmq-max-message-size, `debezium.sink.rocketmq.producer.max.message.size`>>
|
||||
| 4M, Suggest less than 4 MB.
|
||||
|(Optional)Maximum number of bytes of sent message body.
|
||||
|
||||
|[[rocketmq-send-timeout]]<<rocketmq-send-timeout, `debezium.sink.rocketmq.producer.send.msg.timeout`>>
|
||||
| 3000ms
|
||||
|(Optional) The send message timeout duration is the waiting time for local synchronous invocation of clients. Set a
|
||||
proper value based on the actual application to avoid long thread blocking time.
|
||||
|
||||
|[[rocketmq-acl-enabled]]<<rocketmq-acl-enabled, `debezium.sink.rocketmq.producer.acl.enabled`>>
|
||||
| false
|
||||
|(Optional) The configuration is used to enable access authorization.
|
||||
|
||||
|[[rocketmq-access-key]]<<rocketmq-access-key, `debezium.sink.rocketmq.producer.access.key`>>
|
||||
|
|
||||
|(Optional) The access key used for connecting to Apache RocketMQ cluster .
|
||||
|
||||
|[[rocketmq-secret-key]]<<rocketmq-secret-key, `debezium.sink.rocketmq.producer.secret.key`>>
|
||||
|
|
||||
|(Optional) The access secret used for connecting to Apache RocketMQ cluster .
|
||||
|
||||
|===
|
||||
|
||||
===== Injection points
|
||||
|
||||
The rocketmq sink behaviour can be modified by a custom logic providing alternative implementations for specific
|
||||
functionalities.
|
||||
When the alternative implementations are not available then the default ones are used.
|
||||
|
||||
[cols="35%a,10%a,55%a",options="header"]
|
||||
|===
|
||||
|Interface
|
||||
|CDI classifier
|
||||
|Description
|
||||
|
||||
|[[rocketmq-producer]]<<rocketmq-producer, `org.apache.rocketmq.client.producer.DefaultMQProducer`>>
|
||||
|`@CustomConsumerBuilder`
|
||||
|Custom configured instance of a RocketMQ used to publish messages to target topic.
|
||||
|===
|
||||
|
||||
|
||||
== Extensions
|
||||
|
||||
{prodname} Server uses the https://quarkus.io/[Quarkus framework] and relies on dependency injection to enable developer to extend its behaviour.
|
||||
|
Loading…
Reference in New Issue
Block a user