DBZ-6566 Support Blocking snapshot for MySQL
commit def7379a0d
parent 6e0b323e83
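
The change set wires blocking (ad-hoc initial) snapshots into the MySQL connector: a new preSnapshot() hook switches the change event queue into last-event buffering, the binlog reader learns to pause and wait for snapshot completion, and shared test scaffolding (AbstractBlockingSnapshotTest, AbstractSnapshotTest) is extracted from the incremental snapshot tests. As a minimal sketch of how such a snapshot is requested — the format string and the INITIAL_BLOCKING type are taken from the signal helper in this diff, while the signal table and data collection names are assumptions:

    // Hedged sketch: send the ad-hoc blocking snapshot signal the way the test
    // helper in this diff does. Table/collection names here are illustrative.
    String query = String.format(
            "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')",
            "inventory.debezium_signal",                            // assumed signal table
            AbstractSnapshotSignal.SnapshotType.INITIAL_BLOCKING,   // serialized via toString(), as in the helper
            "\"inventory.a\"");                                     // assumed data collection id
    try (JdbcConnection connection = databaseConnection()) {
        connection.execute(query);
    }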
@@ -105,6 +105,12 @@
         <groupId>io.apicurio</groupId>
         <artifactId>apicurio-registry-utils-converter</artifactId>
         <scope>test</scope>
+        <exclusions>
+            <exclusion>
+                <artifactId>slf4j-jboss-logmanager</artifactId>
+                <groupId>org.jboss.slf4j</groupId>
+            </exclusion>
+        </exclusions>
     </dependency>
     <dependency>
         <groupId>org.awaitility</groupId>
@@ -63,7 +63,11 @@ public MySqlChangeEventSourceFactory(MySqlConnectorConfig configuration, MainCon
     public SnapshotChangeEventSource<MySqlPartition, MySqlOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<MySqlPartition> snapshotProgressListener,
                                                                                                       NotificationService<MySqlPartition, MySqlOffsetContext> notificationService) {
         return new MySqlSnapshotChangeEventSource(configuration, connectionFactory, taskContext.getSchema(), dispatcher, clock,
-                (MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener, this::modifyAndFlushLastRecord, notificationService);
+                (MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener, this::modifyAndFlushLastRecord, this::preSnapshot, notificationService);
+    }
+
+    private void preSnapshot() {
+        queue.enableBuffering();
     }
 
     private void modifyAndFlushLastRecord(Function<SourceRecord, SourceRecord> modify) throws InterruptedException {
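
Passing this::preSnapshot alongside this::modifyAndFlushLastRecord lets the snapshot source switch the connector's queue into last-event buffering before any snapshot record is emitted, so the final record can later be rewritten and flushed. A sketch of that lifecycle, using the ChangeEventQueue methods added further down in this diff; the exact ordering of the callbacks shown here is an assumption:

    queue.enableBuffering();                // preSnapshot: start holding back the newest event
    // ... snapshot runs; each new enqueue releases the previously held event ...
    queue.flushBuffer(record -> record);    // modifyAndFlushLastRecord: apply a modifier to the held
                                            // last event and enqueue it (identity modifier shown here)
    queue.disableBuffering();               // resume pass-through enqueueing (assumed final step)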
@@ -67,11 +67,13 @@ public class MySqlSnapshotChangeEventSource extends RelationalSnapshotChangeEven
     private final List<SchemaChangeEvent> schemaEvents = new ArrayList<>();
     private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
     private final BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor;
+    private final Runnable preSnapshotAction;
 
     public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, MainConnectionProvidingConnectionFactory<MySqlConnection> connectionFactory,
                                           MySqlDatabaseSchema schema, EventDispatcher<MySqlPartition, TableId> dispatcher, Clock clock,
                                           MySqlSnapshotChangeEventSourceMetrics metrics,
                                           BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor,
+                                          Runnable preSnapshotAction,
                                           NotificationService<MySqlPartition, MySqlOffsetContext> notificationService) {
         super(connectorConfig, connectionFactory, schema, dispatcher, clock, metrics, notificationService);
         this.connectorConfig = connectorConfig;
@@ -80,6 +82,7 @@ public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, Main
         this.metrics = metrics;
         this.databaseSchema = schema;
         this.lastEventProcessor = lastEventProcessor;
+        this.preSnapshotAction = preSnapshotAction;
     }
 
     @Override
@@ -88,7 +91,7 @@ protected SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOf
         boolean snapshotData = true;
 
         // found a previous offset and the earlier snapshot has completed
-        if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
+        if (previousOffset != null && !previousOffset.isSnapshotRunning() && false /* TODO check if streaming is paused */) {
             LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
             snapshotSchema = databaseSchema.isStorageInitializationExecuted();
             snapshotData = false;
@@ -652,6 +655,12 @@ protected void postSnapshot() throws InterruptedException {
         super.postSnapshot();
     }
 
+    @Override
+    protected void preSnapshot() throws InterruptedException {
+        preSnapshotAction.run();
+        super.preSnapshot();
+    }
+
     @Override
     protected MySqlOffsetContext copyOffset(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext) {
         return new Loader(connectorConfig).load(snapshotContext.offset.getOffset());
@@ -1038,6 +1038,12 @@ public void execute(ChangeEventSourceContext context, MySqlPartition partition,
             }
             while (context.isRunning()) {
                 Thread.sleep(100);
+                if (context.isPaused()) {
+                    LOGGER.info("Streaming will now pause");
+                    context.streamingPaused();
+                    context.waitSnapshotCompletion();
+                    LOGGER.info("Streaming resumed");
+                }
             }
         }
         finally {
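
The streaming loop now honors a pause request from the coordinator: it acknowledges with context.streamingPaused() and blocks in context.waitSnapshotCompletion() until the blocking snapshot finishes. The diff does not show the coordinator's implementation of that wait, but the coordinator hunk further down imports java.util.concurrent.locks.Condition, which suggests a lock/condition handshake along these lines — a self-contained sketch with hypothetical names, not the actual implementation:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical handshake holder; field and method names are illustrative.
    class PauseHandshake {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition snapshotFinished = lock.newCondition();
        private boolean paused;

        void pause() {
            lock.lock();
            try {
                paused = true;
            }
            finally {
                lock.unlock();
            }
        }

        void waitSnapshotCompletion() throws InterruptedException {
            lock.lock();
            try {
                while (paused) {              // loop guards against spurious wake-ups
                    snapshotFinished.await();
                }
            }
            finally {
                lock.unlock();
            }
        }

        void snapshotCompleted() {            // called once the blocking snapshot ends
            lock.lock();
            try {
                paused = false;
                snapshotFinished.signalAll();
            }
            finally {
                lock.unlock();
            }
        }
    }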
@@ -0,0 +1,149 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.mysql;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.config.Configuration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.pipeline.AbstractBlockingSnapshotTest;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.util.Testing;
+
+public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
+
+    protected static final String SERVER_NAME = "is_test";
+    protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
+
+    @Before
+    public void before() throws SQLException {
+        stopConnector();
+        DATABASE.createAndInitialize();
+        initializeConnectorTestFramework();
+        Testing.Files.delete(SCHEMA_HISTORY_PATH);
+    }
+
+    @After
+    public void after() {
+        try {
+            stopConnector();
+        }
+        finally {
+            Testing.Files.delete(SCHEMA_HISTORY_PATH);
+        }
+    }
+
+    protected Configuration.Builder config() {
+        return DATABASE.defaultConfig()
+                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
+                .with(MySqlConnectorConfig.USER, "mysqluser")
+                .with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
+                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue())
+                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
+                .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
+                .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1);
+    }
+
+    @Override
+    protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
+        final String tableIncludeList;
+        if (signalTableOnly) {
+            tableIncludeList = DATABASE.qualifiedTableName("c");
+        }
+        else {
+            tableIncludeList = DATABASE.qualifiedTableName("a") + ", " + DATABASE.qualifiedTableName("c");
+        }
+        return DATABASE.defaultConfig()
+                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
+                .with(MySqlConnectorConfig.USER, "mysqluser")
+                .with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
+                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL.getValue())
+                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
+                .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
+                .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
+                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
+                .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl)
+                .with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
+    }
+
+    @Override
+    protected String connector() {
+        return "mysql";
+    }
+
+    @Override
+    protected String server() {
+        return DATABASE.getServerName();
+    }
+
+    @Override
+    protected Class<MySqlConnector> connectorClass() {
+        return MySqlConnector.class;
+    }
+
+    @Override
+    protected JdbcConnection databaseConnection() {
+        return MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
+    }
+
+    @Override
+    protected String topicName() {
+        return DATABASE.topicForTable("a");
+    }
+
+    @Override
+    protected List<String> topicNames() {
+        return List.of(DATABASE.topicForTable("a"), DATABASE.topicForTable("c"));
+    }
+
+    @Override
+    protected String tableName() {
+        return tableNameId().toQuotedString('`');
+    }
+
+    @Override
+    protected List<String> tableNames() {
+        final String tableA = TableId.parse(DATABASE.qualifiedTableName("a")).toQuotedString('`');
+        final String tableB = TableId.parse(DATABASE.qualifiedTableName("c")).toQuotedString('`');
+        return List.of(tableA, tableB);
+    }
+
+    @Override
+    protected String signalTableName() {
+        return tableNameId("debezium_signal").toQuotedString('`');
+    }
+
+    @Override
+    protected String signalTableNameSanitized() {
+        return DATABASE.qualifiedTableName("debezium_signal");
+    }
+
+    @Override
+    protected String tableDataCollectionId() {
+        return tableNameId().toString();
+    }
+
+    @Override
+    protected List<String> tableDataCollectionIds() {
+        return List.of(tableNameId().toString(), tableNameId("c").toString());
+    }
+
+    private TableId tableNameId() {
+        return tableNameId("a");
+    }
+
+    private TableId tableNameId(String table) {
+        return TableId.parse(DATABASE.qualifiedTableName(table));
+    }
+
+}
@@ -0,0 +1,16 @@
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: incremental_snapshot_test
+-- ----------------------------------------------------------------------------------------------------------------
+
+CREATE TABLE a (
+    pk INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    aa INTEGER
+) AUTO_INCREMENT = 1;
+
+CREATE TABLE debezium_signal (
+    id varchar(64),
+    type varchar(32),
+    data varchar(2048)
+);
+
+CREATE DATABASE IF NOT EXISTS emptydb;
@@ -26,4 +26,17 @@
         level="error" additivity="false">
         <appender-ref ref="CONSOLE" />
     </logger>
+
+    <!-- For debug purposes -->
+    <logger
+        name="io.debezium.pipeline.ChangeEventSourceCoordinator"
+        level="off" additivity="false">
+        <appender-ref ref="CONSOLE" />
+    </logger>
+
+    <logger
+        name="io.debezium.pipeline.EventDispatcher"
+        level="off" additivity="false">
+        <appender-ref ref="CONSOLE" />
+    </logger>
 </configuration>
@@ -138,6 +138,12 @@
         <groupId>io.apicurio</groupId>
         <artifactId>apicurio-registry-utils-converter</artifactId>
         <scope>test</scope>
+        <exclusions>
+            <exclusion>
+                <artifactId>slf4j-jboss-logmanager</artifactId>
+                <groupId>org.jboss.slf4j</groupId>
+            </exclusion>
+        </exclusions>
     </dependency>
 
     <!-- Used for unit testing with Kafka -->
@@ -244,7 +244,6 @@ private void processMessages(ChangeEventSourceContext context, PostgresPartition
                     connection.commit();
                 }
 
-                // LOGGER.info("Checking stream paused");
                 if (context.isPaused()) {
                     LOGGER.info("Streaming will now pause");
                     context.streamingPaused();
@@ -6,28 +6,20 @@
 
 package io.debezium.connector.postgresql;
 
-import static io.debezium.pipeline.signal.actions.AbstractSnapshotSignal.SnapshotType.INITIAL_BLOCKING;
-import static org.assertj.core.api.Assertions.assertThat;
-
 import java.sql.SQLException;
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
-import org.apache.kafka.connect.data.Struct;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Test;
 
 import io.debezium.config.CommonConnectorConfig;
 import io.debezium.config.Configuration;
 import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
 import io.debezium.jdbc.JdbcConnection;
-import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
+import io.debezium.pipeline.AbstractBlockingSnapshotTest;
 import io.debezium.relational.RelationalDatabaseConnectorConfig;
 
-public class BlockingSnapshotIT extends AbstractIncrementalSnapshotTest<PostgresConnector> {
+public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
 
     private static final String TOPIC_NAME = "test_server.s1.a";
 
@@ -64,9 +56,7 @@ protected Configuration.Builder config() {
                 .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
                 .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source")
                 .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
-                .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4")
-                // DBZ-4272 required to allow dropping columns just before an incremental snapshot
-                .with("database.autosave", "conservative");
+                .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4");
     }
 
     @Override
@@ -79,9 +69,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
                 .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
                 .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
                 .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
-                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a")
-                // DBZ-4272 required to allow dropping columns just before an incremental snapshot
-                .with("database.autosave", "conservative");
+                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a");
     }
 
     @Override
@@ -119,12 +107,6 @@ protected String signalTableName() {
         return "s1.debezium_signal";
     }
 
-    @Override
-    protected void waitForConnectorToStart() {
-        super.waitForConnectorToStart();
-        TestHelper.waitForDefaultReplicationSlotBeActive();
-    }
-
     @Override
     protected String connector() {
         return "postgres";
@@ -135,63 +117,4 @@ protected String server() {
         return TestHelper.TEST_SERVER;
     }
 
-    @Test
-    public void executeBlockingSnapshot() throws Exception {
-        // Testing.Print.enable();
-
-        populateTable();
-
-        startConnectorWithSnapshot(x -> mutableConfig(false, false));
-
-        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
-
-        try (JdbcConnection connection = databaseConnection()) {
-            connection.setAutoCommit(false);
-            for (int i = 0; i < ROW_COUNT; i++) {
-                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
-                        tableName(),
-                        connection.quotedColumnIdString(pkFieldName()),
-                        i + ROW_COUNT + 1,
-                        i + ROW_COUNT));
-            }
-            connection.commit();
-        }
-
-        SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(ROW_COUNT * 2);
-        assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(ROW_COUNT * 2);
-        List<Integer> actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
-                .map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa"))
-                .collect(Collectors.toList());
-        assertThat(actual).containsAll(IntStream.range(0, 1999).boxed().collect(Collectors.toList()));
-
-        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), INITIAL_BLOCKING, tableDataCollectionId());
-
-        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
-
-        snapshotAndStreamingRecords = consumeRecordsByTopic((ROW_COUNT * 2) + 1);
-        assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo((ROW_COUNT * 2) + 1);
-        actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
-                .map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa"))
-                .collect(Collectors.toList());
-        assertThat(actual).containsAll(IntStream.range(0, 1999).boxed().collect(Collectors.toList()));
-
-        try (JdbcConnection connection = databaseConnection()) {
-            connection.setAutoCommit(false);
-            for (int i = 0; i < ROW_COUNT; i++) {
-                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
-                        tableName(),
-                        connection.quotedColumnIdString(pkFieldName()),
-                        i + (ROW_COUNT * 2) + 1,
-                        i + (ROW_COUNT * 2)));
-            }
-            connection.commit();
-        }
-
-        snapshotAndStreamingRecords = consumeRecordsByTopic(ROW_COUNT + 1);
-        assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(ROW_COUNT + 1);
-        actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
-                .map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa"))
-                .collect(Collectors.toList());
-        assertThat(actual).containsAll(IntStream.range(2000, 2999).boxed().collect(Collectors.toList()));
-    }
 }
@@ -187,7 +187,7 @@ record = bufferedEvent.getAndSet(record);
      * @throws InterruptedException
      */
     public void flushBuffer(Function<T, T> recordModifier) throws InterruptedException {
-        assert buffering : "Unsuported for queues with disabled buffering";
+        assert buffering : "Unsupported for queues with disabled buffering";
         T record = bufferedEvent.getAndSet(null);
         if (record != null) {
             doEnqueue(recordModifier.apply(record));
@@ -202,6 +202,13 @@ public void disableBuffering() {
         buffering = false;
     }
 
+    /**
+     * Enable buffering for the queue
+     */
+    public void enableBuffering() {
+        buffering = true;
+    }
+
     protected void doEnqueue(T record) throws InterruptedException {
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("Enqueuing source record '{}'", record);
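
Together with flushBuffer above, enableBuffering gives the queue a one-slot delay line: while buffering is on, every enqueue releases the previously held event and retains the newest one, so the very last event is always available for rewriting. A condensed sketch of that path — the getAndSet pattern comes from the flushBuffer hunk's context line, while the surrounding method body is simplified:

    // Condensed sketch of the buffered enqueue path (simplified from ChangeEventQueue):
    public void enqueue(T record) throws InterruptedException {
        if (buffering) {
            record = bufferedEvent.getAndSet(record);   // keep the newest, release the previous
            if (record == null) {
                return;                                 // first buffered event: nothing to release yet
            }
        }
        doEnqueue(record);
    }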
@@ -11,7 +11,6 @@
 import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
@@ -260,7 +259,6 @@ protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContex
     protected void streamEvents(ChangeEventSourceContext context, P partition, O offsetContext) throws InterruptedException {
         initStreamEvents(partition, offsetContext);
         LOGGER.info("Starting streaming");
-        // Maybe add a pause and restart method that should be called from the action through the coordinator
         streamingSource.execute(context, partition, offsetContext);
         LOGGER.info("Finished streaming");
     }
@@ -214,6 +214,9 @@ public void changeRecord(P partition,
                                  OffsetContext offset,
                                  ConnectHeaders headers)
             throws InterruptedException {
+
+        LOGGER.trace("Received change record {} for {} operation on key {} with context {}", value, operation, key, offset);
+
         eventListener.onEvent(partition, dataCollectionSchema.id(), offset, key, value, operation);
         receiver.changeRecord(partition, dataCollectionSchema, operation, key, value, offset, headers);
     }
@@ -268,6 +271,9 @@ public void changeRecord(P partition,
                                  OffsetContext offset,
                                  ConnectHeaders headers)
             throws InterruptedException {
+
+        LOGGER.trace("Received change record {} for {} operation on key {} with context {}", value, operation, key, offset);
+
         if (operation == Operation.CREATE && connectorConfig.isSignalDataCollection(dataCollectionId) && sourceSignalChannel != null) {
             sourceSignalChannel.process(value);
 
@@ -106,8 +106,11 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
         final RelationalSnapshotContext<P, O> ctx = (RelationalSnapshotContext<P, O>) snapshotContext;
 
         Connection connection = null;
-        Exception exceptionWhileSnapshot = null;
+        Throwable exceptionWhileSnapshot = null;
         try {
+
+            preSnapshot();
+
             LOGGER.info("Snapshot step 1 - Preparing");
 
             if (previousOffset != null && previousOffset.isSnapshotRunning()) {
@@ -165,7 +168,7 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
             dispatcher.alwaysDispatchHeartbeatEvent(ctx.partition, ctx.offset);
             return SnapshotResult.completed(ctx.offset);
         }
-        catch (final Exception e) {
+        catch (final Throwable e) {
             LOGGER.error("Error during snapshot", e);
             exceptionWhileSnapshot = e;
             throw e;
@@ -749,4 +752,8 @@ protected Clock getClock() {
 
     protected void postSnapshot() throws InterruptedException {
     }
+
+    protected void preSnapshot() throws InterruptedException {
+
+    }
 }
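
preSnapshot() joins postSnapshot() as a template-method hook on the relational snapshot source: the base class calls it at the top of doExecute() (see the earlier hunk) with an empty default body, and subclasses such as MySqlSnapshotChangeEventSource override it for connector-specific preparation. The resulting call order, sketched from the hunks above rather than the full method:

    // Hook ordering inside doExecute() per this change (sketch):
    preSnapshot();                              // new hook; MySQL uses it to enable queue buffering
    LOGGER.info("Snapshot step 1 - Preparing");
    // ... remaining snapshot steps ...
    postSnapshot();                             // existing hook (its call site is outside the hunks shown)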
@@ -7,6 +7,7 @@
 
 import static org.slf4j.Logger.ROOT_LOGGER_NAME;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
@@ -95,6 +96,16 @@ public boolean containsMessage(String text) {
         return false;
     }
 
+    public List<ILoggingEvent> getLoggingEvents(String text) {
+        List<ILoggingEvent> matchEvents = new ArrayList<>();
+        for (ILoggingEvent event : events) {
+            if (event.getFormattedMessage().toString().contains(text)) {
+                matchEvents.add(event);
+            }
+        }
+        return matchEvents;
+    }
+
     public boolean containsWarnMessage(String text) {
         return containsMessage(Level.WARN, text);
     }
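
getLoggingEvents complements containsMessage by returning the matching events themselves, so a test can assert on more than mere presence. A hypothetical usage — the LogInterceptor construction mirrors the tests in this change, while the inspection shown is illustrative:

    LogInterceptor interceptor = new LogInterceptor(AbstractSnapshotChangeEventSource.class);
    // ... run the connector until the snapshot finishes ...
    List<ILoggingEvent> completions = interceptor.getLoggingEvents("Snapshot completed");
    completions.forEach(event -> System.out.println(event.getLevel() + " " + event.getFormattedMessage()));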
@@ -0,0 +1,240 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.pipeline;
+
+import static io.debezium.pipeline.signal.actions.AbstractSnapshotSignal.SnapshotType.INITIAL_BLOCKING;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.lang.management.ManagementFactory;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ReflectionException;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.kafka.connect.data.Struct;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+import io.debezium.config.Configuration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.junit.logging.LogInterceptor;
+import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
+import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
+import io.debezium.util.Testing;
+
+public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest {
+    private int signalingRecords;
+
+    protected static final int ROW_COUNT = 1000;
+
+    protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl);
+
+    protected abstract JdbcConnection databaseConnection();
+
+    @Override
+    protected abstract String topicName();
+
+    @Override
+    protected abstract String tableName();
+
+    @Override
+    protected abstract String connector();
+
+    @Override
+    protected abstract String server();
+
+    @Test
+    public void executeBlockingSnapshot() throws Exception {
+        // Testing.Print.enable();
+
+        populateTable();
+
+        startConnectorWithSnapshot(x -> mutableConfig(false, false));
+
+        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
+
+        insertRecords(ROW_COUNT, ROW_COUNT);
+
+        assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2);
+
+        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), INITIAL_BLOCKING, tableDataCollectionId());
+
+        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
+
+        assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2);
+
+        insertRecords(ROW_COUNT, (ROW_COUNT * 2));
+
+        signalingRecords = 1;
+
+        assertStreamingRecordsArePresent(ROW_COUNT + signalingRecords);
+    }
+
+    @Test
+    public void executeBlockingSnapshotWhileStreaming() throws Exception {
+        // Testing.Debug.enable();
+
+        populateTable();
+
+        startConnectorWithSnapshot(x -> mutableConfig(false, false));
+
+        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
+
+        Future<?> batchInserts = executeAsync(insertTask());
+
+        Thread.sleep(2000); // Let some inserts stream before sending the signal
+
+        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), INITIAL_BLOCKING, tableDataCollectionId());
+
+        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
+
+        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
+
+        Long totalSnapshotRecords = getTotalSnapshotRecords(tableName(), connector(), server(), task(), database());
+
+        batchInserts.get(120, TimeUnit.SECONDS);
+
+        insertRecords(ROW_COUNT, (ROW_COUNT * 2));
+
+        signalingRecords = 1 + // from streaming
+                1; // from snapshot
+
+        assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords),
+                AbstractBlockingSnapshotTest.getExpectedValues(totalSnapshotRecords));
+    }
+
+    private Runnable insertTask() {
+        return () -> {
+            try {
+                insertRecordsWithRandomSleep(ROW_COUNT, ROW_COUNT, 2);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    private Long getTotalSnapshotRecords(String table, String connector, String server, String task, String database) throws MalformedObjectNameException,
+            ReflectionException, AttributeNotFoundException, InstanceNotFoundException,
+            MBeanException {
+
+        final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+        TabularDataSupport rowsScanned = (TabularDataSupport) mbeanServer.getAttribute(getSnapshotMetricsObjectName(connector, server, task, database),
+                "RowsScanned");
+
+        Map<String, Object> scannedRowsByTable = rowsScanned.values().stream().map(c -> ((CompositeDataSupport) c))
+                .collect(Collectors.toMap(compositeDataSupport -> compositeDataSupport.get("key").toString(), compositeDataSupport -> compositeDataSupport.get("value")));
+
+        String unquotedTableName = table.replace("`", "");
+        return (Long) scannedRowsByTable.get(unquotedTableName);
+    }
+
+    private static List<Integer> getExpectedValues(Long totalSnapshotRecords) {
+
+        List<Integer> initialSnapShotValues = IntStream.rangeClosed(0, 999).boxed().collect(Collectors.toList());
+        List<Integer> firstStreamingBatchValues = IntStream.rangeClosed(1000, 1999).boxed().collect(Collectors.toList());
+        List<Integer> blockingSnapshotValues = Stream.of(
+                initialSnapShotValues,
+                IntStream.rangeClosed(1000, Math.toIntExact(totalSnapshotRecords)).boxed().collect(Collectors.toList())).flatMap(List::stream)
+                .collect(Collectors.toList());
+        List<Integer> secondStreamingBatchValues = IntStream.rangeClosed(2000, 2999).boxed().collect(Collectors.toList());
+        return Stream.of(initialSnapShotValues, firstStreamingBatchValues, blockingSnapshotValues, secondStreamingBatchValues).flatMap(List::stream)
+                .collect(Collectors.toList());
+    }
+
+    private static void waitForLogMessage(String message, Class<?> logEmitterClass) {
+        LogInterceptor interceptor = new LogInterceptor(logEmitterClass);
+        Awaitility.await()
+                .alias("Snapshot not completed on time")
+                .pollInterval(100, TimeUnit.MILLISECONDS)
+                .atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS)
+                .until(() -> interceptor.containsMessage(message));
+    }
+
+    private Future<?> executeAsync(Runnable operation) {
+        return Executors.newSingleThreadExecutor().submit(operation);
+    }
+
+    private void assertStreamingRecordsArePresent(int expectedRecords) throws InterruptedException {
+
+        assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList()));
+    }
+
+    private void assertRecordsFromSnapshotAndStreamingArePresent(int expectedRecords) throws InterruptedException {
+
+        assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 1).boxed().collect(Collectors.toList()));
+    }
+
+    private void assertRecordsWithValuesPresent(int expectedRecords, List<Integer> expectedValues) throws InterruptedException {
+
+        SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(expectedRecords, 10);
+        assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords);
+        List<Integer> actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
+                .map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa"))
+                .collect(Collectors.toList());
+        assertThat(actual).containsAll(expectedValues);
+    }
+
+    private void insertRecords(int rowCount, int startingPkId) throws SQLException {
+
+        try (JdbcConnection connection = databaseConnection()) {
+            connection.setAutoCommit(false);
+            for (int i = 0; i < rowCount; i++) {
+                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
+                        tableName(),
+                        connection.quotedColumnIdString(pkFieldName()),
+                        i + startingPkId + 1,
+                        i + startingPkId));
+            }
+            connection.commit();
+        }
+    }
+
+    private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int maxSleep, Runnable actionOnInsert) throws SQLException {
+
+        try (JdbcConnection connection = databaseConnection()) {
+            connection.setAutoCommit(true);
+            for (int i = 0; i < rowCount; i++) {
+                connection.execute(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
+                        tableName(),
+                        connection.quotedColumnIdString(pkFieldName()),
+                        i + startingPkId + 1,
+                        i + startingPkId));
+                actionOnInsert.run();
+                int sleepTime = ThreadLocalRandom.current().nextInt(1, maxSleep);
+                Thread.sleep(sleepTime);
+            }
+            Testing.debug(String.format("Insert of %s records completed", rowCount));
+        }
+        catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int maxSleep) throws SQLException {
+
+        insertRecordsWithRandomSleep(rowCount, startingPkId, maxSleep, () -> {
+        });
+    }
+}
@@ -10,11 +10,9 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
|
|||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.BiPredicate;
|
import java.util.function.BiPredicate;
|
||||||
import java.util.function.Consumer;
|
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.function.Predicate;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
@ -42,8 +37,6 @@
|
|||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
import io.debezium.data.Envelope;
|
import io.debezium.data.Envelope;
|
||||||
import io.debezium.doc.FixFor;
|
import io.debezium.doc.FixFor;
|
||||||
import io.debezium.embedded.AbstractConnectorTest;
|
|
||||||
import io.debezium.engine.DebeziumEngine;
|
|
||||||
import io.debezium.jdbc.JdbcConnection;
|
import io.debezium.jdbc.JdbcConnection;
|
||||||
import io.debezium.junit.EqualityCheck;
|
import io.debezium.junit.EqualityCheck;
|
||||||
import io.debezium.junit.SkipWhenConnectorUnderTest;
|
import io.debezium.junit.SkipWhenConnectorUnderTest;
|
||||||
@ -51,275 +44,16 @@
|
|||||||
import io.debezium.junit.logging.LogInterceptor;
|
import io.debezium.junit.logging.LogInterceptor;
|
||||||
import io.debezium.kafka.KafkaCluster;
|
import io.debezium.kafka.KafkaCluster;
|
||||||
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
|
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
|
||||||
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
|
|
||||||
import io.debezium.pipeline.signal.actions.snapshotting.StopSnapshot;
|
import io.debezium.pipeline.signal.actions.snapshotting.StopSnapshot;
|
||||||
import io.debezium.util.Testing;
|
|
||||||
|
|
||||||
public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector> extends AbstractConnectorTest {
|
public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector> extends AbstractSnapshotTest<T> {
|
||||||
|
|
||||||
protected static final int ROW_COUNT = 1_000;
|
|
||||||
private static final int MAXIMUM_NO_RECORDS_CONSUMES = 5;
|
|
||||||
|
|
||||||
protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-is.txt")
|
|
||||||
.toAbsolutePath();
|
|
||||||
private static final int PARTITION_NO = 0;
|
|
||||||
private static final String SERVER_NAME = "test_server";
|
|
||||||
|
|
||||||
protected static KafkaCluster kafka;
|
protected static KafkaCluster kafka;
|
||||||
|
|
||||||
protected abstract Class<T> connectorClass();
|
|
||||||
|
|
||||||
protected abstract JdbcConnection databaseConnection();
|
|
||||||
|
|
||||||
protected abstract String topicName();
|
|
||||||
|
|
||||||
protected abstract String tableName();
|
|
||||||
|
|
||||||
protected abstract List<String> topicNames();
|
|
||||||
|
|
||||||
protected abstract List<String> tableNames();
|
|
||||||
|
|
||||||
protected abstract String signalTableName();
|
|
||||||
|
|
||||||
protected String signalTableNameSanitized() {
|
|
||||||
return signalTableName();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract Configuration.Builder config();
|
|
||||||
|
|
||||||
protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl);
|
|
||||||
|
|
||||||
protected abstract String connector();
|
|
||||||
|
|
||||||
protected abstract String server();
|
|
||||||
|
|
||||||
protected String task() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String database() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String alterTableAddColumnStatement(String tableName) {
|
|
||||||
return "ALTER TABLE " + tableName + " add col3 int default 0";
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String alterTableDropColumnStatement(String tableName) {
|
|
||||||
return "ALTER TABLE " + tableName + " drop column col3";
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String tableDataCollectionId() {
|
|
||||||
return tableName();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected List<String> tableDataCollectionIds() {
|
|
||||||
return tableNames();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void populateTable(JdbcConnection connection, String tableName) throws SQLException {
|
|
||||||
connection.setAutoCommit(false);
|
|
||||||
for (int i = 0; i < ROW_COUNT; i++) {
|
|
||||||
connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
|
|
||||||
tableName, connection.quotedColumnIdString(pkFieldName()), i + 1, i));
|
|
||||||
}
|
|
||||||
connection.commit();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void populateTable(JdbcConnection connection) throws SQLException {
|
|
||||||
populateTable(connection, tableName());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void populateTables(JdbcConnection connection) throws SQLException {
|
|
||||||
for (String tableName : tableNames()) {
|
|
||||||
populateTable(connection, tableName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void populateTable() throws SQLException {
|
|
||||||
try (JdbcConnection connection = databaseConnection()) {
|
|
||||||
populateTable(connection);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void populateTableWithSpecificValue(int startRow, int count, int value) throws SQLException {
|
|
||||||
try (JdbcConnection connection = databaseConnection()) {
|
|
||||||
populateTableWithSpecificValue(connection, tableName(), startRow, count, value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void populateTableWithSpecificValue(JdbcConnection connection, String tableName, int startRow, int count, int value)
|
|
||||||
throws SQLException {
|
|
||||||
connection.setAutoCommit(false);
|
|
||||||
for (int i = startRow + 1; i <= startRow + count; i++) {
|
|
||||||
connection.executeWithoutCommitting(
|
|
||||||
String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
|
|
||||||
tableName, connection.quotedColumnIdString(pkFieldName()), count + i, value));
|
|
||||||
}
|
|
||||||
connection.commit();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void populateTables() throws SQLException {
|
|
||||||
try (JdbcConnection connection = databaseConnection()) {
|
|
||||||
populateTables(connection);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException {
|
|
||||||
connection.setAutoCommit(false);
|
|
||||||
for (int i = 0; i < ROW_COUNT; i++) {
|
|
||||||
final int id = i + 1;
|
|
||||||
final int pk1 = id / 1000;
|
|
||||||
final int pk2 = (id / 100) % 10;
|
|
||||||
final int pk3 = (id / 10) % 10;
|
|
||||||
final int pk4 = id % 10;
|
|
||||||
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
|
|
||||||
tableName,
|
|
||||||
pk1,
|
|
||||||
pk2,
|
|
||||||
pk3,
|
|
||||||
pk4,
|
|
||||||
i));
|
|
||||||
}
|
|
||||||
connection.commit();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
|
|
||||||
return consumeMixedWithIncrementalSnapshot(recordCount, topicName());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, String topicName) throws InterruptedException {
|
|
||||||
return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), x -> true, null,
|
|
||||||
topicName);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount, Function<SourceRecord, V> valueConverter,
|
|
||||||
Predicate<Map.Entry<Integer, V>> dataCompleted,
|
|
||||||
Consumer<List<SourceRecord>> recordConsumer,
|
|
||||||
String topicName)
|
|
||||||
throws InterruptedException {
|
|
||||||
return consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(pkFieldName()), valueConverter, topicName, recordConsumer);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount,
|
|
||||||
Predicate<Map.Entry<Integer, V>> dataCompleted,
|
|
||||||
Function<Struct, Integer> idCalculator,
|
|
||||||
Function<SourceRecord, V> valueConverter,
|
|
||||||
String topicName,
|
|
||||||
Consumer<List<SourceRecord>> recordConsumer)
|
|
||||||
throws InterruptedException {
|
|
||||||
final Map<Integer, V> dbChanges = new HashMap<>();
|
|
||||||
int noRecords = 0;
|
|
||||||
for (;;) {
|
|
||||||
final SourceRecords records = consumeRecordsByTopic(1);
|
|
||||||
final List<SourceRecord> dataRecords = records.recordsForTopic(topicName);
|
|
||||||
if (records.allRecordsInOrder().isEmpty()) {
|
|
||||||
noRecords++;
|
|
||||||
assertThat(noRecords).describedAs(String.format("Too many no data record results, %d < %d", dbChanges.size(), recordCount))
|
|
||||||
.isLessThanOrEqualTo(MAXIMUM_NO_RECORDS_CONSUMES);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
noRecords = 0;
|
|
||||||
if (dataRecords == null || dataRecords.isEmpty()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
dataRecords.forEach(record -> {
|
|
||||||
final int id = idCalculator.apply((Struct) record.key());
|
|
||||||
final V value = valueConverter.apply(record);
|
|
||||||
dbChanges.put(id, value);
|
|
||||||
});
|
|
||||||
if (recordConsumer != null) {
|
|
||||||
recordConsumer.accept(dataRecords);
|
|
||||||
}
|
|
||||||
if (dbChanges.size() >= recordCount) {
|
|
||||||
if (!dbChanges.entrySet().stream().anyMatch(dataCompleted.negate())) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assertThat(dbChanges).hasSize(recordCount);
|
|
||||||
return dbChanges;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
|
|
||||||
return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), x -> true, null, topicName());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, Integer>> dataCompleted,
|
|
||||||
Consumer<List<SourceRecord>> recordConsumer)
|
|
||||||
throws InterruptedException {
|
|
||||||
return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), dataCompleted,
|
|
||||||
recordConsumer, topicName());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, SourceRecord>> dataCompleted,
|
|
||||||
Consumer<List<SourceRecord>> recordConsumer)
|
|
||||||
throws InterruptedException {
|
|
||||||
return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), dataCompleted, recordConsumer, topicName());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String valueFieldName() {
|
|
||||||
return "aa";
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String pkFieldName() {
|
|
||||||
return "pk";
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getSignalTypeFieldName() {
|
protected String getSignalTypeFieldName() {
|
||||||
return "type";
|
return "type";
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException {
|
|
||||||
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), dataCollectionIds);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional<String> additionalCondition, Optional<String> surrogateKey,
|
|
||||||
String... dataCollectionIds) {
|
|
||||||
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(additionalCondition, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL,
|
|
||||||
dataCollectionIds);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional<String> additionalCondition, Optional<String> surrogateKey,
|
|
||||||
AbstractSnapshotSignal.SnapshotType snapshotType,
|
|
||||||
String... dataCollectionIds) {
|
|
||||||
final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
|
|
||||||
.map(x -> '"' + x + '"')
|
|
||||||
.collect(Collectors.joining(", "));
|
|
||||||
try (JdbcConnection connection = databaseConnection()) {
|
|
||||||
String query;
|
|
||||||
if (additionalCondition.isPresent() && surrogateKey.isPresent()) {
|
|
||||||
query = String.format(
|
|
||||||
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')",
|
|
||||||
signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get(), surrogateKey.get());
|
|
||||||
}
|
|
||||||
else if (additionalCondition.isPresent()) {
|
|
||||||
query = String.format(
|
|
||||||
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s}')",
|
|
||||||
signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get());
|
|
||||||
}
|
|
||||||
else if (surrogateKey.isPresent()) {
|
|
||||||
query = String.format(
|
|
||||||
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')",
|
|
||||||
signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey.get());
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
query = String.format(
|
|
||||||
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')",
|
|
||||||
signalTableName(), snapshotType.toString(), dataCollectionIdsList);
|
|
||||||
}
|
|
||||||
logger.info("Sending signal with query {}", query);
|
|
||||||
connection.execute(query);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
logger.warn("Failed to send signal", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void sendAdHocSnapshotStopSignal(String... dataCollectionIds) throws SQLException {
|
protected void sendAdHocSnapshotStopSignal(String... dataCollectionIds) throws SQLException {
|
||||||
String collections = "";
|
String collections = "";
|
||||||
if (dataCollectionIds.length > 0) {
|
if (dataCollectionIds.length > 0) {
|
||||||
@ -396,39 +130,6 @@ protected void sendResumeSignal() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void startConnector(DebeziumEngine.CompletionCallback callback) {
|
|
||||||
startConnector(Function.identity(), callback, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
|
|
||||||
startConnector(custConfig, loggingCompletion(), true);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig,
|
|
||||||
DebeziumEngine.CompletionCallback callback, boolean expectNoRecords) {
|
|
||||||
final Configuration config = custConfig.apply(config()).build();
|
|
||||||
start(connectorClass(), config, callback);
|
|
||||||
waitForConnectorToStart();
|
|
||||||
|
|
||||||
waitForAvailableRecords(5, TimeUnit.SECONDS);
|
|
||||||
if (expectNoRecords) {
|
|
||||||
// there shouldn't be any snapshot records
|
|
||||||
assertNoRecordsToConsume();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void startConnectorWithSnapshot(Function<Configuration.Builder, Configuration.Builder> custConfig) {
|
|
||||||
startConnector(custConfig, loggingCompletion(), false);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void startConnector() {
|
|
||||||
startConnector(Function.identity(), loggingCompletion(), true);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void waitForConnectorToStart() {
|
|
||||||
assertConnectorIsRunning();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void snapshotOnly() throws Exception {
|
public void snapshotOnly() throws Exception {
|
||||||
// Testing.Print.enable();
|
// Testing.Print.enable();
|
||||||
@ -1208,10 +909,6 @@ private void assertCorrectIncrementalSnapshotNotification(List<SourceRecord> not
|
|||||||
.containsEntry("total_rows_scanned", "1000");
|
.containsEntry("total_rows_scanned", "1000");
|
||||||
}
|
}
|
||||||
|
|
||||||
private Function<Struct, Integer> getRecordValue() {
|
|
||||||
return s -> s.getStruct("after").getInt32(valueFieldName());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void sendAdHocSnapshotSignalAndWait(String... collectionIds) throws Exception {
|
protected void sendAdHocSnapshotSignalAndWait(String... collectionIds) throws Exception {
|
||||||
// Sends the adhoc snapshot signal and waits for the signal event to have been received
|
// Sends the adhoc snapshot signal and waits for the signal event to have been received
|
||||||
if (collectionIds.length == 0) {
|
if (collectionIds.length == 0) {
|
||||||
@ -1271,8 +968,4 @@ protected boolean consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMes
|
|||||||
return stopMessageFound.get();
|
return stopMessageFound.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int getMaximumEnqueuedRecordCount() {
|
|
||||||
return ROW_COUNT * 3;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,332 @@
/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.pipeline.source.snapshot.incremental;

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
// added missing import: Files.createTestingPath below is Testing.Files, not java.nio.file.Files
import io.debezium.util.Testing.Files;

public abstract class AbstractSnapshotTest<T extends SourceConnector> extends AbstractConnectorTest {

    protected static final int ROW_COUNT = 1000;
    protected static final Path SCHEMA_HISTORY_PATH = Files.createTestingPath("file-schema-history-is.txt")
            .toAbsolutePath();
    protected static final int PARTITION_NO = 0;
    protected static final String SERVER_NAME = "test_server";
    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 5;

    protected abstract Class<T> connectorClass();

    protected abstract JdbcConnection databaseConnection();

    protected abstract String topicName();

    protected abstract String tableName();

    protected abstract List<String> topicNames();

    protected abstract List<String> tableNames();

    protected abstract String signalTableName();

    protected String signalTableNameSanitized() {
        return signalTableName();
    }

    protected abstract Configuration.Builder config();

    protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl);

    protected abstract String connector();

    protected abstract String server();

    protected String task() {
        return null;
    }

    protected String database() {
        return null;
    }

    protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
    }

    protected String alterTableAddColumnStatement(String tableName) {
        return "ALTER TABLE " + tableName + " add col3 int default 0";
    }

    protected String alterTableDropColumnStatement(String tableName) {
        return "ALTER TABLE " + tableName + " drop column col3";
    }

    protected String tableDataCollectionId() {
        return tableName();
    }

    protected List<String> tableDataCollectionIds() {
        return tableNames();
    }

    protected void populateTable(JdbcConnection connection, String tableName) throws SQLException {
        connection.setAutoCommit(false);
        for (int i = 0; i < ROW_COUNT; i++) {
            connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                    tableName, connection.quotedColumnIdString(pkFieldName()), i + 1, i));
        }
        connection.commit();
    }

    protected void populateTable(JdbcConnection connection) throws SQLException {
        populateTable(connection, tableName());
    }

    protected void populateTables(JdbcConnection connection) throws SQLException {
        for (String tableName : tableNames()) {
            populateTable(connection, tableName);
        }
    }

    protected void populateTable() throws SQLException {
        try (JdbcConnection connection = databaseConnection()) {
            populateTable(connection);
        }
    }

    protected void populateTableWithSpecificValue(int startRow, int count, int value) throws SQLException {
        try (JdbcConnection connection = databaseConnection()) {
            populateTableWithSpecificValue(connection, tableName(), startRow, count, value);
        }
    }

    private void populateTableWithSpecificValue(JdbcConnection connection, String tableName, int startRow, int count, int value)
            throws SQLException {
        connection.setAutoCommit(false);
        for (int i = startRow + 1; i <= startRow + count; i++) {
            connection.executeWithoutCommitting(
                    String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                            tableName, connection.quotedColumnIdString(pkFieldName()), count + i, value));
        }
        connection.commit();
    }

    protected void populateTables() throws SQLException {
        try (JdbcConnection connection = databaseConnection()) {
            populateTables(connection);
        }
    }

    protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException {
        connection.setAutoCommit(false);
        for (int i = 0; i < ROW_COUNT; i++) {
            final int id = i + 1;
            final int pk1 = id / 1000;
            final int pk2 = (id / 100) % 10;
            final int pk3 = (id / 10) % 10;
            final int pk4 = id % 10;
            connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
                    tableName,
                    pk1,
                    pk2,
                    pk3,
                    pk4,
                    i));
        }
        connection.commit();
    }
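
As a worked example (illustration only, not code from the commit), the digit split above decomposes the row id into the four key columns:

    // id 937 -> (pk1, pk2, pk3, pk4) = (0, 9, 3, 7); id 1000 -> (1, 0, 0, 0)
    int id = 937;
    int pk1 = id / 1000;        // 0
    int pk2 = (id / 100) % 10;  // 9
    int pk3 = (id / 10) % 10;   // 3
    int pk4 = id % 10;          // 7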
    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(recordCount, topicName());
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, String topicName) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), x -> true, null,
                topicName);
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount, Function<SourceRecord, V> valueConverter,
                                                                      Predicate<Map.Entry<Integer, V>> dataCompleted,
                                                                      Consumer<List<SourceRecord>> recordConsumer,
                                                                      String topicName)
            throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(pkFieldName()), valueConverter, topicName, recordConsumer);
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount,
                                                                      Predicate<Map.Entry<Integer, V>> dataCompleted,
                                                                      Function<Struct, Integer> idCalculator,
                                                                      Function<SourceRecord, V> valueConverter,
                                                                      String topicName,
                                                                      Consumer<List<SourceRecord>> recordConsumer)
            throws InterruptedException {
        final Map<Integer, V> dbChanges = new HashMap<>();
        int noRecords = 0;
        for (;;) {
            final SourceRecords records = consumeRecordsByTopic(1);
            final List<SourceRecord> dataRecords = records.recordsForTopic(topicName);
            if (records.allRecordsInOrder().isEmpty()) {
                noRecords++;
                assertThat(noRecords).describedAs(String.format("Too many no data record results, %d < %d", dbChanges.size(), recordCount))
                        .isLessThanOrEqualTo(MAXIMUM_NO_RECORDS_CONSUMES);
                continue;
            }
            noRecords = 0;
            if (dataRecords == null || dataRecords.isEmpty()) {
                continue;
            }
            dataRecords.forEach(record -> {
                final int id = idCalculator.apply((Struct) record.key());
                final V value = valueConverter.apply(record);
                dbChanges.put(id, value);
            });
            if (recordConsumer != null) {
                recordConsumer.accept(dataRecords);
            }
            if (dbChanges.size() >= recordCount) {
                if (!dbChanges.entrySet().stream().anyMatch(dataCompleted.negate())) {
                    break;
                }
            }
        }

        assertThat(dbChanges).hasSize(recordCount);
        return dbChanges;
    }
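
A minimal usage sketch, assembled from the methods defined in this class rather than taken from the commit: because the drain loop keys results by primary key, rows delivered both by the snapshot and by streaming simply overwrite each other, which is what makes mixed consumption safe.

    // Populate data, start the connector, request an ad-hoc incremental snapshot,
    // then drain until all ROW_COUNT primary keys have been observed.
    populateTable();
    startConnector();
    sendAdHocSnapshotSignal(tableDataCollectionId());
    final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(ROW_COUNT);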
    protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), x -> true, null, topicName());
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, Integer>> dataCompleted,
                                                                        Consumer<List<SourceRecord>> recordConsumer)
            throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), dataCompleted,
                recordConsumer, topicName());
    }

    protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, SourceRecord>> dataCompleted,
                                                                                    Consumer<List<SourceRecord>> recordConsumer)
            throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), dataCompleted, recordConsumer, topicName());
    }

    protected String valueFieldName() {
        return "aa";
    }

    protected String pkFieldName() {
        return "pk";
    }

    protected void startConnector(DebeziumEngine.CompletionCallback callback) {
        startConnector(Function.identity(), callback, true);
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        startConnector(custConfig, loggingCompletion(), true);
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig,
                                  DebeziumEngine.CompletionCallback callback, boolean expectNoRecords) {
        final Configuration config = custConfig.apply(config()).build();
        start(connectorClass(), config, callback);
        waitForConnectorToStart();

        waitForAvailableRecords(5, TimeUnit.SECONDS);
        if (expectNoRecords) {
            // there shouldn't be any snapshot records
            assertNoRecordsToConsume();
        }
    }
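
Subclasses typically drive this overload with a configuration tweak; a hedged sketch follows, where the property key and value are illustrative and not taken from this commit:

    // Illustrative only: override one property before starting. config() and
    // loggingCompletion() are supplied by this class and its parent.
    startConnector(builder -> builder.with("snapshot.mode", "initial"));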
    protected void startConnectorWithSnapshot(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        startConnector(custConfig, loggingCompletion(), false);
    }

    protected void startConnector() {
        startConnector(Function.identity(), loggingCompletion(), true);
    }

    protected void waitForConnectorToStart() {
        assertConnectorIsRunning();
    }

    protected Function<Struct, Integer> getRecordValue() {
        return s -> s.getStruct("after").getInt32(valueFieldName());
    }

    @Override
    protected int getMaximumEnqueuedRecordCount() {
        return ROW_COUNT * 3;
    }

    protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException {
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), dataCollectionIds);
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional<String> additionalCondition, Optional<String> surrogateKey,
                                                                                  String... dataCollectionIds) {
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(additionalCondition, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL,
                dataCollectionIds);
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional<String> additionalCondition, Optional<String> surrogateKey,
                                                                                  AbstractSnapshotSignal.SnapshotType snapshotType,
                                                                                  String... dataCollectionIds) {
        final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
                .map(x -> '"' + x + '"')
                .collect(Collectors.joining(", "));
        try (JdbcConnection connection = databaseConnection()) {
            String query;
            if (additionalCondition.isPresent() && surrogateKey.isPresent()) {
                query = String.format(
                        "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')",
                        signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get(), surrogateKey.get());
            }
            else if (additionalCondition.isPresent()) {
                query = String.format(
                        "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s}')",
                        signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get());
            }
            else if (surrogateKey.isPresent()) {
                query = String.format(
                        "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')",
                        signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey.get());
            }
            else {
                query = String.format(
                        "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')",
                        signalTableName(), snapshotType.toString(), dataCollectionIdsList);
            }
            logger.info("Sending signal with query {}", query);
            connection.execute(query);
        }
        catch (Exception e) {
            logger.warn("Failed to send signal", e);
        }
    }
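
Given the commit's subject, a blocking snapshot test would use this same signal path with a different snapshot type; a sketch follows, assuming a BLOCKING constant exists alongside INCREMENTAL in AbstractSnapshotSignal.SnapshotType as part of the blocking-snapshot work:

    // Sketch (assumption: SnapshotType.BLOCKING is introduced by the blocking-snapshot feature)
    sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(
            Optional.empty(), Optional.empty(),
            AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionId());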
}