DBZ-4771 squash commits
This commit is contained in:
parent
60b80f6365
commit
fbab769910
@ -16,7 +16,7 @@
|
||||
<version.pubsub>25.0.0</version.pubsub>
|
||||
<version.pulsar>2.5.2</version.pulsar>
|
||||
<version.eventhubs>5.1.1</version.eventhubs>
|
||||
<version.jedis>3.5.2</version.jedis>
|
||||
<version.jedis>4.1.1</version.jedis>
|
||||
<version.pravega>0.9.1</version.pravega>
|
||||
<version.nats>2.8.0</version.nats>
|
||||
<version.stan>2.2.3</version.stan>
|
||||
|
@ -45,6 +45,12 @@
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-connector-mysql</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-server-core</artifactId>
|
||||
@ -71,6 +77,11 @@
|
||||
<artifactId>debezium-connector-postgres</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-connector-mysql</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
@ -116,6 +127,7 @@
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<reuseForks>false</reuseForks>
|
||||
<skipTests>${skipITs}</skipTests>
|
||||
<enableAssertions>true</enableAssertions>
|
||||
<systemProperties>
|
||||
|
@ -0,0 +1,256 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.server.redis;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.Field;
|
||||
import io.debezium.document.DocumentReader;
|
||||
import io.debezium.document.DocumentWriter;
|
||||
import io.debezium.relational.history.AbstractDatabaseHistory;
|
||||
import io.debezium.relational.history.DatabaseHistoryException;
|
||||
import io.debezium.relational.history.DatabaseHistoryListener;
|
||||
import io.debezium.relational.history.HistoryRecord;
|
||||
import io.debezium.relational.history.HistoryRecordComparator;
|
||||
import io.debezium.util.Collect;
|
||||
import io.debezium.util.DelayStrategy;
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.StreamEntryID;
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.jedis.resps.StreamEntry;
|
||||
|
||||
/**
|
||||
* A {@link DatabaseHistory} implementation that stores the schema history in Redis.
|
||||
*
|
||||
*/
|
||||
@ThreadSafe
|
||||
public final class RedisDatabaseHistory extends AbstractDatabaseHistory {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RedisDatabaseHistory.class);
|
||||
|
||||
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "redis.address")
|
||||
.withDescription("The redis url that will be used to access the database history");
|
||||
|
||||
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "redis.user")
|
||||
.withDescription("The redis url that will be used to access the database history");
|
||||
|
||||
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "redis.password")
|
||||
.withDescription("The redis url that will be used to access the database history");
|
||||
|
||||
public static final Field PROP_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "redis.key")
|
||||
.withDescription("The redis key that will be used to store the database history")
|
||||
.withDefault("metadata:debezium:db_history");
|
||||
|
||||
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
|
||||
public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms")
|
||||
.withDescription("Initial retry delay (in ms)")
|
||||
.withDefault(DEFAULT_RETRY_INITIAL_DELAY);
|
||||
|
||||
public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
|
||||
public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms")
|
||||
.withDescription("Maximum retry delay (in ms)")
|
||||
.withDefault(DEFAULT_RETRY_MAX_DELAY);
|
||||
|
||||
Integer initialRetryDelay;
|
||||
Integer maxRetryDelay;
|
||||
|
||||
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(PROP_ADDRESS, PROP_USER, PROP_PASSWORD, PROP_KEY);
|
||||
|
||||
// used as fallback to consolidate configuration settings in the sink
|
||||
private static final String SINK_PROP_PREFIX = "debezium.sink.redis.";
|
||||
|
||||
private final DocumentWriter writer = DocumentWriter.defaultWriter();
|
||||
private final DocumentReader reader = DocumentReader.defaultReader();
|
||||
private final AtomicBoolean running = new AtomicBoolean();
|
||||
private Configuration config;
|
||||
private String redisKeyName;
|
||||
private String address;
|
||||
private String user;
|
||||
private String password;
|
||||
|
||||
private Jedis client = null;
|
||||
|
||||
void connect() {
|
||||
|
||||
HostAndPort address = HostAndPort.from(this.address);
|
||||
|
||||
client = new Jedis(address);
|
||||
if (user != null) {
|
||||
client.auth(this.user, this.password);
|
||||
}
|
||||
else if (this.password != null) {
|
||||
client.auth(this.password);
|
||||
}
|
||||
else {
|
||||
// make sure that client is connected
|
||||
client.ping();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
|
||||
if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) {
|
||||
throw new ConnectException(
|
||||
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
|
||||
}
|
||||
config.validateAndRecord(ALL_FIELDS, LOGGER::error);
|
||||
this.config = config;
|
||||
// fetch the properties. as a fallback, if we did not specify the redis address, we will take it and all the credentials from the sink
|
||||
this.address = this.config.getString(PROP_ADDRESS.name());
|
||||
if (this.address == null) {
|
||||
// try to take the connection details from the redis sink
|
||||
this.address = this.config.getString(SINK_PROP_PREFIX + "address");
|
||||
this.user = this.config.getString(SINK_PROP_PREFIX + "user");
|
||||
this.password = this.config.getString(SINK_PROP_PREFIX + "password");
|
||||
}
|
||||
else {
|
||||
this.user = this.config.getString(PROP_USER.name());
|
||||
this.password = this.config.getString(PROP_PASSWORD.name());
|
||||
}
|
||||
this.redisKeyName = this.config.getString(PROP_KEY);
|
||||
LOGGER.info("rediskeyname:" + this.redisKeyName);
|
||||
// load retry settings
|
||||
this.initialRetryDelay = this.config.getInteger(PROP_RETRY_INITIAL_DELAY);
|
||||
this.maxRetryDelay = this.config.getInteger(PROP_RETRY_MAX_DELAY);
|
||||
|
||||
super.configure(config, comparator, listener, useCatalogBeforeSchema);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
super.start();
|
||||
LOGGER.info("Starting RedisDatabaseHistory");
|
||||
this.connect();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
|
||||
if (record == null) {
|
||||
return;
|
||||
}
|
||||
String line;
|
||||
try {
|
||||
line = writer.write(record.document());
|
||||
}
|
||||
catch (IOException e) {
|
||||
LOGGER.error("Failed to convert record to string: {}", record, e);
|
||||
throw new DatabaseHistoryException("Unable to write database history record");
|
||||
}
|
||||
|
||||
DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
|
||||
boolean completedSuccessfully = false;
|
||||
|
||||
// loop and retry until successful
|
||||
while (!completedSuccessfully) {
|
||||
try {
|
||||
if (client == null) {
|
||||
this.connect();
|
||||
}
|
||||
|
||||
// write the entry to Redis
|
||||
client.xadd(this.redisKeyName, (StreamEntryID) null, Collections.singletonMap("schema", line));
|
||||
LOGGER.trace("Record written to database history in redis: " + line);
|
||||
completedSuccessfully = true;
|
||||
}
|
||||
catch (JedisConnectionException jce) {
|
||||
LOGGER.warn("Attempting to reconnect to redis ");
|
||||
this.connect();
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOGGER.warn("Writing to database history stream failed", e);
|
||||
LOGGER.warn("Will retry");
|
||||
}
|
||||
if (!completedSuccessfully) {
|
||||
// Failed to execute the transaction, retry...
|
||||
delayStrategy.sleepWhen(!completedSuccessfully);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
running.set(false);
|
||||
if (client != null) {
|
||||
client.disconnect();
|
||||
}
|
||||
super.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
|
||||
DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
|
||||
boolean completedSuccessfully = false;
|
||||
List<StreamEntry> entries = new ArrayList<StreamEntry>();
|
||||
|
||||
// loop and retry until successful
|
||||
while (!completedSuccessfully) {
|
||||
try {
|
||||
if (client == null) {
|
||||
this.connect();
|
||||
}
|
||||
|
||||
// read the entries from Redis
|
||||
entries = client.xrange(
|
||||
this.redisKeyName, (StreamEntryID) null, (StreamEntryID) null);
|
||||
completedSuccessfully = true;
|
||||
}
|
||||
catch (JedisConnectionException jce) {
|
||||
LOGGER.warn("Attempting to reconnect to redis ");
|
||||
this.connect();
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOGGER.warn("Reading from database history stream failed with " + e);
|
||||
LOGGER.warn("Will retry");
|
||||
}
|
||||
if (!completedSuccessfully) {
|
||||
// Failed to execute the transaction, retry...
|
||||
delayStrategy.sleepWhen(!completedSuccessfully);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for (StreamEntry item : entries) {
|
||||
try {
|
||||
records.accept(new HistoryRecord(reader.read(item.getFields().get("schema"))));
|
||||
}
|
||||
catch (IOException e) {
|
||||
LOGGER.error("Failed to convert record to string: {}", item, e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean storageExists() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists() {
|
||||
// check if the stream is not empty
|
||||
if (client != null && client.xlen(this.redisKeyName) > 0) {
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
@ -31,6 +31,7 @@
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.StreamEntryID;
|
||||
import redis.clients.jedis.Transaction;
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.jedis.exceptions.JedisDataException;
|
||||
@ -159,7 +160,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
|
||||
String value = (record.value() != null) ? getString(record.value()) : nullValue;
|
||||
|
||||
// Add the record to the destination stream
|
||||
transaction.xadd(destination, null, Collections.singletonMap(key, value));
|
||||
transaction.xadd(destination, StreamEntryID.NEW_ENTRY, Collections.singletonMap(key, value));
|
||||
}
|
||||
|
||||
// Execute the transaction in Redis
|
||||
|
@ -0,0 +1,148 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.server.redis;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
import javax.enterprise.event.Observes;
|
||||
|
||||
import org.awaitility.Awaitility;
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.mysql.MySqlConnection;
|
||||
import io.debezium.connector.mysql.MySqlConnection.MySqlConnectionConfiguration;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.relational.history.AbstractDatabaseHistoryTest;
|
||||
import io.debezium.relational.history.DatabaseHistory;
|
||||
import io.debezium.relational.history.DatabaseHistoryMetrics;
|
||||
import io.debezium.server.TestConfigSource;
|
||||
import io.debezium.server.events.ConnectorStartedEvent;
|
||||
import io.debezium.testing.testcontainers.MySqlTestResourceLifecycleManager;
|
||||
import io.debezium.util.Testing;
|
||||
import io.quarkus.test.common.QuarkusTestResource;
|
||||
import io.quarkus.test.junit.QuarkusIntegrationTest;
|
||||
import io.quarkus.test.junit.TestProfile;
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.StreamEntryID;
|
||||
import redis.clients.jedis.resps.StreamEntry;
|
||||
|
||||
/**
|
||||
* Integration test that verifies reading and writing database history from Redis key value store
|
||||
*
|
||||
* @author Oren Elias
|
||||
*/
|
||||
@QuarkusIntegrationTest
|
||||
@TestProfile(RedisDatabaseHistoryTestProfile.class)
|
||||
@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
|
||||
public class RedisDatabaseHistoryIT extends AbstractDatabaseHistoryTest {
|
||||
|
||||
void setupDependencies(@Observes ConnectorStartedEvent event) {
|
||||
Testing.Print.enable();
|
||||
}
|
||||
|
||||
private static final String STREAM_NAME = "metadata:debezium:db_history";
|
||||
|
||||
protected static Jedis jedis;
|
||||
|
||||
@Override
|
||||
@BeforeEach
|
||||
public void beforeEach() {
|
||||
super.beforeEach();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DatabaseHistory createHistory() {
|
||||
DatabaseHistory history = new RedisDatabaseHistory();
|
||||
|
||||
history.configure(Configuration.create()
|
||||
.with(RedisDatabaseHistory.PROP_ADDRESS, HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()))
|
||||
.build(), null, DatabaseHistoryMetrics.NOOP, true);
|
||||
history.start();
|
||||
return history;
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-4771")
|
||||
public void testDatabaseHistoryIsSaved() throws Exception {
|
||||
jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
|
||||
Testing.Print.enable();
|
||||
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
|
||||
final long streamLength = jedis.xlen(STREAM_NAME);
|
||||
return streamLength > 0;
|
||||
});
|
||||
|
||||
final List<StreamEntry> entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null);
|
||||
assertTrue(entries.stream().anyMatch(item -> item.getFields().get("schema").contains("CREATE TABLE `customers`")));
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-4771")
|
||||
public void shouldRecordChangesAndRecoverToVariousPoints() {
|
||||
super.shouldRecordChangesAndRecoverToVariousPoints();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test retry mechanism when encountering Redis connectivity issues:
|
||||
* 1. Make Redis unavailable while the server is up
|
||||
* 2. Create a new table named redis_test in MySQL
|
||||
* 3. Bring Redis up again and make sure the database history has been written successfully
|
||||
*/
|
||||
@Test
|
||||
@FixFor("DBZ-4509")
|
||||
public void testRedisConnectionRetry() throws Exception {
|
||||
Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
|
||||
// wait until the db history is written for the first time
|
||||
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
|
||||
final long streamLength = jedis.xlen(STREAM_NAME);
|
||||
return streamLength > 0;
|
||||
});
|
||||
|
||||
// clear the key
|
||||
jedis.del(STREAM_NAME);
|
||||
|
||||
// pause container
|
||||
Testing.print("Pausing container");
|
||||
RedisTestResourceLifecycleManager.pause();
|
||||
|
||||
final MySqlConnection connection = getMySqlConnection();
|
||||
connection.connect();
|
||||
Testing.print("Creating new redis_test table and inserting 5 records to it");
|
||||
connection.execute("CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)");
|
||||
connection.close();
|
||||
|
||||
Testing.print("Sleeping for 2 seconds to flush records");
|
||||
Thread.sleep(2000);
|
||||
Testing.print("Unpausing container");
|
||||
RedisTestResourceLifecycleManager.unpause();
|
||||
|
||||
// wait until the db history is written for the first time
|
||||
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
|
||||
final long streamLength = jedis.xlen(STREAM_NAME);
|
||||
return streamLength > 0;
|
||||
});
|
||||
final List<StreamEntry> entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null);
|
||||
Assertions.assertThat(entries.size() == 1).isTrue();
|
||||
Assertions.assertThat(entries.get(0).getFields().get("schema")).contains("redis_test");
|
||||
}
|
||||
|
||||
private MySqlConnection getMySqlConnection() {
|
||||
return new MySqlConnection(new MySqlConnectionConfiguration(Configuration.create()
|
||||
.with("database.user", MySqlTestResourceLifecycleManager.PRIVILEGED_USER)
|
||||
.with("database.password", MySqlTestResourceLifecycleManager.PRIVILEGED_PASSWORD)
|
||||
.with("database.dbname", MySqlTestResourceLifecycleManager.DBNAME)
|
||||
.with("database.hostname", MySqlTestResourceLifecycleManager.HOST)
|
||||
.with("database.port", MySqlTestResourceLifecycleManager.getContainer().getMappedPort(MySqlTestResourceLifecycleManager.PORT))
|
||||
.build()));
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.server.redis;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.debezium.testing.testcontainers.MySqlTestResourceLifecycleManager;
|
||||
import io.debezium.util.Testing;
|
||||
import io.quarkus.test.junit.QuarkusTestProfile;
|
||||
|
||||
public class RedisDatabaseHistoryTestProfile implements QuarkusTestProfile {
|
||||
public static final String OFFSETS_FILE = "file-connector-offsets.txt";
|
||||
public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath();
|
||||
public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
|
||||
|
||||
@Override
|
||||
public List<TestResourceEntry> testResources() {
|
||||
return Arrays.asList(new TestResourceEntry(MySqlTestResourceLifecycleManager.class));
|
||||
}
|
||||
|
||||
public Map<String, String> getConfigOverrides() {
|
||||
Map<String, String> config = new HashMap<String, String>();
|
||||
config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
|
||||
config.put("debezium.source.database.history", "io.debezium.server.redis.RedisDatabaseHistory");
|
||||
config.put("debezium.source.database.history.redis.address", "${debezium.sink.redis.address}");
|
||||
return config;
|
||||
}
|
||||
|
||||
}
|
@ -26,7 +26,8 @@
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.StreamEntry;
|
||||
import redis.clients.jedis.StreamEntryID;
|
||||
import redis.clients.jedis.resps.StreamEntry;
|
||||
|
||||
/**
|
||||
* Integration test that verifies reading and writing offsets from Redis key value store
|
||||
@ -35,7 +36,6 @@
|
||||
*/
|
||||
@QuarkusIntegrationTest
|
||||
@TestProfile(RedisOffsetTestProfile.class)
|
||||
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
|
||||
@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
|
||||
|
||||
public class RedisOffsetIT {
|
||||
@ -62,7 +62,7 @@ public void testRedisStream() throws Exception {
|
||||
Testing.Print.enable();
|
||||
final List<StreamEntry> entries = new ArrayList<>();
|
||||
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
|
||||
final List<StreamEntry> response = jedis.xrange(STREAM_NAME, null, null, MESSAGE_COUNT);
|
||||
final List<StreamEntry> response = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null, MESSAGE_COUNT);
|
||||
entries.addAll(response);
|
||||
return entries.size() >= MESSAGE_COUNT;
|
||||
});
|
||||
|
@ -5,16 +5,25 @@
|
||||
*/
|
||||
package io.debezium.server.redis;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
|
||||
import io.quarkus.test.junit.QuarkusTestProfile;
|
||||
|
||||
public class RedisOffsetTestProfile implements QuarkusTestProfile {
|
||||
|
||||
@Override
|
||||
public List<TestResourceEntry> testResources() {
|
||||
return Arrays.asList(new TestResourceEntry(PostgresTestResourceLifecycleManager.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getConfigOverrides() {
|
||||
Map<String, String> config = new HashMap<String, String>();
|
||||
config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
|
||||
config.put("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore");
|
||||
return config;
|
||||
}
|
||||
|
@ -33,7 +33,6 @@
|
||||
*/
|
||||
@QuarkusIntegrationTest
|
||||
@TestProfile(RedisStreamTestProfile.class)
|
||||
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
|
||||
@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
|
||||
public class RedisStreamIT {
|
||||
|
||||
|
@ -6,9 +6,12 @@
|
||||
package io.debezium.server.redis;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
|
||||
import io.debezium.util.Testing;
|
||||
import io.quarkus.test.junit.QuarkusTestProfile;
|
||||
|
||||
@ -18,8 +21,15 @@ public class RedisStreamTestProfile implements QuarkusTestProfile {
|
||||
public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath();
|
||||
public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
|
||||
|
||||
@Override
|
||||
public List<TestResourceEntry> testResources() {
|
||||
return Arrays.asList(new TestResourceEntry(PostgresTestResourceLifecycleManager.class));
|
||||
}
|
||||
|
||||
public Map<String, String> getConfigOverrides() {
|
||||
Map<String, String> config = new HashMap<String, String>();
|
||||
config.put("debezium.source.connector.bla", "hello");
|
||||
config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
|
||||
config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
|
||||
return config;
|
||||
}
|
||||
|
@ -118,6 +118,11 @@
|
||||
<artifactId>debezium-connector-postgres</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-connector-mysql</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Aligning versions/fixing scopes -->
|
||||
<dependency>
|
||||
|
@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.testing.testcontainers;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
|
||||
|
||||
/**
|
||||
* @author Oren Elias
|
||||
*/
|
||||
public class MySqlTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
|
||||
|
||||
public static final String USER = "debezium";
|
||||
public static final String PASSWORD = "dbz";
|
||||
public static final String PRIVILEGED_USER = "mysqluser";
|
||||
public static final String PRIVILEGED_PASSWORD = "mysqlpassword";
|
||||
public static final String ROOT_PASSWORD = "debezium";
|
||||
public static final String DBNAME = "inventory";
|
||||
public static final String IMAGE = "debezium/example-mysql";
|
||||
public static final String HOST = "localhost";
|
||||
public static final Integer PORT = 3306;
|
||||
|
||||
private static final GenericContainer<?> container = new GenericContainer<>(IMAGE)
|
||||
.waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2))
|
||||
.withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD)
|
||||
.withEnv("MYSQL_USER", PRIVILEGED_USER)
|
||||
.withEnv("MYSQL_PASSWORD", PRIVILEGED_PASSWORD)
|
||||
.withExposedPorts(PORT)
|
||||
.withStartupTimeout(Duration.ofSeconds(180));
|
||||
|
||||
public static GenericContainer<?> getContainer() {
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> start() {
|
||||
container.start();
|
||||
|
||||
Map<String, String> params = new ConcurrentHashMap<>();
|
||||
params.put("debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
|
||||
params.put("debezium.source.database.hostname", HOST);
|
||||
params.put("debezium.source.database.port", container.getMappedPort(PORT).toString());
|
||||
params.put("debezium.source.database.user", USER);
|
||||
params.put("debezium.source.database.password", PASSWORD);
|
||||
params.put("debezium.source.database.dbname", DBNAME);
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
try {
|
||||
if (container != null) {
|
||||
container.stop();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
}
|
@ -223,11 +223,36 @@ There are also other options available
|
||||
|
||||
* `io.debezium.relational.history.FileDatabaseHistory` for non-Kafka deployments
|
||||
* `io.debezium.relational.history.MemoryDatabaseHistory` volatile store for test environments
|
||||
* `io.debezium.server.redis.RedisDatabaseHistory` volatile store for test environments
|
||||
|
||||
|[[debezium-source-database-history-file-filename]]<<debezium-source-database-history-file-filename, `debezium.source.database.history.file.filename`>>
|
||||
|
|
||||
|The name and location of the file to which `FileDatabaseHistory` persists its data.
|
||||
|
||||
|[[debezium-source-database-history-redis-address]]<<debezium-source-database-history-redis-address, `debezium.source.database.history.redis.address`>>
|
||||
|
|
||||
|The Redis host:port to connect to if using `RedisDatabaseHistory`.
|
||||
|
||||
|[[debezium-source-database-history-redis-user]]<<debezium-source-database-history-redis-user, `debezium.source.database.history.redis.user`>>
|
||||
|
|
||||
|The Redis user to use if using `RedisDatabaseHistory`.
|
||||
|
||||
|[[debezium-source-database-history-redis-password]]<<debezium-source-database-history-redis-password, `debezium.source.database.history.redis.password`>>
|
||||
|
|
||||
|The Redis password to use if using `RedisDatabaseHistory`.
|
||||
|
||||
|[[debezium-source-database-history-redis-key]]<<debezium-source-database-history-redis-key, `debezium.source.database.history.redis.key`>>
|
||||
|
|
||||
|The Redis key to use for storage if using `RedisDatabaseHistory`. Default: metadata:debezium:db_history
|
||||
|
||||
|[[debezium-source-database-history-retry-initial-delay-ms]]<<debezium-source-database-history-retry-initial-delay-ms, `debezium.source.database.history.retry.initial.delay.ms`>>
|
||||
|
|
||||
|The initial delay in case of a connection retry to Redis if using `RedisDatabaseHistory`. Default: 300 (ms)
|
||||
|
||||
|[[debezium-source-database-history-retry-max-delay-ms]]<<debezium-source-database-history-retry-max-delay-ms, `debezium.source.database.history.retry.max.delay.ms`>>
|
||||
|
|
||||
|The maximum delay in case of a connection retry to Redis if using `RedisDatabaseHistory`. Default: 10000 (ms)
|
||||
|
||||
|===
|
||||
|
||||
[id="debezium-format-configuration-options"]
|
||||
@ -696,6 +721,8 @@ By default the same name is used.
|
||||
|
||||
|===
|
||||
|
||||
|
||||
|
||||
==== NATS Streaming
|
||||
|
||||
https://docs.nats.io/nats-streaming-concepts/intro[NATS Streaming] is a data streaming system powered by NATS, and written in the Go programming language.
|
||||
|
Loading…
Reference in New Issue
Block a user