diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml
index 80bce464e..e5a16fb33 100644
--- a/debezium-bom/pom.xml
+++ b/debezium-bom/pom.xml
@@ -37,10 +37,11 @@
20.0.0
-
+
+
4.1.1
-
+
2.17.241
@@ -434,6 +435,17 @@
jedis
${version.jedis}
+
+ org.apache.rocketmq
+ rocketmq-client
+ ${version.rocketmq}
+
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+ ${version.rocketmq}
+
diff --git a/debezium-server/debezium-server-bom/pom.xml b/debezium-server/debezium-server-bom/pom.xml
index 5b4ecfadc..412a93aa4 100644
--- a/debezium-server/debezium-server-bom/pom.xml
+++ b/debezium-server/debezium-server-bom/pom.xml
@@ -19,7 +19,6 @@
0.9.1
2.16.3
2.2.3
- 4.9.4
1.2
diff --git a/debezium-storage/debezium-storage-rocketmq/pom.xml b/debezium-storage/debezium-storage-rocketmq/pom.xml
new file mode 100644
index 000000000..ce979d6b9
--- /dev/null
+++ b/debezium-storage/debezium-storage-rocketmq/pom.xml
@@ -0,0 +1,105 @@
+
+
+
+ io.debezium
+ debezium-storage
+ 2.2.0-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ debezium-storage-rocketmq
+ Debezium Storage RocketMq Module
+ jar
+
+
+
+ io.debezium
+ debezium-api
+
+
+ io.debezium
+ debezium-core
+
+
+ org.slf4j
+ slf4j-api
+ provided
+
+
+ org.apache.kafka
+ connect-api
+ provided
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+
+
+
+
+ junit
+ junit
+ test
+
+
+ org.apache.kafka
+ connect-runtime
+
+
+
+
+
+
+
+ assembly
+
+ false
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ ${version.assembly.plugin}
+
+
+ io.debezium
+ debezium-assembly-descriptors
+ ${project.version}
+
+
+
+
+ default
+ package
+
+ single
+
+
+ ${project.artifactId}-${project.version}
+ true
+
+ ${assembly.descriptor}
+
+ posix
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqAdminUtil.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqAdminUtil.java
new file mode 100644
index 000000000..2c5549c98
--- /dev/null
+++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqAdminUtil.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.storage.rocketmq;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+/**
+ * Tools for creating RocketMq topic and group
+ */
+public class RocketMqAdminUtil {
+
+ public static String createUniqInstance(String prefix) {
+ return prefix.concat("-").concat(UUID.randomUUID().toString());
+ }
+
+ public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
+ return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+ }
+
+ public static DefaultLitePullConsumer initDefaultLitePullConsumer(RocketMqConfig config,
+ boolean autoCommit)
+ throws MQClientException {
+ DefaultLitePullConsumer consumer = null;
+ if (Objects.isNull(consumer)) {
+ if (StringUtils.isBlank(config.getAccessKey()) && StringUtils.isBlank(config.getSecretKey())) {
+ consumer = new DefaultLitePullConsumer(
+ config.getGroupId());
+ }
+ else {
+ consumer = new DefaultLitePullConsumer(
+ config.getGroupId(),
+ getAclRPCHook(config.getAccessKey(), config.getSecretKey()));
+ }
+ }
+ consumer.setNamesrvAddr(config.getNamesrvAddr());
+ String uniqueName = createUniqInstance(config.getNamesrvAddr());
+ consumer.setInstanceName(uniqueName);
+ consumer.setUnitName(uniqueName);
+ consumer.setAutoCommit(autoCommit);
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ return consumer;
+ }
+
+ public static DefaultMQProducer initDefaultMqProducer(RocketMqConfig connectConfig) {
+ RPCHook rpcHook = null;
+ if (connectConfig.isAclEnable()) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+ }
+ DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+ producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+ producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+ producer.setProducerGroup(connectConfig.getGroupId());
+ producer.setLanguage(LanguageCode.JAVA);
+ return producer;
+ }
+
+ private static DefaultMQAdminExt startMqAdminTool(RocketMqConfig config) throws MQClientException {
+ DefaultMQAdminExt admin;
+ if (config.isAclEnable()) {
+ admin = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(config.getAccessKey(), config.getSecretKey())));
+ }
+ else {
+ admin = new DefaultMQAdminExt();
+ }
+ admin.setNamesrvAddr(config.getNamesrvAddr());
+ admin.setAdminExtGroup(config.getGroupId());
+ admin.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+ admin.start();
+ return admin;
+ }
+
+ public static void createTopic(RocketMqConfig config, TopicConfig topicConfig) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMqAdminTool(config);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Set clusterNameSet = clusterAddrTable.keySet();
+ for (String clusterName : clusterNameSet) {
+ Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("RocketMq create schema history topic: " + topicConfig.getTopicName() + " " +
+ " failed", e);
+ }
+ finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ }
+
+ public static boolean topicExist(RocketMqConfig config, String topic) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ boolean foundTopicRouteInfo = false;
+ try {
+ defaultMQAdminExt = startMqAdminTool(config);
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (topicRouteData != null) {
+ foundTopicRouteInfo = true;
+ }
+ }
+ catch (Exception e) {
+ if (e instanceof MQClientException) {
+ if (((MQClientException) e).getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
+ foundTopicRouteInfo = false;
+ }
+ else {
+ throw new RuntimeException("Failed to get topic information", e);
+ }
+ }
+ else {
+ throw new RuntimeException("Failed to get topic information", e);
+ }
+ }
+ finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return foundTopicRouteInfo;
+ }
+
+ public static Set fetchAllConsumerGroup(RocketMqConfig connectConfig) {
+ Set consumerGroupSet = new HashSet<>();
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMqAdminTool(connectConfig);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+ SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+ consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("RocketMq admin fetch all topic failed", e);
+ }
+ finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return consumerGroupSet;
+ }
+
+ public static String createGroup(RocketMqConfig connectConfig, String group) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMqAdminTool(connectConfig);
+ SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
+ initConfig.setGroupName(group);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Set clusterNameSet = clusterAddrTable.keySet();
+ for (String clusterName : clusterNameSet) {
+ Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("create subGroup: " + group + " failed", e);
+ }
+ finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return group;
+ }
+
+ public static Map offsets(RocketMqConfig config, String topic) {
+ // Get database schema topic min and max offset
+ DefaultMQAdminExt adminClient = null;
+ try {
+ adminClient = RocketMqAdminUtil.startMqAdminTool(config);
+ TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic);
+ return topicStatsTable.getOffsetTable();
+ }
+ catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ if (adminClient != null) {
+ adminClient.shutdown();
+ }
+ }
+ }
+
+}
diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqConfig.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqConfig.java
new file mode 100644
index 000000000..d6ce83c2a
--- /dev/null
+++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqConfig.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.storage.rocketmq;
+
+import java.util.Objects;
+
+/**
+ * Configuration for connecting RocketMq
+ */
+public class RocketMqConfig {
+ private String namesrvAddr;
+
+ private String groupId;
+
+ /**
+ * set acl config
+ **/
+ private boolean aclEnable;
+ private String accessKey;
+ private String secretKey;
+
+ private RocketMqConfig(String rmqConsumerGroup, String namesrvAddr, boolean aclEnable, String accessKey,
+ String secretKey) {
+ this.groupId = rmqConsumerGroup;
+ this.namesrvAddr = namesrvAddr;
+ this.aclEnable = aclEnable;
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public boolean isAclEnable() {
+ return aclEnable;
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RocketMqConfig that = (RocketMqConfig) o;
+ return aclEnable == that.aclEnable && Objects.equals(namesrvAddr, that.namesrvAddr) && Objects.equals(groupId, that.groupId)
+ && Objects.equals(accessKey, that.accessKey) && Objects.equals(secretKey, that.secretKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(namesrvAddr, groupId, aclEnable, accessKey, secretKey);
+ }
+
+ @Override
+ public String toString() {
+ return "RocketMqConfig{" +
+ "namesrvAddr='" + namesrvAddr + '\'' +
+ ", groupId='" + groupId + '\'' +
+ ", aclEnable=" + aclEnable +
+ ", accessKey='" + accessKey + '\'' +
+ ", secretKey='" + secretKey + '\'' +
+ '}';
+ }
+
+ public static class Builder {
+ private String namesrvAddr;
+ private String groupId;
+ /**
+ * set acl config
+ **/
+ private boolean aclEnable;
+ private String accessKey;
+ private String secretKey;
+
+ public Builder namesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ return this;
+ }
+
+ public Builder groupId(String groupId) {
+ this.groupId = groupId;
+ return this;
+ }
+
+ public Builder aclEnable(boolean aclEnable) {
+ this.aclEnable = aclEnable;
+ return this;
+ }
+
+ public Builder accessKey(String accessKey) {
+ this.accessKey = accessKey;
+ return this;
+ }
+
+ public Builder secretKey(String secretKey) {
+ this.secretKey = secretKey;
+ return this;
+ }
+
+ public RocketMqConfig build() {
+ return new RocketMqConfig(groupId, namesrvAddr, aclEnable, accessKey, secretKey);
+ }
+ }
+}
diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelector.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelector.java
new file mode 100644
index 000000000..a5e4823e7
--- /dev/null
+++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelector.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.storage.rocketmq;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * Always select MessageQueue with queue id 0
+ */
+public class ZeroMessageQueueSelector implements MessageQueueSelector {
+ @Override
+ public MessageQueue select(List mqs, Message msg, Object arg) {
+ return mqs.stream().filter(messageQueue -> (messageQueue.getQueueId() == 0)).collect(Collectors.toSet()).stream().findFirst().get();
+ }
+}
diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java
new file mode 100644
index 000000000..109603a18
--- /dev/null
+++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java
@@ -0,0 +1,323 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.storage.rocketmq.history;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.annotation.NotThreadSafe;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
+import io.debezium.relational.history.AbstractSchemaHistory;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.relational.history.SchemaHistoryException;
+import io.debezium.relational.history.SchemaHistoryListener;
+import io.debezium.storage.rocketmq.RocketMqAdminUtil;
+import io.debezium.storage.rocketmq.RocketMqConfig;
+import io.debezium.storage.rocketmq.ZeroMessageQueueSelector;
+
+@NotThreadSafe
+public class RocketMqSchemaHistory extends AbstractSchemaHistory {
+ public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.topic")
+ .withDisplayName("Database schema history topic name")
+ .withType(ConfigDef.Type.STRING)
+ .withWidth(ConfigDef.Width.LONG)
+ .withImportance(ConfigDef.Importance.HIGH)
+ .withDescription("The name of the topic for the database schema history")
+ .withValidation(RocketMqSchemaHistory.forRocketMq(Field::isRequired));
+ /**
+ * rocketmq name server addr
+ */
+ public static final Field NAME_SRV_ADDR = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.name.srv.addr")
+ .withDisplayName("NameServer address")
+ .withType(ConfigDef.Type.STRING)
+ .withWidth(ConfigDef.Width.LONG)
+ .withImportance(ConfigDef.Importance.HIGH)
+ .withDescription("RocketMq service discovery service nameserver address configuration")
+ .withValidation(Field::isRequired);
+ public static final Field ROCKETMQ_ACL_ENABLE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.acl.enabled")
+ .withDisplayName("Access control list enabled")
+ .withType(ConfigDef.Type.BOOLEAN)
+ .withDefault(false)
+ .withWidth(ConfigDef.Width.LONG)
+ .withImportance(ConfigDef.Importance.HIGH)
+ .withDescription("RocketMq access control enable configuration, default is 'false'");
+ public static final Field ROCKETMQ_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.access.key")
+ .withDisplayName("RocketMq access key")
+ .withType(ConfigDef.Type.STRING)
+ .withWidth(ConfigDef.Width.LONG)
+ .withImportance(ConfigDef.Importance.HIGH)
+ .withDescription("RocketMQ access key. If " + ROCKETMQ_ACL_ENABLE + " is true, the value cannot be empty");
+ public static final Field ROCKETMQ_SECRET_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.secret.key")
+ .withDisplayName("RocketMq secret key")
+ .withType(ConfigDef.Type.STRING)
+ .withWidth(ConfigDef.Width.LONG)
+ .withImportance(ConfigDef.Importance.HIGH)
+ .withDescription("RocketMQ secret key. If " + ROCKETMQ_ACL_ENABLE + " is true, the value cannot be empty");
+ public static final Field RECOVERY_POLL_ATTEMPTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.recovery.attempts")
+ .withDisplayName("Max attempts to recovery database schema history")
+ .withType(ConfigDef.Type.INT)
+ .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0))
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.LOW)
+ .withDescription("The number of attempts in a row that no data are returned from RocketMQ before recover " +
+ "completes. "
+ + "The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).")
+ .withDefault(60)
+ .withValidation(Field::isInteger);
+ public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING
+ + "rocketmq.recovery.poll.interval.ms")
+ .withDisplayName("Poll interval during database schema history recovery (ms)")
+ .withType(ConfigDef.Type.INT)
+ .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1))
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.LOW)
+ .withDescription("The number of milliseconds to wait while polling for persisted data during recovery.")
+ .withDefault(1000)
+ .withValidation(Field::isLong);
+ public static final Field STORE_RECORD_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING
+ + "rocketmq.store.record.timeout.ms")
+ .withDisplayName("Timeout for sending messages to RocketMq")
+ .withType(ConfigDef.Type.INT)
+ .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1))
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.LOW)
+ .withDescription("Timeout for sending messages to RocketMq.")
+ .withDefault(60 * 1000)
+ .withValidation(Field::isLong);
+ private final static Logger LOGGER = LoggerFactory.getLogger(RocketMqSchemaHistory.class);
+ private static final Integer MESSAGE_QUEUE = 0;
+ private final DocumentReader reader = DocumentReader.defaultReader();
+ private String topicName;
+ private String dbHistoryName;
+ private DefaultMQProducer producer;
+ private RocketMqConfig rocketMqConfig;
+ private int maxRecoveryAttempts;
+ private Long pollInterval;
+ private Long sendingTimeout;
+
+ private static Field.Validator forRocketMq(final Field.Validator validator) {
+ return (config, field, problems) -> {
+ final String history = config.getString(HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY);
+ return RocketMqSchemaHistory.class.getName().equals(history) ? validator.validate(config, field, problems) : 0;
+ };
+ }
+
+ @Override
+ public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
+ super.configure(config, comparator, listener, useCatalogBeforeSchema);
+ this.topicName = config.getString(TOPIC);
+ this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());
+ this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS);
+ this.pollInterval = config.getLong(RECOVERY_POLL_INTERVAL_MS);
+ this.sendingTimeout = config.getLong(STORE_RECORD_TIMEOUT_MS);
+ LOGGER.info("Configure to store the debezium database history {} to rocketmq topic {}",
+ dbHistoryName, topicName);
+ // Check acl config
+ boolean aclEnabled = config.getBoolean(RocketMqSchemaHistory.ROCKETMQ_ACL_ENABLE);
+ String accessKey = config.getString(RocketMqSchemaHistory.ROCKETMQ_ACCESS_KEY);
+ String secretKey = config.getString(RocketMqSchemaHistory.ROCKETMQ_SECRET_KEY);
+ if (aclEnabled && (accessKey == null || secretKey == null)) {
+ throw new SchemaHistoryException(
+ "if " + ROCKETMQ_ACL_ENABLE + " true,the configuration " + ROCKETMQ_ACCESS_KEY + " and " + ROCKETMQ_SECRET_KEY + " cannot be empty");
+ }
+
+ // build config
+ this.rocketMqConfig = RocketMqConfig.newBuilder()
+ .aclEnable(config.getBoolean(RocketMqSchemaHistory.ROCKETMQ_ACL_ENABLE))
+ .accessKey(config.getString(RocketMqSchemaHistory.ROCKETMQ_ACCESS_KEY))
+ .secretKey(config.getString(RocketMqSchemaHistory.ROCKETMQ_SECRET_KEY))
+ .namesrvAddr(config.getString(RocketMqSchemaHistory.NAME_SRV_ADDR))
+ .groupId(dbHistoryName)
+ .build();
+ }
+
+ @Override
+ public void initializeStorage() {
+ super.initializeStorage();
+ LOGGER.info("try to create history topic: {}!", this.topicName);
+ TopicConfig topicConfig = new TopicConfig(this.topicName, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE);
+ RocketMqAdminUtil.createTopic(rocketMqConfig, topicConfig);
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ try {
+ // Check and create group
+ Set consumerGroupSet = RocketMqAdminUtil.fetchAllConsumerGroup(rocketMqConfig);
+ if (!consumerGroupSet.contains(rocketMqConfig.getGroupId())) {
+ RocketMqAdminUtil.createGroup(rocketMqConfig, rocketMqConfig.getGroupId());
+ }
+ // Start rocketmq producer
+ this.producer = RocketMqAdminUtil.initDefaultMqProducer(rocketMqConfig);
+ this.producer.start();
+ }
+ catch (MQClientException e) {
+ throw new SchemaHistoryException(e);
+ }
+ }
+
+ @Override
+ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
+ if (this.producer == null) {
+ throw new IllegalStateException("No producer is available. Ensure that 'initializeStorage()'"
+ + " is called before storing database schema history records.");
+ }
+ LOGGER.trace("Storing record into database schema history: {}", record);
+ try {
+ Message message = new Message(this.topicName, record.toString().getBytes());
+ // Send messages synchronously until successful
+ // Only sending success or exception will be returned
+ SendResult sendResult = producer.send(message, new ZeroMessageQueueSelector(), null, this.sendingTimeout);
+ switch (sendResult.getSendStatus()) {
+ case SEND_OK:
+ LOGGER.debug("Stored record in topic '{}' partition {} at offset {} ",
+ message.getTopic(), sendResult.getMessageQueue(), sendResult.getMessageQueue());
+ break;
+ default:
+ LOGGER.warn("Stored record in topic '{}' partition {} at offset {}, send status {}",
+ message.getTopic(), sendResult.getMessageQueue(), sendResult.getMessageQueue(),
+ sendResult.getSendStatus());
+ }
+
+ }
+ catch (InterruptedException e) {
+ LOGGER.error("Interrupted before record was written into database schema history: {}", record);
+ Thread.currentThread().interrupt();
+ throw new SchemaHistoryException(e);
+ }
+ catch (MQClientException | RemotingException | MQBrokerException e) {
+ throw new SchemaHistoryException(e);
+ }
+ }
+
+ @Override
+ protected void recoverRecords(Consumer records) {
+ DefaultLitePullConsumer consumer = null;
+ try {
+ consumer = RocketMqAdminUtil.initDefaultLitePullConsumer(rocketMqConfig, false);
+ consumer.start();
+
+ // Select message queue
+ MessageQueue messageQueue = new ZeroMessageQueueSelector().select(new ArrayList<>(consumer.fetchMessageQueues(topicName)), null, null);
+ consumer.assign(Collections.singleton(messageQueue));
+ consumer.seekToBegin(messageQueue);
+ // Read all messages in the topic ...
+ long lastProcessedOffset = -1;
+ Long maxOffset = null;
+ int recoveryAttempts = 0;
+
+ do {
+ if (recoveryAttempts > maxRecoveryAttempts) {
+ throw new IllegalStateException(
+ "The database schema history couldn't be recovered.");
+ }
+ // Get db schema history topic end offset
+ maxOffset = getMaxOffsetOfSchemaHistoryTopic(maxOffset, messageQueue);
+ LOGGER.debug("End offset of database schema history topic is {}", maxOffset);
+
+ // Poll record from db schema history topic
+ List recoveredRecords = consumer.poll(pollInterval);
+ int numRecordsProcessed = 0;
+
+ for (MessageExt message : recoveredRecords) {
+ if (message.getQueueOffset() > lastProcessedOffset) {
+ HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody()));
+ LOGGER.trace("Recovering database history: {}", recordObj);
+ if (recordObj == null || !recordObj.isValid()) {
+ LOGGER.warn("Skipping invalid database history record '{}'. " +
+ "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
+ recordObj, topicName);
+ }
+ else {
+ records.accept(recordObj);
+ LOGGER.trace("Recovered database history: {}", recordObj);
+ }
+ lastProcessedOffset = message.getQueueOffset();
+ ++numRecordsProcessed;
+ }
+ }
+ if (numRecordsProcessed == 0) {
+ LOGGER.debug("No new records found in the database schema history; will retry");
+ recoveryAttempts++;
+ }
+ else {
+ LOGGER.debug("Processed {} records from database schema history", numRecordsProcessed);
+ }
+
+ } while (lastProcessedOffset < maxOffset - 1);
+
+ }
+ catch (MQClientException | MQBrokerException | IOException | RemotingException | InterruptedException ce) {
+ throw new SchemaHistoryException(ce);
+ }
+ finally {
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+ }
+ }
+
+ private Long getMaxOffsetOfSchemaHistoryTopic(Long previousEndOffset, MessageQueue messageQueue)
+ throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ Map minAndMaxOffsets = RocketMqAdminUtil.offsets(this.rocketMqConfig, topicName);
+ Long maxOffset = minAndMaxOffsets.get(messageQueue).getMaxOffset();
+ if (previousEndOffset != null && !previousEndOffset.equals(maxOffset)) {
+ LOGGER.warn("Detected changed end offset of database schema history topic (previous: "
+ + previousEndOffset + ", current: " + maxOffset
+ + "). Make sure that the same history topic isn't shared by multiple connector instances.");
+ }
+ return maxOffset;
+ }
+
+ @Override
+ public boolean exists() {
+ boolean exists = false;
+ if (this.storageExists()) {
+ Map minAndMaxOffset = RocketMqAdminUtil.offsets(this.rocketMqConfig,
+ topicName);
+ for (MessageQueue messageQueue : minAndMaxOffset.keySet()) {
+ if (MESSAGE_QUEUE == messageQueue.getQueueId()) {
+ exists = minAndMaxOffset.get(messageQueue).getMaxOffset() > minAndMaxOffset.get(messageQueue).getMinOffset();
+ }
+ }
+ }
+ return exists;
+ }
+
+ @Override
+ public boolean storageExists() {
+ // Check whether topic exists
+ return RocketMqAdminUtil.topicExist(rocketMqConfig, this.topicName);
+ }
+}
diff --git a/debezium-storage/debezium-storage-rocketmq/src/test/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelectorTest.java b/debezium-storage/debezium-storage-rocketmq/src/test/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelectorTest.java
new file mode 100644
index 000000000..fd0618188
--- /dev/null
+++ b/debezium-storage/debezium-storage-rocketmq/src/test/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelectorTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.storage.rocketmq;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ZeroMessageQueueSelectorTest {
+
+ private static List messageQueues;
+
+ @Before
+ public void before() {
+ messageQueues = new ArrayList<>();
+ messageQueues.add(new MessageQueue("topic", "brokerName", 0));
+ messageQueues.add(new MessageQueue("topic", "brokerName", 1));
+ }
+
+ @Test
+ public void testSelectQueue() {
+ MessageQueue messageQueue = new ZeroMessageQueueSelector().select(messageQueues, null, null);
+ assertEquals(0, messageQueue.getQueueId());
+ }
+}
diff --git a/debezium-storage/pom.xml b/debezium-storage/pom.xml
index 93b0feb40..d6a07cee8 100644
--- a/debezium-storage/pom.xml
+++ b/debezium-storage/pom.xml
@@ -22,5 +22,6 @@
debezium-storage-redis
debezium-storage-s3
debezium-storage-tests
+ debezium-storage-rocketmq
diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc
index f21a570cf..3ed672b10 100644
--- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc
+++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc
@@ -254,6 +254,7 @@ There are also other options available
* `io.debezium.storage.file.history.FileSchemaHistory` for non-Kafka deployments
* `io.debezium.relational.history.MemorySchemaHistory` volatile store for test environments
* `io.debezium.storage.redis.history.RedisSchemaHistory` for Redis deploments
+* `io.debezium.storage.redis.history.RocketMqSchemaHistory` for RocketMQ deploments
|[[debezium-source-database-history-file-filename]]<>
|
@@ -312,6 +313,43 @@ For more information see Redis https://redis.io/commands/wait/[WAIT] command.
|`1000`
|If using Redis to store schema history, defines the delay of retry on wait for replica failure.
+
+|[[schema-history-internal-rocketmq-topic]]<>
+|
+|The name of the rocketmq topic for the database schema history.
+
+|[[schema-history-internal-rocketmq-namesrvAddr]]<>
+|`localhost:9876`
+|RocketMq service discovery NameServer address configuration.
+
+|[[schema-history-internal-rocketmq-acl-enabled]]<>
+|`false`
+|RocketMq access control enable configuration, default is 'false'.
+
+|[[schema-history-internal-rocketmq-access-key]]<>
+|
+|RocketMq access key. If `debezium.source.schema.history.internal.rocketmq.acl.enabled` is true, the value cannot be
+empty.
+
+|[[schema-history-internal-rocketmq-secret-key]]<>
+|
+|RocketMq secret key. If `debezium.source.schema.history.internal.rocketmq.acl.enabled` is true, the value cannot be
+empty.
+
+|[[schema-history-internal-rocketmq-recovery-attempts]]<>
+| `60`
+|The maximum number of attempts to recover database schema history.
+
+|[[schema-history-internal-rocketmq-recovery-poll-interval-ms]]<>
+| `1000`
+|The number of milliseconds to wait while polling for persisted
+data during recovery.
+
+|[[schema-history-internal-rocketmq-store-record-timeout-ms]]<>
+| `60000`
+|Timeout for sending messages to RocketMq.
+
+
|===
[id="debezium-format-configuration-options"]
diff --git a/pom.xml b/pom.xml
index a042371c3..dbd1a2c91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,9 @@
14.0.4.Final
4.5.0.Final
+
+
+ 4.9.4