From 64558ea56afc73ed547d65b40ccd4a2c88379c3c Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 12 Jan 2023 18:04:48 +0800 Subject: [PATCH] DBZ-5997 Debezium Storage add support for Apache RocketMQ --- debezium-bom/pom.xml | 16 +- debezium-server/debezium-server-bom/pom.xml | 1 - .../debezium-storage-rocketmq/pom.xml | 105 ++++++ .../storage/rocketmq/RocketMqAdminUtil.java | 227 ++++++++++++ .../storage/rocketmq/RocketMqConfig.java | 127 +++++++ .../rocketmq/ZeroMessageQueueSelector.java | 23 ++ .../history/RocketMqSchemaHistory.java | 323 ++++++++++++++++++ .../ZeroMessageQueueSelectorTest.java | 33 ++ debezium-storage/pom.xml | 1 + .../pages/operations/debezium-server.adoc | 38 +++ pom.xml | 3 + 11 files changed, 894 insertions(+), 3 deletions(-) create mode 100644 debezium-storage/debezium-storage-rocketmq/pom.xml create mode 100644 debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqAdminUtil.java create mode 100644 debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqConfig.java create mode 100644 debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelector.java create mode 100644 debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java create mode 100644 debezium-storage/debezium-storage-rocketmq/src/test/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelectorTest.java diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml index 80bce464e..e5a16fb33 100644 --- a/debezium-bom/pom.xml +++ b/debezium-bom/pom.xml @@ -37,10 +37,11 @@ 20.0.0 - + + 4.1.1 - + 2.17.241 @@ -434,6 +435,17 @@ jedis ${version.jedis} + + org.apache.rocketmq + rocketmq-client + ${version.rocketmq} + + + + org.apache.rocketmq + rocketmq-tools + ${version.rocketmq} + diff --git a/debezium-server/debezium-server-bom/pom.xml b/debezium-server/debezium-server-bom/pom.xml index 5b4ecfadc..412a93aa4 100644 --- a/debezium-server/debezium-server-bom/pom.xml +++ b/debezium-server/debezium-server-bom/pom.xml @@ -19,7 +19,6 @@ 0.9.1 2.16.3 2.2.3 - 4.9.4 1.2 diff --git a/debezium-storage/debezium-storage-rocketmq/pom.xml b/debezium-storage/debezium-storage-rocketmq/pom.xml new file mode 100644 index 000000000..ce979d6b9 --- /dev/null +++ b/debezium-storage/debezium-storage-rocketmq/pom.xml @@ -0,0 +1,105 @@ + + + + io.debezium + debezium-storage + 2.2.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + debezium-storage-rocketmq + Debezium Storage RocketMq Module + jar + + + + io.debezium + debezium-api + + + io.debezium + debezium-core + + + org.slf4j + slf4j-api + provided + + + org.apache.kafka + connect-api + provided + + + + org.apache.rocketmq + rocketmq-client + + + + org.apache.rocketmq + rocketmq-tools + + + + + junit + junit + test + + + org.apache.kafka + connect-runtime + + + + + + + + assembly + + false + + + + + org.apache.maven.plugins + maven-assembly-plugin + ${version.assembly.plugin} + + + io.debezium + debezium-assembly-descriptors + ${project.version} + + + + + default + package + + single + + + ${project.artifactId}-${project.version} + true + + ${assembly.descriptor} + + posix + + + + + + + + + + \ No newline at end of file diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqAdminUtil.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqAdminUtil.java new file mode 100644 index 000000000..2c5549c98 --- /dev/null +++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqAdminUtil.java @@ -0,0 +1,227 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.storage.rocketmq; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.TopicOffset; +import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; + +/** + * Tools for creating RocketMq topic and group + */ +public class RocketMqAdminUtil { + + public static String createUniqInstance(String prefix) { + return prefix.concat("-").concat(UUID.randomUUID().toString()); + } + + public static RPCHook getAclRPCHook(String accessKey, String secretKey) { + return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + } + + public static DefaultLitePullConsumer initDefaultLitePullConsumer(RocketMqConfig config, + boolean autoCommit) + throws MQClientException { + DefaultLitePullConsumer consumer = null; + if (Objects.isNull(consumer)) { + if (StringUtils.isBlank(config.getAccessKey()) && StringUtils.isBlank(config.getSecretKey())) { + consumer = new DefaultLitePullConsumer( + config.getGroupId()); + } + else { + consumer = new DefaultLitePullConsumer( + config.getGroupId(), + getAclRPCHook(config.getAccessKey(), config.getSecretKey())); + } + } + consumer.setNamesrvAddr(config.getNamesrvAddr()); + String uniqueName = createUniqInstance(config.getNamesrvAddr()); + consumer.setInstanceName(uniqueName); + consumer.setUnitName(uniqueName); + consumer.setAutoCommit(autoCommit); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + return consumer; + } + + public static DefaultMQProducer initDefaultMqProducer(RocketMqConfig connectConfig) { + RPCHook rpcHook = null; + if (connectConfig.isAclEnable()) { + rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey())); + } + DefaultMQProducer producer = new DefaultMQProducer(rpcHook); + producer.setNamesrvAddr(connectConfig.getNamesrvAddr()); + producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr())); + producer.setProducerGroup(connectConfig.getGroupId()); + producer.setLanguage(LanguageCode.JAVA); + return producer; + } + + private static DefaultMQAdminExt startMqAdminTool(RocketMqConfig config) throws MQClientException { + DefaultMQAdminExt admin; + if (config.isAclEnable()) { + admin = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(config.getAccessKey(), config.getSecretKey()))); + } + else { + admin = new DefaultMQAdminExt(); + } + admin.setNamesrvAddr(config.getNamesrvAddr()); + admin.setAdminExtGroup(config.getGroupId()); + admin.setInstanceName(createUniqInstance(config.getNamesrvAddr())); + admin.start(); + return admin; + } + + public static void createTopic(RocketMqConfig config, TopicConfig topicConfig) { + DefaultMQAdminExt defaultMQAdminExt = null; + try { + defaultMQAdminExt = startMqAdminTool(config); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Set clusterNameSet = clusterAddrTable.keySet(); + for (String clusterName : clusterNameSet) { + Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : masterSet) { + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + } + } + } + catch (Exception e) { + throw new RuntimeException("RocketMq create schema history topic: " + topicConfig.getTopicName() + " " + + " failed", e); + } + finally { + if (defaultMQAdminExt != null) { + defaultMQAdminExt.shutdown(); + } + } + } + + public static boolean topicExist(RocketMqConfig config, String topic) { + DefaultMQAdminExt defaultMQAdminExt = null; + boolean foundTopicRouteInfo = false; + try { + defaultMQAdminExt = startMqAdminTool(config); + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); + if (topicRouteData != null) { + foundTopicRouteInfo = true; + } + } + catch (Exception e) { + if (e instanceof MQClientException) { + if (((MQClientException) e).getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) { + foundTopicRouteInfo = false; + } + else { + throw new RuntimeException("Failed to get topic information", e); + } + } + else { + throw new RuntimeException("Failed to get topic information", e); + } + } + finally { + if (defaultMQAdminExt != null) { + defaultMQAdminExt.shutdown(); + } + } + return foundTopicRouteInfo; + } + + public static Set fetchAllConsumerGroup(RocketMqConfig connectConfig) { + Set consumerGroupSet = new HashSet<>(); + DefaultMQAdminExt defaultMQAdminExt = null; + try { + defaultMQAdminExt = startMqAdminTool(connectConfig); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); + consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); + } + } + catch (Exception e) { + throw new RuntimeException("RocketMq admin fetch all topic failed", e); + } + finally { + if (defaultMQAdminExt != null) { + defaultMQAdminExt.shutdown(); + } + } + return consumerGroupSet; + } + + public static String createGroup(RocketMqConfig connectConfig, String group) { + DefaultMQAdminExt defaultMQAdminExt = null; + try { + defaultMQAdminExt = startMqAdminTool(connectConfig); + SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig(); + initConfig.setGroupName(group); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Set clusterNameSet = clusterAddrTable.keySet(); + for (String clusterName : clusterNameSet) { + Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : masterSet) { + defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig); + } + } + } + catch (Exception e) { + throw new RuntimeException("create subGroup: " + group + " failed", e); + } + finally { + if (defaultMQAdminExt != null) { + defaultMQAdminExt.shutdown(); + } + } + return group; + } + + public static Map offsets(RocketMqConfig config, String topic) { + // Get database schema topic min and max offset + DefaultMQAdminExt adminClient = null; + try { + adminClient = RocketMqAdminUtil.startMqAdminTool(config); + TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic); + return topicStatsTable.getOffsetTable(); + } + catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) { + throw new RuntimeException(e); + } + finally { + if (adminClient != null) { + adminClient.shutdown(); + } + } + } + +} diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqConfig.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqConfig.java new file mode 100644 index 000000000..d6ce83c2a --- /dev/null +++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/RocketMqConfig.java @@ -0,0 +1,127 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.storage.rocketmq; + +import java.util.Objects; + +/** + * Configuration for connecting RocketMq + */ +public class RocketMqConfig { + private String namesrvAddr; + + private String groupId; + + /** + * set acl config + **/ + private boolean aclEnable; + private String accessKey; + private String secretKey; + + private RocketMqConfig(String rmqConsumerGroup, String namesrvAddr, boolean aclEnable, String accessKey, + String secretKey) { + this.groupId = rmqConsumerGroup; + this.namesrvAddr = namesrvAddr; + this.aclEnable = aclEnable; + this.accessKey = accessKey; + this.secretKey = secretKey; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public String getNamesrvAddr() { + return namesrvAddr; + } + + public String getGroupId() { + return groupId; + } + + public boolean isAclEnable() { + return aclEnable; + } + + public String getAccessKey() { + return accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RocketMqConfig that = (RocketMqConfig) o; + return aclEnable == that.aclEnable && Objects.equals(namesrvAddr, that.namesrvAddr) && Objects.equals(groupId, that.groupId) + && Objects.equals(accessKey, that.accessKey) && Objects.equals(secretKey, that.secretKey); + } + + @Override + public int hashCode() { + return Objects.hash(namesrvAddr, groupId, aclEnable, accessKey, secretKey); + } + + @Override + public String toString() { + return "RocketMqConfig{" + + "namesrvAddr='" + namesrvAddr + '\'' + + ", groupId='" + groupId + '\'' + + ", aclEnable=" + aclEnable + + ", accessKey='" + accessKey + '\'' + + ", secretKey='" + secretKey + '\'' + + '}'; + } + + public static class Builder { + private String namesrvAddr; + private String groupId; + /** + * set acl config + **/ + private boolean aclEnable; + private String accessKey; + private String secretKey; + + public Builder namesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + return this; + } + + public Builder groupId(String groupId) { + this.groupId = groupId; + return this; + } + + public Builder aclEnable(boolean aclEnable) { + this.aclEnable = aclEnable; + return this; + } + + public Builder accessKey(String accessKey) { + this.accessKey = accessKey; + return this; + } + + public Builder secretKey(String secretKey) { + this.secretKey = secretKey; + return this; + } + + public RocketMqConfig build() { + return new RocketMqConfig(groupId, namesrvAddr, aclEnable, accessKey, secretKey); + } + } +} diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelector.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelector.java new file mode 100644 index 000000000..a5e4823e7 --- /dev/null +++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelector.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.storage.rocketmq; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +/** + * Always select MessageQueue with queue id 0 + */ +public class ZeroMessageQueueSelector implements MessageQueueSelector { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + return mqs.stream().filter(messageQueue -> (messageQueue.getQueueId() == 0)).collect(Collectors.toSet()).stream().findFirst().get(); + } +} diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java new file mode 100644 index 000000000..109603a18 --- /dev/null +++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java @@ -0,0 +1,323 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.storage.rocketmq.history; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.TopicOffset; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.NotThreadSafe; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.document.DocumentReader; +import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; +import io.debezium.relational.history.AbstractSchemaHistory; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryException; +import io.debezium.relational.history.SchemaHistoryListener; +import io.debezium.storage.rocketmq.RocketMqAdminUtil; +import io.debezium.storage.rocketmq.RocketMqConfig; +import io.debezium.storage.rocketmq.ZeroMessageQueueSelector; + +@NotThreadSafe +public class RocketMqSchemaHistory extends AbstractSchemaHistory { + public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.topic") + .withDisplayName("Database schema history topic name") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.LONG) + .withImportance(ConfigDef.Importance.HIGH) + .withDescription("The name of the topic for the database schema history") + .withValidation(RocketMqSchemaHistory.forRocketMq(Field::isRequired)); + /** + * rocketmq name server addr + */ + public static final Field NAME_SRV_ADDR = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.name.srv.addr") + .withDisplayName("NameServer address") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.LONG) + .withImportance(ConfigDef.Importance.HIGH) + .withDescription("RocketMq service discovery service nameserver address configuration") + .withValidation(Field::isRequired); + public static final Field ROCKETMQ_ACL_ENABLE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.acl.enabled") + .withDisplayName("Access control list enabled") + .withType(ConfigDef.Type.BOOLEAN) + .withDefault(false) + .withWidth(ConfigDef.Width.LONG) + .withImportance(ConfigDef.Importance.HIGH) + .withDescription("RocketMq access control enable configuration, default is 'false'"); + public static final Field ROCKETMQ_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.access.key") + .withDisplayName("RocketMq access key") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.LONG) + .withImportance(ConfigDef.Importance.HIGH) + .withDescription("RocketMQ access key. If " + ROCKETMQ_ACL_ENABLE + " is true, the value cannot be empty"); + public static final Field ROCKETMQ_SECRET_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.secret.key") + .withDisplayName("RocketMq secret key") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.LONG) + .withImportance(ConfigDef.Importance.HIGH) + .withDescription("RocketMQ secret key. If " + ROCKETMQ_ACL_ENABLE + " is true, the value cannot be empty"); + public static final Field RECOVERY_POLL_ATTEMPTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.recovery.attempts") + .withDisplayName("Max attempts to recovery database schema history") + .withType(ConfigDef.Type.INT) + .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0)) + .withWidth(ConfigDef.Width.SHORT) + .withImportance(ConfigDef.Importance.LOW) + .withDescription("The number of attempts in a row that no data are returned from RocketMQ before recover " + + "completes. " + + "The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).") + .withDefault(60) + .withValidation(Field::isInteger); + public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + + "rocketmq.recovery.poll.interval.ms") + .withDisplayName("Poll interval during database schema history recovery (ms)") + .withType(ConfigDef.Type.INT) + .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1)) + .withWidth(ConfigDef.Width.SHORT) + .withImportance(ConfigDef.Importance.LOW) + .withDescription("The number of milliseconds to wait while polling for persisted data during recovery.") + .withDefault(1000) + .withValidation(Field::isLong); + public static final Field STORE_RECORD_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + + "rocketmq.store.record.timeout.ms") + .withDisplayName("Timeout for sending messages to RocketMq") + .withType(ConfigDef.Type.INT) + .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1)) + .withWidth(ConfigDef.Width.SHORT) + .withImportance(ConfigDef.Importance.LOW) + .withDescription("Timeout for sending messages to RocketMq.") + .withDefault(60 * 1000) + .withValidation(Field::isLong); + private final static Logger LOGGER = LoggerFactory.getLogger(RocketMqSchemaHistory.class); + private static final Integer MESSAGE_QUEUE = 0; + private final DocumentReader reader = DocumentReader.defaultReader(); + private String topicName; + private String dbHistoryName; + private DefaultMQProducer producer; + private RocketMqConfig rocketMqConfig; + private int maxRecoveryAttempts; + private Long pollInterval; + private Long sendingTimeout; + + private static Field.Validator forRocketMq(final Field.Validator validator) { + return (config, field, problems) -> { + final String history = config.getString(HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY); + return RocketMqSchemaHistory.class.getName().equals(history) ? validator.validate(config, field, problems) : 0; + }; + } + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + super.configure(config, comparator, listener, useCatalogBeforeSchema); + this.topicName = config.getString(TOPIC); + this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString()); + this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS); + this.pollInterval = config.getLong(RECOVERY_POLL_INTERVAL_MS); + this.sendingTimeout = config.getLong(STORE_RECORD_TIMEOUT_MS); + LOGGER.info("Configure to store the debezium database history {} to rocketmq topic {}", + dbHistoryName, topicName); + // Check acl config + boolean aclEnabled = config.getBoolean(RocketMqSchemaHistory.ROCKETMQ_ACL_ENABLE); + String accessKey = config.getString(RocketMqSchemaHistory.ROCKETMQ_ACCESS_KEY); + String secretKey = config.getString(RocketMqSchemaHistory.ROCKETMQ_SECRET_KEY); + if (aclEnabled && (accessKey == null || secretKey == null)) { + throw new SchemaHistoryException( + "if " + ROCKETMQ_ACL_ENABLE + " true,the configuration " + ROCKETMQ_ACCESS_KEY + " and " + ROCKETMQ_SECRET_KEY + " cannot be empty"); + } + + // build config + this.rocketMqConfig = RocketMqConfig.newBuilder() + .aclEnable(config.getBoolean(RocketMqSchemaHistory.ROCKETMQ_ACL_ENABLE)) + .accessKey(config.getString(RocketMqSchemaHistory.ROCKETMQ_ACCESS_KEY)) + .secretKey(config.getString(RocketMqSchemaHistory.ROCKETMQ_SECRET_KEY)) + .namesrvAddr(config.getString(RocketMqSchemaHistory.NAME_SRV_ADDR)) + .groupId(dbHistoryName) + .build(); + } + + @Override + public void initializeStorage() { + super.initializeStorage(); + LOGGER.info("try to create history topic: {}!", this.topicName); + TopicConfig topicConfig = new TopicConfig(this.topicName, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE); + RocketMqAdminUtil.createTopic(rocketMqConfig, topicConfig); + } + + @Override + public synchronized void start() { + super.start(); + try { + // Check and create group + Set consumerGroupSet = RocketMqAdminUtil.fetchAllConsumerGroup(rocketMqConfig); + if (!consumerGroupSet.contains(rocketMqConfig.getGroupId())) { + RocketMqAdminUtil.createGroup(rocketMqConfig, rocketMqConfig.getGroupId()); + } + // Start rocketmq producer + this.producer = RocketMqAdminUtil.initDefaultMqProducer(rocketMqConfig); + this.producer.start(); + } + catch (MQClientException e) { + throw new SchemaHistoryException(e); + } + } + + @Override + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { + if (this.producer == null) { + throw new IllegalStateException("No producer is available. Ensure that 'initializeStorage()'" + + " is called before storing database schema history records."); + } + LOGGER.trace("Storing record into database schema history: {}", record); + try { + Message message = new Message(this.topicName, record.toString().getBytes()); + // Send messages synchronously until successful + // Only sending success or exception will be returned + SendResult sendResult = producer.send(message, new ZeroMessageQueueSelector(), null, this.sendingTimeout); + switch (sendResult.getSendStatus()) { + case SEND_OK: + LOGGER.debug("Stored record in topic '{}' partition {} at offset {} ", + message.getTopic(), sendResult.getMessageQueue(), sendResult.getMessageQueue()); + break; + default: + LOGGER.warn("Stored record in topic '{}' partition {} at offset {}, send status {}", + message.getTopic(), sendResult.getMessageQueue(), sendResult.getMessageQueue(), + sendResult.getSendStatus()); + } + + } + catch (InterruptedException e) { + LOGGER.error("Interrupted before record was written into database schema history: {}", record); + Thread.currentThread().interrupt(); + throw new SchemaHistoryException(e); + } + catch (MQClientException | RemotingException | MQBrokerException e) { + throw new SchemaHistoryException(e); + } + } + + @Override + protected void recoverRecords(Consumer records) { + DefaultLitePullConsumer consumer = null; + try { + consumer = RocketMqAdminUtil.initDefaultLitePullConsumer(rocketMqConfig, false); + consumer.start(); + + // Select message queue + MessageQueue messageQueue = new ZeroMessageQueueSelector().select(new ArrayList<>(consumer.fetchMessageQueues(topicName)), null, null); + consumer.assign(Collections.singleton(messageQueue)); + consumer.seekToBegin(messageQueue); + // Read all messages in the topic ... + long lastProcessedOffset = -1; + Long maxOffset = null; + int recoveryAttempts = 0; + + do { + if (recoveryAttempts > maxRecoveryAttempts) { + throw new IllegalStateException( + "The database schema history couldn't be recovered."); + } + // Get db schema history topic end offset + maxOffset = getMaxOffsetOfSchemaHistoryTopic(maxOffset, messageQueue); + LOGGER.debug("End offset of database schema history topic is {}", maxOffset); + + // Poll record from db schema history topic + List recoveredRecords = consumer.poll(pollInterval); + int numRecordsProcessed = 0; + + for (MessageExt message : recoveredRecords) { + if (message.getQueueOffset() > lastProcessedOffset) { + HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody())); + LOGGER.trace("Recovering database history: {}", recordObj); + if (recordObj == null || !recordObj.isValid()) { + LOGGER.warn("Skipping invalid database history record '{}'. " + + "This is often not an issue, but if it happens repeatedly please check the '{}' topic.", + recordObj, topicName); + } + else { + records.accept(recordObj); + LOGGER.trace("Recovered database history: {}", recordObj); + } + lastProcessedOffset = message.getQueueOffset(); + ++numRecordsProcessed; + } + } + if (numRecordsProcessed == 0) { + LOGGER.debug("No new records found in the database schema history; will retry"); + recoveryAttempts++; + } + else { + LOGGER.debug("Processed {} records from database schema history", numRecordsProcessed); + } + + } while (lastProcessedOffset < maxOffset - 1); + + } + catch (MQClientException | MQBrokerException | IOException | RemotingException | InterruptedException ce) { + throw new SchemaHistoryException(ce); + } + finally { + if (consumer != null) { + consumer.shutdown(); + } + } + } + + private Long getMaxOffsetOfSchemaHistoryTopic(Long previousEndOffset, MessageQueue messageQueue) + throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + Map minAndMaxOffsets = RocketMqAdminUtil.offsets(this.rocketMqConfig, topicName); + Long maxOffset = minAndMaxOffsets.get(messageQueue).getMaxOffset(); + if (previousEndOffset != null && !previousEndOffset.equals(maxOffset)) { + LOGGER.warn("Detected changed end offset of database schema history topic (previous: " + + previousEndOffset + ", current: " + maxOffset + + "). Make sure that the same history topic isn't shared by multiple connector instances."); + } + return maxOffset; + } + + @Override + public boolean exists() { + boolean exists = false; + if (this.storageExists()) { + Map minAndMaxOffset = RocketMqAdminUtil.offsets(this.rocketMqConfig, + topicName); + for (MessageQueue messageQueue : minAndMaxOffset.keySet()) { + if (MESSAGE_QUEUE == messageQueue.getQueueId()) { + exists = minAndMaxOffset.get(messageQueue).getMaxOffset() > minAndMaxOffset.get(messageQueue).getMinOffset(); + } + } + } + return exists; + } + + @Override + public boolean storageExists() { + // Check whether topic exists + return RocketMqAdminUtil.topicExist(rocketMqConfig, this.topicName); + } +} diff --git a/debezium-storage/debezium-storage-rocketmq/src/test/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelectorTest.java b/debezium-storage/debezium-storage-rocketmq/src/test/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelectorTest.java new file mode 100644 index 000000000..fd0618188 --- /dev/null +++ b/debezium-storage/debezium-storage-rocketmq/src/test/java/io/debezium/storage/rocketmq/ZeroMessageQueueSelectorTest.java @@ -0,0 +1,33 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.storage.rocketmq; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Before; +import org.junit.Test; + +public class ZeroMessageQueueSelectorTest { + + private static List messageQueues; + + @Before + public void before() { + messageQueues = new ArrayList<>(); + messageQueues.add(new MessageQueue("topic", "brokerName", 0)); + messageQueues.add(new MessageQueue("topic", "brokerName", 1)); + } + + @Test + public void testSelectQueue() { + MessageQueue messageQueue = new ZeroMessageQueueSelector().select(messageQueues, null, null); + assertEquals(0, messageQueue.getQueueId()); + } +} diff --git a/debezium-storage/pom.xml b/debezium-storage/pom.xml index 93b0feb40..d6a07cee8 100644 --- a/debezium-storage/pom.xml +++ b/debezium-storage/pom.xml @@ -22,5 +22,6 @@ debezium-storage-redis debezium-storage-s3 debezium-storage-tests + debezium-storage-rocketmq diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc index f21a570cf..3ed672b10 100644 --- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc +++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc @@ -254,6 +254,7 @@ There are also other options available * `io.debezium.storage.file.history.FileSchemaHistory` for non-Kafka deployments * `io.debezium.relational.history.MemorySchemaHistory` volatile store for test environments * `io.debezium.storage.redis.history.RedisSchemaHistory` for Redis deploments +* `io.debezium.storage.redis.history.RocketMqSchemaHistory` for RocketMQ deploments |[[debezium-source-database-history-file-filename]]<> | @@ -312,6 +313,43 @@ For more information see Redis https://redis.io/commands/wait/[WAIT] command. |`1000` |If using Redis to store schema history, defines the delay of retry on wait for replica failure. + +|[[schema-history-internal-rocketmq-topic]]<> +| +|The name of the rocketmq topic for the database schema history. + +|[[schema-history-internal-rocketmq-namesrvAddr]]<> +|`localhost:9876` +|RocketMq service discovery NameServer address configuration. + +|[[schema-history-internal-rocketmq-acl-enabled]]<> +|`false` +|RocketMq access control enable configuration, default is 'false'. + +|[[schema-history-internal-rocketmq-access-key]]<> +| +|RocketMq access key. If `debezium.source.schema.history.internal.rocketmq.acl.enabled` is true, the value cannot be +empty. + +|[[schema-history-internal-rocketmq-secret-key]]<> +| +|RocketMq secret key. If `debezium.source.schema.history.internal.rocketmq.acl.enabled` is true, the value cannot be +empty. + +|[[schema-history-internal-rocketmq-recovery-attempts]]<> +| `60` +|The maximum number of attempts to recover database schema history. + +|[[schema-history-internal-rocketmq-recovery-poll-interval-ms]]<> +| `1000` +|The number of milliseconds to wait while polling for persisted +data during recovery. + +|[[schema-history-internal-rocketmq-store-record-timeout-ms]]<> +| `60000` +|Timeout for sending messages to RocketMq. + + |=== [id="debezium-format-configuration-options"] diff --git a/pom.xml b/pom.xml index a042371c3..dbd1a2c91 100644 --- a/pom.xml +++ b/pom.xml @@ -139,6 +139,9 @@ 14.0.4.Final 4.5.0.Final + + + 4.9.4