DBZ-5997 Debezium Storage add support for Apache RocketMQ

This commit is contained in:
sunxiaojian 2023-01-12 18:04:48 +08:00
parent b34121c128
commit 64558ea56a
11 changed files with 894 additions and 3 deletions

View File

@ -37,10 +37,11 @@
<version.graalvm.js>20.0.0</version.graalvm.js>
<!-- Storages -->
<!-- Redis -->
<!-- Redis -->
<version.jedis>4.1.1</version.jedis>
<!-- S3 -->
<!-- S3 -->
<version.s3>2.17.241</version.s3>
<!-- Testing -->
@ -434,6 +435,17 @@
<artifactId>jedis</artifactId>
<version>${version.jedis}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${version.rocketmq}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${version.rocketmq}</version>
</dependency>
<!-- S3 Storage -->
<dependency>

View File

@ -19,7 +19,6 @@
<version.pravega>0.9.1</version.pravega>
<version.nats>2.16.3</version.nats>
<version.stan>2.2.3</version.stan>
<version.rocketmq>4.9.4</version.rocketmq>
<version.commons.logging>1.2</version.commons.logging>
</properties>

View File

@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-storage-rocketmq</artifactId>
<name>Debezium Storage RocketMq Module</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
</dependency>
</dependencies>
<!--
Define several useful profiles
-->
<profiles>
<profile>
<id>assembly</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${version.assembly.plugin}</version>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-assembly-descriptors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>default</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<attach>true</attach> <!-- we want attach & deploy these to Maven -->
<descriptorRefs>
<descriptorRef>${assembly.descriptor}</descriptorRef>
</descriptorRefs>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,227 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.rocketmq;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
/**
* Tools for creating RocketMq topic and group
*/
public class RocketMqAdminUtil {
public static String createUniqInstance(String prefix) {
return prefix.concat("-").concat(UUID.randomUUID().toString());
}
public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
public static DefaultLitePullConsumer initDefaultLitePullConsumer(RocketMqConfig config,
boolean autoCommit)
throws MQClientException {
DefaultLitePullConsumer consumer = null;
if (Objects.isNull(consumer)) {
if (StringUtils.isBlank(config.getAccessKey()) && StringUtils.isBlank(config.getSecretKey())) {
consumer = new DefaultLitePullConsumer(
config.getGroupId());
}
else {
consumer = new DefaultLitePullConsumer(
config.getGroupId(),
getAclRPCHook(config.getAccessKey(), config.getSecretKey()));
}
}
consumer.setNamesrvAddr(config.getNamesrvAddr());
String uniqueName = createUniqInstance(config.getNamesrvAddr());
consumer.setInstanceName(uniqueName);
consumer.setUnitName(uniqueName);
consumer.setAutoCommit(autoCommit);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
return consumer;
}
public static DefaultMQProducer initDefaultMqProducer(RocketMqConfig connectConfig) {
RPCHook rpcHook = null;
if (connectConfig.isAclEnable()) {
rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
}
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
producer.setProducerGroup(connectConfig.getGroupId());
producer.setLanguage(LanguageCode.JAVA);
return producer;
}
private static DefaultMQAdminExt startMqAdminTool(RocketMqConfig config) throws MQClientException {
DefaultMQAdminExt admin;
if (config.isAclEnable()) {
admin = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(config.getAccessKey(), config.getSecretKey())));
}
else {
admin = new DefaultMQAdminExt();
}
admin.setNamesrvAddr(config.getNamesrvAddr());
admin.setAdminExtGroup(config.getGroupId());
admin.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
admin.start();
return admin;
}
public static void createTopic(RocketMqConfig config, TopicConfig topicConfig) {
DefaultMQAdminExt defaultMQAdminExt = null;
try {
defaultMQAdminExt = startMqAdminTool(config);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
}
}
}
catch (Exception e) {
throw new RuntimeException("RocketMq create schema history topic: " + topicConfig.getTopicName() + " " +
" failed", e);
}
finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();
}
}
}
public static boolean topicExist(RocketMqConfig config, String topic) {
DefaultMQAdminExt defaultMQAdminExt = null;
boolean foundTopicRouteInfo = false;
try {
defaultMQAdminExt = startMqAdminTool(config);
TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
if (topicRouteData != null) {
foundTopicRouteInfo = true;
}
}
catch (Exception e) {
if (e instanceof MQClientException) {
if (((MQClientException) e).getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
foundTopicRouteInfo = false;
}
else {
throw new RuntimeException("Failed to get topic information", e);
}
}
else {
throw new RuntimeException("Failed to get topic information", e);
}
}
finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();
}
}
return foundTopicRouteInfo;
}
public static Set<String> fetchAllConsumerGroup(RocketMqConfig connectConfig) {
Set<String> consumerGroupSet = new HashSet<>();
DefaultMQAdminExt defaultMQAdminExt = null;
try {
defaultMQAdminExt = startMqAdminTool(connectConfig);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
}
}
catch (Exception e) {
throw new RuntimeException("RocketMq admin fetch all topic failed", e);
}
finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();
}
}
return consumerGroupSet;
}
public static String createGroup(RocketMqConfig connectConfig, String group) {
DefaultMQAdminExt defaultMQAdminExt = null;
try {
defaultMQAdminExt = startMqAdminTool(connectConfig);
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(group);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
}
}
}
catch (Exception e) {
throw new RuntimeException("create subGroup: " + group + " failed", e);
}
finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();
}
}
return group;
}
public static Map<MessageQueue, TopicOffset> offsets(RocketMqConfig config, String topic) {
// Get database schema topic min and max offset
DefaultMQAdminExt adminClient = null;
try {
adminClient = RocketMqAdminUtil.startMqAdminTool(config);
TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic);
return topicStatsTable.getOffsetTable();
}
catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
throw new RuntimeException(e);
}
finally {
if (adminClient != null) {
adminClient.shutdown();
}
}
}
}

View File

@ -0,0 +1,127 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.rocketmq;
import java.util.Objects;
/**
* Configuration for connecting RocketMq
*/
public class RocketMqConfig {
private String namesrvAddr;
private String groupId;
/**
* set acl config
**/
private boolean aclEnable;
private String accessKey;
private String secretKey;
private RocketMqConfig(String rmqConsumerGroup, String namesrvAddr, boolean aclEnable, String accessKey,
String secretKey) {
this.groupId = rmqConsumerGroup;
this.namesrvAddr = namesrvAddr;
this.aclEnable = aclEnable;
this.accessKey = accessKey;
this.secretKey = secretKey;
}
public static Builder newBuilder() {
return new Builder();
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public String getGroupId() {
return groupId;
}
public boolean isAclEnable() {
return aclEnable;
}
public String getAccessKey() {
return accessKey;
}
public String getSecretKey() {
return secretKey;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RocketMqConfig that = (RocketMqConfig) o;
return aclEnable == that.aclEnable && Objects.equals(namesrvAddr, that.namesrvAddr) && Objects.equals(groupId, that.groupId)
&& Objects.equals(accessKey, that.accessKey) && Objects.equals(secretKey, that.secretKey);
}
@Override
public int hashCode() {
return Objects.hash(namesrvAddr, groupId, aclEnable, accessKey, secretKey);
}
@Override
public String toString() {
return "RocketMqConfig{" +
"namesrvAddr='" + namesrvAddr + '\'' +
", groupId='" + groupId + '\'' +
", aclEnable=" + aclEnable +
", accessKey='" + accessKey + '\'' +
", secretKey='" + secretKey + '\'' +
'}';
}
public static class Builder {
private String namesrvAddr;
private String groupId;
/**
* set acl config
**/
private boolean aclEnable;
private String accessKey;
private String secretKey;
public Builder namesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
return this;
}
public Builder groupId(String groupId) {
this.groupId = groupId;
return this;
}
public Builder aclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
return this;
}
public Builder accessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}
public Builder secretKey(String secretKey) {
this.secretKey = secretKey;
return this;
}
public RocketMqConfig build() {
return new RocketMqConfig(groupId, namesrvAddr, aclEnable, accessKey, secretKey);
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.rocketmq;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Always select MessageQueue with queue id 0
*/
public class ZeroMessageQueueSelector implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.stream().filter(messageQueue -> (messageQueue.getQueueId() == 0)).collect(Collectors.toSet()).stream().findFirst().get();
}
}

View File

@ -0,0 +1,323 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.rocketmq.history;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.storage.rocketmq.RocketMqAdminUtil;
import io.debezium.storage.rocketmq.RocketMqConfig;
import io.debezium.storage.rocketmq.ZeroMessageQueueSelector;
@NotThreadSafe
public class RocketMqSchemaHistory extends AbstractSchemaHistory {
public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.topic")
.withDisplayName("Database schema history topic name")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("The name of the topic for the database schema history")
.withValidation(RocketMqSchemaHistory.forRocketMq(Field::isRequired));
/**
* rocketmq name server addr
*/
public static final Field NAME_SRV_ADDR = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.name.srv.addr")
.withDisplayName("NameServer address")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("RocketMq service discovery service nameserver address configuration")
.withValidation(Field::isRequired);
public static final Field ROCKETMQ_ACL_ENABLE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.acl.enabled")
.withDisplayName("Access control list enabled")
.withType(ConfigDef.Type.BOOLEAN)
.withDefault(false)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("RocketMq access control enable configuration, default is 'false'");
public static final Field ROCKETMQ_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.access.key")
.withDisplayName("RocketMq access key")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("RocketMQ access key. If " + ROCKETMQ_ACL_ENABLE + " is true, the value cannot be empty");
public static final Field ROCKETMQ_SECRET_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.secret.key")
.withDisplayName("RocketMq secret key")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("RocketMQ secret key. If " + ROCKETMQ_ACL_ENABLE + " is true, the value cannot be empty");
public static final Field RECOVERY_POLL_ATTEMPTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.recovery.attempts")
.withDisplayName("Max attempts to recovery database schema history")
.withType(ConfigDef.Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("The number of attempts in a row that no data are returned from RocketMQ before recover " +
"completes. "
+ "The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).")
.withDefault(60)
.withValidation(Field::isInteger);
public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING
+ "rocketmq.recovery.poll.interval.ms")
.withDisplayName("Poll interval during database schema history recovery (ms)")
.withType(ConfigDef.Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("The number of milliseconds to wait while polling for persisted data during recovery.")
.withDefault(1000)
.withValidation(Field::isLong);
public static final Field STORE_RECORD_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING
+ "rocketmq.store.record.timeout.ms")
.withDisplayName("Timeout for sending messages to RocketMq")
.withType(ConfigDef.Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("Timeout for sending messages to RocketMq.")
.withDefault(60 * 1000)
.withValidation(Field::isLong);
private final static Logger LOGGER = LoggerFactory.getLogger(RocketMqSchemaHistory.class);
private static final Integer MESSAGE_QUEUE = 0;
private final DocumentReader reader = DocumentReader.defaultReader();
private String topicName;
private String dbHistoryName;
private DefaultMQProducer producer;
private RocketMqConfig rocketMqConfig;
private int maxRecoveryAttempts;
private Long pollInterval;
private Long sendingTimeout;
private static Field.Validator forRocketMq(final Field.Validator validator) {
return (config, field, problems) -> {
final String history = config.getString(HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY);
return RocketMqSchemaHistory.class.getName().equals(history) ? validator.validate(config, field, problems) : 0;
};
}
@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
this.topicName = config.getString(TOPIC);
this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());
this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS);
this.pollInterval = config.getLong(RECOVERY_POLL_INTERVAL_MS);
this.sendingTimeout = config.getLong(STORE_RECORD_TIMEOUT_MS);
LOGGER.info("Configure to store the debezium database history {} to rocketmq topic {}",
dbHistoryName, topicName);
// Check acl config
boolean aclEnabled = config.getBoolean(RocketMqSchemaHistory.ROCKETMQ_ACL_ENABLE);
String accessKey = config.getString(RocketMqSchemaHistory.ROCKETMQ_ACCESS_KEY);
String secretKey = config.getString(RocketMqSchemaHistory.ROCKETMQ_SECRET_KEY);
if (aclEnabled && (accessKey == null || secretKey == null)) {
throw new SchemaHistoryException(
"if " + ROCKETMQ_ACL_ENABLE + " true,the configuration " + ROCKETMQ_ACCESS_KEY + " and " + ROCKETMQ_SECRET_KEY + " cannot be empty");
}
// build config
this.rocketMqConfig = RocketMqConfig.newBuilder()
.aclEnable(config.getBoolean(RocketMqSchemaHistory.ROCKETMQ_ACL_ENABLE))
.accessKey(config.getString(RocketMqSchemaHistory.ROCKETMQ_ACCESS_KEY))
.secretKey(config.getString(RocketMqSchemaHistory.ROCKETMQ_SECRET_KEY))
.namesrvAddr(config.getString(RocketMqSchemaHistory.NAME_SRV_ADDR))
.groupId(dbHistoryName)
.build();
}
@Override
public void initializeStorage() {
super.initializeStorage();
LOGGER.info("try to create history topic: {}!", this.topicName);
TopicConfig topicConfig = new TopicConfig(this.topicName, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE);
RocketMqAdminUtil.createTopic(rocketMqConfig, topicConfig);
}
@Override
public synchronized void start() {
super.start();
try {
// Check and create group
Set<String> consumerGroupSet = RocketMqAdminUtil.fetchAllConsumerGroup(rocketMqConfig);
if (!consumerGroupSet.contains(rocketMqConfig.getGroupId())) {
RocketMqAdminUtil.createGroup(rocketMqConfig, rocketMqConfig.getGroupId());
}
// Start rocketmq producer
this.producer = RocketMqAdminUtil.initDefaultMqProducer(rocketMqConfig);
this.producer.start();
}
catch (MQClientException e) {
throw new SchemaHistoryException(e);
}
}
@Override
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (this.producer == null) {
throw new IllegalStateException("No producer is available. Ensure that 'initializeStorage()'"
+ " is called before storing database schema history records.");
}
LOGGER.trace("Storing record into database schema history: {}", record);
try {
Message message = new Message(this.topicName, record.toString().getBytes());
// Send messages synchronously until successful
// Only sending success or exception will be returned
SendResult sendResult = producer.send(message, new ZeroMessageQueueSelector(), null, this.sendingTimeout);
switch (sendResult.getSendStatus()) {
case SEND_OK:
LOGGER.debug("Stored record in topic '{}' partition {} at offset {} ",
message.getTopic(), sendResult.getMessageQueue(), sendResult.getMessageQueue());
break;
default:
LOGGER.warn("Stored record in topic '{}' partition {} at offset {}, send status {}",
message.getTopic(), sendResult.getMessageQueue(), sendResult.getMessageQueue(),
sendResult.getSendStatus());
}
}
catch (InterruptedException e) {
LOGGER.error("Interrupted before record was written into database schema history: {}", record);
Thread.currentThread().interrupt();
throw new SchemaHistoryException(e);
}
catch (MQClientException | RemotingException | MQBrokerException e) {
throw new SchemaHistoryException(e);
}
}
@Override
protected void recoverRecords(Consumer<HistoryRecord> records) {
DefaultLitePullConsumer consumer = null;
try {
consumer = RocketMqAdminUtil.initDefaultLitePullConsumer(rocketMqConfig, false);
consumer.start();
// Select message queue
MessageQueue messageQueue = new ZeroMessageQueueSelector().select(new ArrayList<>(consumer.fetchMessageQueues(topicName)), null, null);
consumer.assign(Collections.singleton(messageQueue));
consumer.seekToBegin(messageQueue);
// Read all messages in the topic ...
long lastProcessedOffset = -1;
Long maxOffset = null;
int recoveryAttempts = 0;
do {
if (recoveryAttempts > maxRecoveryAttempts) {
throw new IllegalStateException(
"The database schema history couldn't be recovered.");
}
// Get db schema history topic end offset
maxOffset = getMaxOffsetOfSchemaHistoryTopic(maxOffset, messageQueue);
LOGGER.debug("End offset of database schema history topic is {}", maxOffset);
// Poll record from db schema history topic
List<MessageExt> recoveredRecords = consumer.poll(pollInterval);
int numRecordsProcessed = 0;
for (MessageExt message : recoveredRecords) {
if (message.getQueueOffset() > lastProcessedOffset) {
HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody()));
LOGGER.trace("Recovering database history: {}", recordObj);
if (recordObj == null || !recordObj.isValid()) {
LOGGER.warn("Skipping invalid database history record '{}'. " +
"This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
recordObj, topicName);
}
else {
records.accept(recordObj);
LOGGER.trace("Recovered database history: {}", recordObj);
}
lastProcessedOffset = message.getQueueOffset();
++numRecordsProcessed;
}
}
if (numRecordsProcessed == 0) {
LOGGER.debug("No new records found in the database schema history; will retry");
recoveryAttempts++;
}
else {
LOGGER.debug("Processed {} records from database schema history", numRecordsProcessed);
}
} while (lastProcessedOffset < maxOffset - 1);
}
catch (MQClientException | MQBrokerException | IOException | RemotingException | InterruptedException ce) {
throw new SchemaHistoryException(ce);
}
finally {
if (consumer != null) {
consumer.shutdown();
}
}
}
private Long getMaxOffsetOfSchemaHistoryTopic(Long previousEndOffset, MessageQueue messageQueue)
throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
Map<MessageQueue, TopicOffset> minAndMaxOffsets = RocketMqAdminUtil.offsets(this.rocketMqConfig, topicName);
Long maxOffset = minAndMaxOffsets.get(messageQueue).getMaxOffset();
if (previousEndOffset != null && !previousEndOffset.equals(maxOffset)) {
LOGGER.warn("Detected changed end offset of database schema history topic (previous: "
+ previousEndOffset + ", current: " + maxOffset
+ "). Make sure that the same history topic isn't shared by multiple connector instances.");
}
return maxOffset;
}
@Override
public boolean exists() {
boolean exists = false;
if (this.storageExists()) {
Map<MessageQueue, TopicOffset> minAndMaxOffset = RocketMqAdminUtil.offsets(this.rocketMqConfig,
topicName);
for (MessageQueue messageQueue : minAndMaxOffset.keySet()) {
if (MESSAGE_QUEUE == messageQueue.getQueueId()) {
exists = minAndMaxOffset.get(messageQueue).getMaxOffset() > minAndMaxOffset.get(messageQueue).getMinOffset();
}
}
}
return exists;
}
@Override
public boolean storageExists() {
// Check whether topic exists
return RocketMqAdminUtil.topicExist(rocketMqConfig, this.topicName);
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.rocketmq;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Before;
import org.junit.Test;
public class ZeroMessageQueueSelectorTest {
private static List<MessageQueue> messageQueues;
@Before
public void before() {
messageQueues = new ArrayList<>();
messageQueues.add(new MessageQueue("topic", "brokerName", 0));
messageQueues.add(new MessageQueue("topic", "brokerName", 1));
}
@Test
public void testSelectQueue() {
MessageQueue messageQueue = new ZeroMessageQueueSelector().select(messageQueues, null, null);
assertEquals(0, messageQueue.getQueueId());
}
}

View File

@ -22,5 +22,6 @@
<module>debezium-storage-redis</module>
<module>debezium-storage-s3</module>
<module>debezium-storage-tests</module>
<module>debezium-storage-rocketmq</module>
</modules>
</project>

View File

@ -254,6 +254,7 @@ There are also other options available
* `io.debezium.storage.file.history.FileSchemaHistory` for non-Kafka deployments
* `io.debezium.relational.history.MemorySchemaHistory` volatile store for test environments
* `io.debezium.storage.redis.history.RedisSchemaHistory` for Redis deploments
* `io.debezium.storage.redis.history.RocketMqSchemaHistory` for RocketMQ deploments
|[[debezium-source-database-history-file-filename]]<<debezium-source-database-history-file-filename, `debezium.source.schema.history.internal.file.filename`>>
|
@ -312,6 +313,43 @@ For more information see Redis https://redis.io/commands/wait/[WAIT] command.
|`1000`
|If using Redis to store schema history, defines the delay of retry on wait for replica failure.
|[[schema-history-internal-rocketmq-topic]]<<schema-history-internal-rocketmq-topic, `debezium.source.schema.history.internal.rocketmq.topic`>>
|
|The name of the rocketmq topic for the database schema history.
|[[schema-history-internal-rocketmq-namesrvAddr]]<<schema-history-internal-rocketmq-namesrvAddr, `debezium.source.schema.history.internal.rocketmq.name.srv.addr`>>
|`localhost:9876`
|RocketMq service discovery NameServer address configuration.
|[[schema-history-internal-rocketmq-acl-enabled]]<<schema-history-internal-rocketmq-acl-enabled, `debezium.source.schema.history.internal.rocketmq.acl.enabled`>>
|`false`
|RocketMq access control enable configuration, default is 'false'.
|[[schema-history-internal-rocketmq-access-key]]<<schema-history-internal-rocketmq-access-key, `debezium.source.schema.history.internal.rocketmq.access.key`>>
|
|RocketMq access key. If `debezium.source.schema.history.internal.rocketmq.acl.enabled` is true, the value cannot be
empty.
|[[schema-history-internal-rocketmq-secret-key]]<<schema-history-internal-rocketmq-secret-key, `debezium.source.schema.history.internal.rocketmq.secret.key`>>
|
|RocketMq secret key. If `debezium.source.schema.history.internal.rocketmq.acl.enabled` is true, the value cannot be
empty.
|[[schema-history-internal-rocketmq-recovery-attempts]]<<schema-history-internal-rocketmq-recovery-attempts,`debezium.source.schema.history.internal.rocketmq.recovery.attempts`>>
| `60`
|The maximum number of attempts to recover database schema history.
|[[schema-history-internal-rocketmq-recovery-poll-interval-ms]]<<schema-history-internal-rocketmq-recovery-poll-interval-ms,`debezium.source.schema.history.internal.rocketmq.recovery.poll.interval.ms`>>
| `1000`
|The number of milliseconds to wait while polling for persisted
data during recovery.
|[[schema-history-internal-rocketmq-store-record-timeout-ms]]<<schema-history-internal-rocketmq-store-record-timeout-ms,`debezium.source.schema.history.internal.rocketmq.store.record.timeout.ms`>>
| `60000`
|Timeout for sending messages to RocketMq.
|===
[id="debezium-format-configuration-options"]

View File

@ -139,6 +139,9 @@
<!-- Infinispan version for Oracle and Debezium Server sink -->
<version.infinispan>14.0.4.Final</version.infinispan>
<version.infinispan.protostream>4.5.0.Final</version.infinispan.protostream>
<!---->
<version.rocketmq>4.9.4</version.rocketmq>
</properties>
<modules>