DBZ-3473 First prototype of incremental snapshotting
This commit is contained in:
parent
66da6938a0
commit
a52c536a68
@ -1124,7 +1124,8 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
|
|||||||
HSTORE_HANDLING_MODE,
|
HSTORE_HANDLING_MODE,
|
||||||
BINARY_HANDLING_MODE,
|
BINARY_HANDLING_MODE,
|
||||||
INTERVAL_HANDLING_MODE,
|
INTERVAL_HANDLING_MODE,
|
||||||
SCHEMA_REFRESH_MODE)
|
SCHEMA_REFRESH_MODE,
|
||||||
|
INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
|
||||||
.excluding(INCLUDE_SCHEMA_CHANGES)
|
.excluding(INCLUDE_SCHEMA_CHANGES)
|
||||||
.create();
|
.create();
|
||||||
|
|
||||||
|
@ -187,7 +187,8 @@ public ChangeEventSourceCoordinator start(Configuration config) {
|
|||||||
PostgresChangeRecordEmitter::updateSchema,
|
PostgresChangeRecordEmitter::updateSchema,
|
||||||
metadataProvider,
|
metadataProvider,
|
||||||
heartbeat,
|
heartbeat,
|
||||||
schemaNameAdjuster);
|
schemaNameAdjuster,
|
||||||
|
jdbcConnection);
|
||||||
|
|
||||||
ChangeEventSourceCoordinator coordinator = new PostgresChangeEventSourceCoordinator(
|
ChangeEventSourceCoordinator coordinator = new PostgresChangeEventSourceCoordinator(
|
||||||
previousOffset,
|
previousOffset,
|
||||||
|
@ -275,4 +275,9 @@ public void event(DataCollectionId tableId, Instant instant) {
|
|||||||
public TransactionContext getTransactionContext() {
|
public TransactionContext getTransactionContext() {
|
||||||
return transactionContext;
|
return transactionContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incrementalSnapshotWindow() {
|
||||||
|
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,197 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.debezium.connector.postgresql;
|
||||||
|
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.kafka.connect.data.Struct;
|
||||||
|
import org.apache.kafka.connect.source.SourceRecord;
|
||||||
|
import org.fest.assertions.Assertions;
|
||||||
|
import org.fest.assertions.MapAssert;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import io.debezium.config.Configuration;
|
||||||
|
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
|
||||||
|
import io.debezium.connector.postgresql.connection.PostgresConnection;
|
||||||
|
import io.debezium.embedded.AbstractConnectorTest;
|
||||||
|
import io.debezium.util.Testing;
|
||||||
|
|
||||||
|
public class IncrementalSnapshotIT extends AbstractConnectorTest {
|
||||||
|
|
||||||
|
private static final int ROW_COUNT = 1_000;
|
||||||
|
private static final String TOPIC_NAME = "test_server.s1.a";
|
||||||
|
private static final int MAXIMUM_NO_RECORDS_CONSUMES = 3;
|
||||||
|
|
||||||
|
private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" +
|
||||||
|
"CREATE SCHEMA s1; " +
|
||||||
|
"CREATE SCHEMA s2; " +
|
||||||
|
"CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
|
||||||
|
"CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws SQLException {
|
||||||
|
TestHelper.dropAllSchemas();
|
||||||
|
initializeConnectorTestFramework();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() {
|
||||||
|
stopConnector();
|
||||||
|
TestHelper.dropDefaultReplicationSlot();
|
||||||
|
TestHelper.dropPublication();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void populateTable() throws SQLException {
|
||||||
|
try (final PostgresConnection pgConnection = TestHelper.create()) {
|
||||||
|
pgConnection.setAutoCommit(false);
|
||||||
|
for (int i = 0; i < ROW_COUNT; i++) {
|
||||||
|
pgConnection.executeWithoutCommitting(String.format("INSERT INTO s1.a (aa) VALUES (%s)", i));
|
||||||
|
}
|
||||||
|
pgConnection.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
|
||||||
|
final Map<Integer, Integer> dbChanges = new HashMap<>();
|
||||||
|
int noRecords = 0;
|
||||||
|
for (;;) {
|
||||||
|
final SourceRecords records = consumeRecordsByTopic(1);
|
||||||
|
final List<SourceRecord> dataRecords = records.recordsForTopic(TOPIC_NAME);
|
||||||
|
if (dataRecords == null || dataRecords.isEmpty()) {
|
||||||
|
noRecords++;
|
||||||
|
Assertions.assertThat(noRecords)
|
||||||
|
.describedAs("Too many no data record results")
|
||||||
|
.isLessThan(MAXIMUM_NO_RECORDS_CONSUMES);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
noRecords = 0;
|
||||||
|
dataRecords.forEach(record -> {
|
||||||
|
final int id = ((Struct) record.key()).getInt32("pk");
|
||||||
|
final int value = ((Struct) record.value()).getStruct("after").getInt32("aa");
|
||||||
|
dbChanges.put(id, value);
|
||||||
|
});
|
||||||
|
if (dbChanges.size() >= recordCount) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Assertions.assertThat(dbChanges).hasSize(recordCount);
|
||||||
|
return dbChanges;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void snapshotOnly() throws Exception {
|
||||||
|
Testing.Print.enable();
|
||||||
|
|
||||||
|
TestHelper.dropDefaultReplicationSlot();
|
||||||
|
TestHelper.execute(SETUP_TABLES_STMT);
|
||||||
|
populateTable();
|
||||||
|
Configuration config = TestHelper.defaultConfig()
|
||||||
|
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
|
||||||
|
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
|
||||||
|
.with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
|
||||||
|
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
|
||||||
|
.build();
|
||||||
|
start(PostgresConnector.class, config);
|
||||||
|
assertConnectorIsRunning();
|
||||||
|
TestHelper.waitForDefaultReplicationSlotBeActive();
|
||||||
|
|
||||||
|
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
|
||||||
|
// there shouldn't be any snapshot records
|
||||||
|
assertNoRecordsToConsume();
|
||||||
|
|
||||||
|
// Insert the signal record
|
||||||
|
TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"s1.a\"]}')");
|
||||||
|
|
||||||
|
final int expectedRecordCount = ROW_COUNT;
|
||||||
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
||||||
|
for (int i = 0; i < expectedRecordCount; i++) {
|
||||||
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void inserts() throws Exception {
|
||||||
|
Testing.Print.enable();
|
||||||
|
|
||||||
|
TestHelper.dropDefaultReplicationSlot();
|
||||||
|
TestHelper.execute(SETUP_TABLES_STMT);
|
||||||
|
populateTable();
|
||||||
|
Configuration config = TestHelper.defaultConfig()
|
||||||
|
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
|
||||||
|
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
|
||||||
|
.with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
|
||||||
|
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
|
||||||
|
.build();
|
||||||
|
start(PostgresConnector.class, config);
|
||||||
|
assertConnectorIsRunning();
|
||||||
|
TestHelper.waitForDefaultReplicationSlotBeActive();
|
||||||
|
|
||||||
|
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
|
||||||
|
// there shouldn't be any snapshot records
|
||||||
|
assertNoRecordsToConsume();
|
||||||
|
|
||||||
|
// Insert the signal record
|
||||||
|
TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"s1.a\"]}')");
|
||||||
|
|
||||||
|
try (final PostgresConnection pgConnection = TestHelper.create()) {
|
||||||
|
for (int i = 0; i < ROW_COUNT; i++) {
|
||||||
|
pgConnection.executeWithoutCommitting(String.format("INSERT INTO s1.a (aa) VALUES (%s)", i + ROW_COUNT));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final int expectedRecordCount = ROW_COUNT * 2;
|
||||||
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
||||||
|
for (int i = 0; i < expectedRecordCount; i++) {
|
||||||
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void updates() throws Exception {
|
||||||
|
Testing.Print.enable();
|
||||||
|
|
||||||
|
TestHelper.dropDefaultReplicationSlot();
|
||||||
|
TestHelper.execute(SETUP_TABLES_STMT);
|
||||||
|
populateTable();
|
||||||
|
Configuration config = TestHelper.defaultConfig()
|
||||||
|
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
|
||||||
|
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
|
||||||
|
.with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
|
||||||
|
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
|
||||||
|
.build();
|
||||||
|
start(PostgresConnector.class, config);
|
||||||
|
assertConnectorIsRunning();
|
||||||
|
TestHelper.waitForDefaultReplicationSlotBeActive();
|
||||||
|
|
||||||
|
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
|
||||||
|
// there shouldn't be any snapshot records
|
||||||
|
assertNoRecordsToConsume();
|
||||||
|
|
||||||
|
// Insert the signal record
|
||||||
|
TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"s1.a\"]}')");
|
||||||
|
|
||||||
|
final int batchSize = 10;
|
||||||
|
try (final PostgresConnection pgConnection = TestHelper.create()) {
|
||||||
|
for (int i = 0; i < ROW_COUNT / batchSize; i++) {
|
||||||
|
TestHelper.execute(String.format("UPDATE s1.a SET aa = aa + 1000 WHERE pk > %s AND pk <= %s", i * batchSize, (i + 1) * batchSize));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final int expectedRecordCount = ROW_COUNT;
|
||||||
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
||||||
|
for (int i = 0; i < expectedRecordCount; i++) {
|
||||||
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i + 1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -3,14 +3,15 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
|||||||
log4j.appender.stdout.Target=System.out
|
log4j.appender.stdout.Target=System.out
|
||||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||||
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n
|
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n
|
||||||
log4j.appender.stdout.threshold=WARN
|
log4j.appender.stdout.threshold=DEBUG
|
||||||
|
|
||||||
# Root logger option
|
# Root logger option
|
||||||
log4j.rootLogger=INFO, stdout
|
log4j.rootLogger=INFO, stdout
|
||||||
|
|
||||||
# Set up the default logging to be INFO level, then override specific units
|
# Set up the default logging to be INFO level, then override specific units
|
||||||
log4j.logger.io.debezium=INFO
|
log4j.logger.io.debezium=INFO
|
||||||
log4j.logger.io.debezium.connector.postgresql=INFO
|
log4j.logger.io.debezium.pipeline=DEBUG
|
||||||
|
log4j.logger.io.debezium.connector.postgresql=DEBUG
|
||||||
log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG # Needed for PostgresConnectorIT.shouldClearDatabaseWarnings()
|
log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG # Needed for PostgresConnectorIT.shouldClearDatabaseWarnings()
|
||||||
log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
|
log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
|
||||||
#log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG
|
#log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG
|
||||||
|
@ -323,6 +323,15 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
|
|||||||
.withDescription("The maximum number of records that should be loaded into memory while performing a snapshot")
|
.withDescription("The maximum number of records that should be loaded into memory while performing a snapshot")
|
||||||
.withValidation(Field::isNonNegativeInteger);
|
.withValidation(Field::isNonNegativeInteger);
|
||||||
|
|
||||||
|
public static final Field INCREMENTAL_SNAPSHOT_CHUNK_SIZE = Field.create("incremental.snapshot.chunk.size")
|
||||||
|
.withDisplayName("Incremental snapshot chunk size")
|
||||||
|
.withType(Type.INT)
|
||||||
|
.withWidth(Width.MEDIUM)
|
||||||
|
.withImportance(Importance.MEDIUM)
|
||||||
|
.withDescription("The maximum size of chunk for incremental snapshotting")
|
||||||
|
.withDefault(1024)
|
||||||
|
.withValidation(Field::isNonNegativeInteger);
|
||||||
|
|
||||||
public static final Field SNAPSHOT_MODE_TABLES = Field.create("snapshot.include.collection.list")
|
public static final Field SNAPSHOT_MODE_TABLES = Field.create("snapshot.include.collection.list")
|
||||||
.withDisplayName("Snapshot mode include data collection")
|
.withDisplayName("Snapshot mode include data collection")
|
||||||
.withType(Type.LIST)
|
.withType(Type.LIST)
|
||||||
@ -454,6 +463,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
|
|||||||
private final Duration snapshotDelayMs;
|
private final Duration snapshotDelayMs;
|
||||||
private final Duration retriableRestartWait;
|
private final Duration retriableRestartWait;
|
||||||
private final int snapshotFetchSize;
|
private final int snapshotFetchSize;
|
||||||
|
private final int incrementalSnapshotChunkSize;
|
||||||
private final int snapshotMaxThreads;
|
private final int snapshotMaxThreads;
|
||||||
private final Integer queryFetchSize;
|
private final Integer queryFetchSize;
|
||||||
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
|
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
|
||||||
@ -479,6 +489,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
|
|||||||
this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize);
|
this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize);
|
||||||
this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS);
|
this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS);
|
||||||
this.queryFetchSize = config.getInteger(QUERY_FETCH_SIZE);
|
this.queryFetchSize = config.getInteger(QUERY_FETCH_SIZE);
|
||||||
|
this.incrementalSnapshotChunkSize = config.getInteger(INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
|
||||||
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
|
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
|
||||||
this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config);
|
this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config);
|
||||||
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
|
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
|
||||||
@ -565,6 +576,10 @@ public int getQueryFetchSize() {
|
|||||||
return queryFetchSize;
|
return queryFetchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getIncrementalSnashotChunkSize() {
|
||||||
|
return incrementalSnapshotChunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean shouldProvideTransactionMetadata() {
|
public boolean shouldProvideTransactionMetadata() {
|
||||||
return shouldProvideTransactionMetadata;
|
return shouldProvideTransactionMetadata;
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,11 @@ public enum SnapshotRecord {
|
|||||||
/**
|
/**
|
||||||
* Record is from streaming phase.
|
* Record is from streaming phase.
|
||||||
*/
|
*/
|
||||||
FALSE;
|
FALSE,
|
||||||
|
/**
|
||||||
|
* Record is from incremental snapshot window.
|
||||||
|
*/
|
||||||
|
INCREMENTAL;
|
||||||
|
|
||||||
public static SnapshotRecord fromSource(Struct source) {
|
public static SnapshotRecord fromSource(Struct source) {
|
||||||
if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null
|
if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null
|
||||||
|
@ -29,7 +29,9 @@
|
|||||||
import io.debezium.data.Envelope;
|
import io.debezium.data.Envelope;
|
||||||
import io.debezium.data.Envelope.Operation;
|
import io.debezium.data.Envelope.Operation;
|
||||||
import io.debezium.heartbeat.Heartbeat;
|
import io.debezium.heartbeat.Heartbeat;
|
||||||
|
import io.debezium.jdbc.JdbcConnection;
|
||||||
import io.debezium.pipeline.signal.Signal;
|
import io.debezium.pipeline.signal.Signal;
|
||||||
|
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
|
||||||
import io.debezium.pipeline.source.spi.DataChangeEventListener;
|
import io.debezium.pipeline.source.spi.DataChangeEventListener;
|
||||||
import io.debezium.pipeline.source.spi.EventMetadataProvider;
|
import io.debezium.pipeline.source.spi.EventMetadataProvider;
|
||||||
import io.debezium.pipeline.spi.ChangeEventCreator;
|
import io.debezium.pipeline.spi.ChangeEventCreator;
|
||||||
@ -82,6 +84,7 @@ public class EventDispatcher<T extends DataCollectionId> {
|
|||||||
private final Schema schemaChangeValueSchema;
|
private final Schema schemaChangeValueSchema;
|
||||||
private final TableChangesSerializer<List<Struct>> tableChangesSerializer = new ConnectTableChangeSerializer();
|
private final TableChangesSerializer<List<Struct>> tableChangesSerializer = new ConnectTableChangeSerializer();
|
||||||
private final Signal signal;
|
private final Signal signal;
|
||||||
|
private final IncrementalSnapshotChangeEventSource<T> incrementalSnapshotChangeEventSource;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Change event receiver for events dispatched from a streaming change event source.
|
* Change event receiver for events dispatched from a streaming change event source.
|
||||||
@ -91,20 +94,23 @@ public class EventDispatcher<T extends DataCollectionId> {
|
|||||||
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
|
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
|
||||||
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
|
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
|
||||||
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
|
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
|
||||||
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider, null, schemaNameAdjuster);
|
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
|
||||||
|
null, schemaNameAdjuster, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
|
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
|
||||||
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
|
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
|
||||||
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider,
|
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider,
|
||||||
Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
|
Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
|
||||||
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider, heartbeat, schemaNameAdjuster);
|
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
|
||||||
|
heartbeat, schemaNameAdjuster, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
|
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
|
||||||
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
|
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
|
||||||
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<T> inconsistentSchemaHandler,
|
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<T> inconsistentSchemaHandler,
|
||||||
EventMetadataProvider metadataProvider, Heartbeat customHeartbeat, SchemaNameAdjuster schemaNameAdjuster) {
|
EventMetadataProvider metadataProvider, Heartbeat customHeartbeat, SchemaNameAdjuster schemaNameAdjuster,
|
||||||
|
JdbcConnection jdbcConnection) {
|
||||||
this.connectorConfig = connectorConfig;
|
this.connectorConfig = connectorConfig;
|
||||||
this.topicSelector = topicSelector;
|
this.topicSelector = topicSelector;
|
||||||
this.schema = schema;
|
this.schema = schema;
|
||||||
@ -121,6 +127,8 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
|
|||||||
this.neverSkip = connectorConfig.supportsOperationFiltering() || this.skippedOperations.isEmpty();
|
this.neverSkip = connectorConfig.supportsOperationFiltering() || this.skippedOperations.isEmpty();
|
||||||
|
|
||||||
this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, this::enqueueTransactionMessage);
|
this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, this::enqueueTransactionMessage);
|
||||||
|
this.incrementalSnapshotChangeEventSource = new IncrementalSnapshotChangeEventSource<T>(
|
||||||
|
connectorConfig, jdbcConnection, schema, this);
|
||||||
this.signal = new Signal(connectorConfig, this);
|
this.signal = new Signal(connectorConfig, this);
|
||||||
if (customHeartbeat != null) {
|
if (customHeartbeat != null) {
|
||||||
heartbeat = customHeartbeat;
|
heartbeat = customHeartbeat;
|
||||||
@ -176,6 +184,10 @@ public SnapshotReceiver getSnapshotChangeEventReceiver() {
|
|||||||
return new BufferingSnapshotChangeRecordReceiver();
|
return new BufferingSnapshotChangeRecordReceiver();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SnapshotReceiver getIncrementalSnapshotChangeEventReceiver() {
|
||||||
|
return new IncrementalSnapshotChangeRecordReceiver();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dispatches one or more {@link DataChangeEvent}s. If the given data collection is included in the currently
|
* Dispatches one or more {@link DataChangeEvent}s. If the given data collection is included in the currently
|
||||||
* captured set of collections, the given emitter will be invoked, so it can emit one or more events (in the common
|
* captured set of collections, the given emitter will be invoked, so it can emit one or more events (in the common
|
||||||
@ -220,6 +232,7 @@ public void changeRecord(DataCollectionSchema schema,
|
|||||||
if (neverSkip || !skippedOperations.contains(operation)) {
|
if (neverSkip || !skippedOperations.contains(operation)) {
|
||||||
transactionMonitor.dataEvent(dataCollectionId, offset, key, value);
|
transactionMonitor.dataEvent(dataCollectionId, offset, key, value);
|
||||||
eventListener.onEvent(dataCollectionId, offset, key, value);
|
eventListener.onEvent(dataCollectionId, offset, key, value);
|
||||||
|
incrementalSnapshotChangeEventSource.processMessage(dataCollectionId, key);
|
||||||
streamingReceiver.changeRecord(schema, operation, key, value, offset, headers);
|
streamingReceiver.changeRecord(schema, operation, key, value, offset, headers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -364,7 +377,8 @@ public void changeRecord(DataCollectionSchema dataCollectionSchema,
|
|||||||
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
|
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
|
||||||
|
|
||||||
SourceRecord record = new SourceRecord(offsetContext.getPartition(),
|
SourceRecord record = new SourceRecord(offsetContext.getPartition(),
|
||||||
offsetContext.getOffset(), topicName, null,
|
incrementalSnapshotChangeEventSource.store(offsetContext.getOffset()),
|
||||||
|
topicName, null,
|
||||||
keySchema, key,
|
keySchema, key,
|
||||||
dataCollectionSchema.getEnvelopeSchema().schema(),
|
dataCollectionSchema.getEnvelopeSchema().schema(),
|
||||||
value,
|
value,
|
||||||
@ -445,6 +459,37 @@ public void completeSnapshot() throws InterruptedException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class IncrementalSnapshotChangeRecordReceiver implements SnapshotReceiver {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void changeRecord(DataCollectionSchema dataCollectionSchema,
|
||||||
|
Operation operation,
|
||||||
|
Object key, Struct value,
|
||||||
|
OffsetContext offsetContext,
|
||||||
|
ConnectHeaders headers)
|
||||||
|
throws InterruptedException {
|
||||||
|
Objects.requireNonNull(value, "value must not be null");
|
||||||
|
|
||||||
|
LOGGER.trace("Received change record for {} operation on key {}", operation, key);
|
||||||
|
|
||||||
|
Schema keySchema = dataCollectionSchema.keySchema();
|
||||||
|
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
|
||||||
|
|
||||||
|
SourceRecord record = new SourceRecord(
|
||||||
|
offsetContext.getPartition(),
|
||||||
|
incrementalSnapshotChangeEventSource.store(offsetContext.getOffset()),
|
||||||
|
topicName, null,
|
||||||
|
keySchema, key,
|
||||||
|
dataCollectionSchema.getEnvelopeSchema().schema(), value,
|
||||||
|
null, headers);
|
||||||
|
queue.enqueue(changeEventCreator.createDataChangeEvent(record));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completeSnapshot() throws InterruptedException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private final class SchemaChangeEventReceiver implements SchemaChangeEventEmitter.Receiver {
|
private final class SchemaChangeEventReceiver implements SchemaChangeEventEmitter.Receiver {
|
||||||
|
|
||||||
private Struct schemaChangeRecordKey(SchemaChangeEvent event) {
|
private Struct schemaChangeRecordKey(SchemaChangeEvent event) {
|
||||||
@ -507,4 +552,8 @@ public DatabaseSchema<T> getSchema() {
|
|||||||
public HistorizedDatabaseSchema<T> getHistorizedSchema() {
|
public HistorizedDatabaseSchema<T> getHistorizedSchema() {
|
||||||
return historizedSchema;
|
return historizedSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IncrementalSnapshotChangeEventSource<T> getIncrementalSnapshotChangeEventSource() {
|
||||||
|
return incrementalSnapshotChangeEventSource;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,69 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.pipeline.signal;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import io.debezium.document.Array;
|
||||||
|
import io.debezium.pipeline.signal.Signal.Payload;
|
||||||
|
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The action to trigger an ad-hoc snapshot.
|
||||||
|
* The action parameters are {@code type} of snapshot and list of {@code data-collections} on which the
|
||||||
|
* snapshot will be executed.
|
||||||
|
*
|
||||||
|
* @author Jiri Pechanec
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class ExecuteSnapshot implements Signal.Action {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteSnapshot.class);
|
||||||
|
private static final String FIELD_DATA_COLLECTIONS = "data-collections";
|
||||||
|
private static final String FIELD_TYPE = "type";
|
||||||
|
|
||||||
|
public static final String NAME = "execute-snapshot";
|
||||||
|
|
||||||
|
public enum SnapshotType {
|
||||||
|
INCREMENTAL
|
||||||
|
}
|
||||||
|
|
||||||
|
private final IncrementalSnapshotChangeEventSource eventSource;
|
||||||
|
|
||||||
|
public ExecuteSnapshot(IncrementalSnapshotChangeEventSource eventSource) {
|
||||||
|
this.eventSource = eventSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean arrived(Payload signalPayload) {
|
||||||
|
final Array dataCollectionsArray = signalPayload.data.getArray("data-collections");
|
||||||
|
if (dataCollectionsArray == null || dataCollectionsArray.isEmpty()) {
|
||||||
|
LOGGER.warn(
|
||||||
|
"Execute snapshot signal '{}' has arrived but the requested field '{}' is missing from data or is empty",
|
||||||
|
signalPayload, FIELD_DATA_COLLECTIONS);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
final List<String> dataCollections = dataCollectionsArray.streamValues().map(v -> v.asString().trim())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
final String typeStr = signalPayload.data.getString(FIELD_TYPE);
|
||||||
|
SnapshotType type = SnapshotType.INCREMENTAL;
|
||||||
|
if (typeStr != null) {
|
||||||
|
type = SnapshotType.valueOf(typeStr);
|
||||||
|
}
|
||||||
|
LOGGER.info("Requested '{}' snapshot of data collections '{}'", type, dataCollections);
|
||||||
|
switch (type) {
|
||||||
|
case INCREMENTAL:
|
||||||
|
eventSource.addDataCollectionNamesToSnapshot(dataCollections);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -21,6 +21,9 @@
|
|||||||
import io.debezium.document.Document;
|
import io.debezium.document.Document;
|
||||||
import io.debezium.document.DocumentReader;
|
import io.debezium.document.DocumentReader;
|
||||||
import io.debezium.pipeline.EventDispatcher;
|
import io.debezium.pipeline.EventDispatcher;
|
||||||
|
import io.debezium.pipeline.source.snapshot.incremental.CloseIncrementalSnapshotWindow;
|
||||||
|
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
|
||||||
|
import io.debezium.pipeline.source.snapshot.incremental.OpenIncrementalSnapshotWindow;
|
||||||
import io.debezium.pipeline.spi.OffsetContext;
|
import io.debezium.pipeline.spi.OffsetContext;
|
||||||
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
|
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
|
||||||
import io.debezium.schema.DataCollectionId;
|
import io.debezium.schema.DataCollectionId;
|
||||||
@ -102,6 +105,10 @@ public Signal(CommonConnectorConfig connectorConfig, EventDispatcher<? extends D
|
|||||||
else {
|
else {
|
||||||
registerSignalAction(SchemaChanges.NAME, new SchemaChanges(dispatcher, false));
|
registerSignalAction(SchemaChanges.NAME, new SchemaChanges(dispatcher, false));
|
||||||
}
|
}
|
||||||
|
final IncrementalSnapshotChangeEventSource eventSource = eventDispatcher != null ? eventDispatcher.getIncrementalSnapshotChangeEventSource() : null;
|
||||||
|
registerSignalAction(ExecuteSnapshot.NAME, new ExecuteSnapshot(eventSource));
|
||||||
|
registerSignalAction(OpenIncrementalSnapshotWindow.NAME, new OpenIncrementalSnapshotWindow(eventSource));
|
||||||
|
registerSignalAction(CloseIncrementalSnapshotWindow.NAME, new CloseIncrementalSnapshotWindow(eventSource));
|
||||||
}
|
}
|
||||||
|
|
||||||
Signal(CommonConnectorConfig connectorConfig) {
|
Signal(CommonConnectorConfig connectorConfig) {
|
||||||
|
@ -0,0 +1,32 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.pipeline.source.snapshot.incremental;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import io.debezium.pipeline.signal.Signal;
|
||||||
|
import io.debezium.pipeline.signal.Signal.Payload;
|
||||||
|
|
||||||
|
public class CloseIncrementalSnapshotWindow implements Signal.Action {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(CloseIncrementalSnapshotWindow.class);
|
||||||
|
|
||||||
|
public static final String NAME = "snapshot-window-close";
|
||||||
|
|
||||||
|
private IncrementalSnapshotChangeEventSource eventSource;
|
||||||
|
|
||||||
|
public CloseIncrementalSnapshotWindow(IncrementalSnapshotChangeEventSource eventSource) {
|
||||||
|
this.eventSource = eventSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean arrived(Payload signalPayload) {
|
||||||
|
eventSource.windowClosed(signalPayload.offsetContext);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,484 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.pipeline.source.snapshot.incremental;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectInputStream;
|
||||||
|
import java.io.ObjectOutputStream;
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.kafka.connect.data.Struct;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import io.debezium.DebeziumException;
|
||||||
|
import io.debezium.annotation.NotThreadSafe;
|
||||||
|
import io.debezium.config.CommonConnectorConfig;
|
||||||
|
import io.debezium.jdbc.JdbcConnection;
|
||||||
|
import io.debezium.pipeline.EventDispatcher;
|
||||||
|
import io.debezium.pipeline.spi.ChangeRecordEmitter;
|
||||||
|
import io.debezium.pipeline.spi.OffsetContext;
|
||||||
|
import io.debezium.relational.Column;
|
||||||
|
import io.debezium.relational.RelationalDatabaseSchema;
|
||||||
|
import io.debezium.relational.RelationalSnapshotChangeEventSource;
|
||||||
|
import io.debezium.relational.SnapshotChangeRecordEmitter;
|
||||||
|
import io.debezium.relational.Table;
|
||||||
|
import io.debezium.relational.TableId;
|
||||||
|
import io.debezium.relational.TableSchema;
|
||||||
|
import io.debezium.schema.DataCollectionId;
|
||||||
|
import io.debezium.schema.DatabaseSchema;
|
||||||
|
import io.debezium.util.Clock;
|
||||||
|
import io.debezium.util.ColumnUtils;
|
||||||
|
import io.debezium.util.HexConverter;
|
||||||
|
import io.debezium.util.Strings;
|
||||||
|
import io.debezium.util.Threads;
|
||||||
|
import io.debezium.util.Threads.Timer;
|
||||||
|
|
||||||
|
@NotThreadSafe
|
||||||
|
public class IncrementalSnapshotChangeEventSource<T extends DataCollectionId> {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(IncrementalSnapshotChangeEventSource.class);
|
||||||
|
|
||||||
|
public static final String INCREMENTAL_SNAPSHOT_KEY = "incremental_snapshot";
|
||||||
|
public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY = INCREMENTAL_SNAPSHOT_KEY + "_collections";
|
||||||
|
public static final String EVENT_PRIMARY_KEY = INCREMENTAL_SNAPSHOT_KEY + "_primary_key";
|
||||||
|
public static final String TABLE_MAXIMUM_KEY = INCREMENTAL_SNAPSHOT_KEY + "_maximum_key";
|
||||||
|
|
||||||
|
// TODO Metrics
|
||||||
|
// Need to discuss and decide if
|
||||||
|
// - SnapshotChangeEventSourceMetricsMXBean would be extended with window metrics
|
||||||
|
// - new MXBean would be introduced with subset of SnapshotChangeEventSourceMetricsMXBean and additional window metrics
|
||||||
|
// - SnapshotChangeEventSourceMetricsMXBean would be reused and new MXBean should be introduce for window metrics
|
||||||
|
|
||||||
|
// List needs to be used as key as it implements hashCode/equals contract
|
||||||
|
private Map<Struct, Object[]> window = new LinkedHashMap<>();
|
||||||
|
private CommonConnectorConfig connectorConfig;
|
||||||
|
private JdbcConnection jdbcConnection;
|
||||||
|
// TODO Pass Clock
|
||||||
|
private final Clock clock = Clock.system();
|
||||||
|
private final String signalWindowStatement;
|
||||||
|
private final RelationalDatabaseSchema databaseSchema;
|
||||||
|
private final EventDispatcher<T> dispatcher;
|
||||||
|
|
||||||
|
private boolean windowOpened = false;
|
||||||
|
private Object[] chunkEndPosition;
|
||||||
|
private Table currentTable;
|
||||||
|
|
||||||
|
// TODO Extract into a separate IncrementalSnapshotContext
|
||||||
|
// TODO After extracting add into source info optional block incrementalSnapshotWindow{String from, String to}
|
||||||
|
// State to be stored and recovered from offsets
|
||||||
|
private final Queue<T> dataCollectionsToSnapshot = new LinkedList<>();
|
||||||
|
// The PK of the last record that was passed to Kafka Connect
|
||||||
|
// In case of connector restart the start of the first window will be populated from it
|
||||||
|
private Object[] lastEventSentKey;
|
||||||
|
// The largest PK in the table at the start of snapshot
|
||||||
|
private Object[] maximumKey;
|
||||||
|
|
||||||
|
public IncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection,
|
||||||
|
DatabaseSchema<?> databaseSchema, EventDispatcher<T> dispatcher) {
|
||||||
|
this.connectorConfig = config;
|
||||||
|
this.jdbcConnection = jdbcConnection;
|
||||||
|
signalWindowStatement = "INSERT INTO " + connectorConfig.getSignalingDataCollectionId()
|
||||||
|
+ " VALUES (?, ?, null)";
|
||||||
|
this.databaseSchema = (RelationalDatabaseSchema) databaseSchema;
|
||||||
|
this.dispatcher = dispatcher;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void windowOpen() {
|
||||||
|
LOGGER.info("Opening window for incremental snapshot batch");
|
||||||
|
windowOpened = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void windowClosed(OffsetContext offsetContext) {
|
||||||
|
try {
|
||||||
|
LOGGER.info("Closing window for incremental snapshot chunk");
|
||||||
|
windowOpened = false;
|
||||||
|
// TODO There is an issue here
|
||||||
|
// Events are emitted with tx log coordinates of the CloseIncrementalSnapshotWindow
|
||||||
|
// These means that if the connector is restarted in the middle of emptying the buffer
|
||||||
|
// then the rest of the buffer might not be resent or even the snapshotting restarted
|
||||||
|
// as there is no more of events.
|
||||||
|
// Most probably could be solved by injecting a sequence of windowOpen/Closed upon the start
|
||||||
|
offsetContext.incrementalSnapshotWindow();
|
||||||
|
for (Object[] row : window.values()) {
|
||||||
|
sendEnvent(offsetContext, row);
|
||||||
|
}
|
||||||
|
offsetContext.postSnapshotCompletion();
|
||||||
|
window.clear();
|
||||||
|
populateWindow();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendEnvent(OffsetContext offsetContext, Object[] row) throws InterruptedException {
|
||||||
|
lastEventSentKey = keyFromRow(row);
|
||||||
|
offsetContext.event((T) currentDataCollectionId(), clock.currentTimeAsInstant());
|
||||||
|
dispatcher.dispatchSnapshotEvent((T) currentDataCollectionId(),
|
||||||
|
getChangeRecordEmitter(currentDataCollectionId(), offsetContext, row),
|
||||||
|
dispatcher.getIncrementalSnapshotChangeEventReceiver());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link ChangeRecordEmitter} producing the change records for
|
||||||
|
* the given table row.
|
||||||
|
*/
|
||||||
|
protected ChangeRecordEmitter getChangeRecordEmitter(T dataCollectionId, OffsetContext offsetContext,
|
||||||
|
Object[] row) {
|
||||||
|
return new SnapshotChangeRecordEmitter(offsetContext, row, clock);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processMessage(DataCollectionId dataCollectionId, Object key) {
|
||||||
|
LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window);
|
||||||
|
if (!windowOpened || window.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!currentDataCollectionId().equals(dataCollectionId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (key instanceof Struct) {
|
||||||
|
if (window.remove((Struct) key) != null) {
|
||||||
|
LOGGER.info("Removed '{}' from window", key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected T currentDataCollectionId() {
|
||||||
|
return dataCollectionsToSnapshot.peek();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void emitWindowOpen() throws SQLException {
|
||||||
|
jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
|
||||||
|
x.setString(1, UUID.randomUUID().toString());
|
||||||
|
x.setString(2, OpenIncrementalSnapshotWindow.NAME);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void emitWindowClose() throws SQLException {
|
||||||
|
jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
|
||||||
|
x.setString(1, UUID.randomUUID().toString());
|
||||||
|
x.setString(2, CloseIncrementalSnapshotWindow.NAME);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String buildChunkQuery(Table table) {
|
||||||
|
final StringBuilder sql = new StringBuilder("SELECT * FROM ");
|
||||||
|
sql.append(table.id().toString());
|
||||||
|
|
||||||
|
// Add condition when this is not the first query
|
||||||
|
if (isNonInitialChunk()) {
|
||||||
|
// Window boundaries
|
||||||
|
sql.append(" WHERE ");
|
||||||
|
addKeyColumnsToCondition(table, sql, " >= ?");
|
||||||
|
sql.append(" AND NOT (");
|
||||||
|
addKeyColumnsToCondition(table, sql, " = ?");
|
||||||
|
sql.append(")");
|
||||||
|
// Table boundaries
|
||||||
|
sql.append(" AND ");
|
||||||
|
addKeyColumnsToCondition(table, sql, " <= ?");
|
||||||
|
}
|
||||||
|
// TODO limiting is db dialect based
|
||||||
|
sql.append(" ORDER BY ")
|
||||||
|
.append(table.primaryKeyColumns().stream().map(Column::name).collect(Collectors.joining(", ")))
|
||||||
|
.append(" LIMIT ").append(connectorConfig.getIncrementalSnashotChunkSize());
|
||||||
|
return sql.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isNonInitialChunk() {
|
||||||
|
return chunkEndPosition != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String buildMaxPrimaryKeyQuery(Table table) {
|
||||||
|
final StringBuilder sql = new StringBuilder("SELECT * FROM ");
|
||||||
|
sql.append(table.id().toString());
|
||||||
|
|
||||||
|
// TODO limiting is db dialect based
|
||||||
|
sql.append(" ORDER BY ")
|
||||||
|
.append(table.primaryKeyColumns().stream().map(Column::name).collect(Collectors.joining(" DESC, ")))
|
||||||
|
.append(" DESC LIMIT ").append(1);
|
||||||
|
return sql.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void nextChunk(Object[] end) {
|
||||||
|
chunkEndPosition = end;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void resetChunk() {
|
||||||
|
chunkEndPosition = null;
|
||||||
|
maximumKey = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean tablesAvailable() {
|
||||||
|
return !dataCollectionsToSnapshot.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setMaximumKey(Object[] key) {
|
||||||
|
maximumKey = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasMaximumKey() {
|
||||||
|
return maximumKey != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void populateWindow() throws InterruptedException {
|
||||||
|
if (!tablesAvailable()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
emitWindowOpen();
|
||||||
|
while (tablesAvailable()) {
|
||||||
|
final TableId currentTableId = (TableId) currentDataCollectionId();
|
||||||
|
currentTable = databaseSchema.tableFor(currentTableId);
|
||||||
|
if (!hasMaximumKey()) {
|
||||||
|
setMaximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
|
||||||
|
if (!rs.next()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return keyFromRow(rowToArray(currentTable, rs, ColumnUtils.toArray(rs, currentTable)));
|
||||||
|
}));
|
||||||
|
if (!hasMaximumKey()) {
|
||||||
|
LOGGER.info(
|
||||||
|
"No maximum key returned by the query, incremental snapshotting of table '{}' finished as it is empty",
|
||||||
|
currentTableId);
|
||||||
|
nextDataCollection();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOGGER.info("Incremental snapshot for table '{}' will end at position {}", currentTableId,
|
||||||
|
maximumKey);
|
||||||
|
}
|
||||||
|
createDataEventsForTable(dataCollectionsToSnapshot.size());
|
||||||
|
if (window.isEmpty()) {
|
||||||
|
LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
|
||||||
|
currentTableId);
|
||||||
|
nextDataCollection();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
emitWindowClose();
|
||||||
|
}
|
||||||
|
catch (SQLException e) {
|
||||||
|
throw new DebeziumException("Database error while executing incremental snapshot", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected T nextDataCollection() {
|
||||||
|
resetChunk();
|
||||||
|
return dataCollectionsToSnapshot.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addTablesIdsToSnapshot(List<T> dataCollectionIds) {
|
||||||
|
boolean shouldPopulateWindow = false;
|
||||||
|
if (!tablesAvailable()) {
|
||||||
|
shouldPopulateWindow = true;
|
||||||
|
}
|
||||||
|
dataCollectionsToSnapshot.addAll(dataCollectionIds);
|
||||||
|
if (shouldPopulateWindow) {
|
||||||
|
try {
|
||||||
|
populateWindow();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds) {
|
||||||
|
addTablesIdsToSnapshot(dataCollectionIds.stream().map(x -> (T) TableId.parse(x)).collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) {
|
||||||
|
for (Iterator<Column> i = table.primaryKeyColumns().iterator(); i.hasNext();) {
|
||||||
|
final Column key = i.next();
|
||||||
|
sql.append(key.name()).append(predicate);
|
||||||
|
if (i.hasNext()) {
|
||||||
|
sql.append(" AND ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatches the data change events for the records of a single table.
|
||||||
|
*/
|
||||||
|
private void createDataEventsForTable(int tableCount) throws InterruptedException {
|
||||||
|
long exportStart = clock.currentTimeInMillis();
|
||||||
|
LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), tableCount);
|
||||||
|
|
||||||
|
final String selectStatement = buildChunkQuery(currentTable);
|
||||||
|
LOGGER.debug("\t For table '{}' using select statement: '{}', key: '{}', maximum key: '{}'", currentTable.id(),
|
||||||
|
selectStatement, chunkEndPosition, maximumKey);
|
||||||
|
|
||||||
|
final TableSchema tableSchema = databaseSchema.schemaFor(currentTable.id());
|
||||||
|
|
||||||
|
try (PreparedStatement statement = readTableStatement(selectStatement);
|
||||||
|
ResultSet rs = statement.executeQuery()) {
|
||||||
|
|
||||||
|
final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, currentTable);
|
||||||
|
long rows = 0;
|
||||||
|
Timer logTimer = getTableScanLogTimer();
|
||||||
|
|
||||||
|
Object[] lastRow = null;
|
||||||
|
while (rs.next()) {
|
||||||
|
rows++;
|
||||||
|
final Object[] row = rowToArray(currentTable, rs, columnArray);
|
||||||
|
final Struct keyStruct = tableSchema.keyFromColumnData(row);
|
||||||
|
window.put(keyStruct, row);
|
||||||
|
if (logTimer.expired()) {
|
||||||
|
long stop = clock.currentTimeInMillis();
|
||||||
|
LOGGER.debug("\t Exported {} records for table '{}' after {}", rows, currentTable.id(),
|
||||||
|
Strings.duration(stop - exportStart));
|
||||||
|
logTimer = getTableScanLogTimer();
|
||||||
|
}
|
||||||
|
lastRow = row;
|
||||||
|
}
|
||||||
|
nextChunk(keyFromRow(lastRow));
|
||||||
|
if (lastRow != null) {
|
||||||
|
LOGGER.debug("\t Next window will resume from '{}'", chunkEndPosition);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug("\t Finished exporting {} records for window of table table '{}'; total duration '{}'", rows,
|
||||||
|
currentTable.id(), Strings.duration(clock.currentTimeInMillis() - exportStart));
|
||||||
|
}
|
||||||
|
catch (SQLException e) {
|
||||||
|
throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract to JdbcConnection, same as in RelationalSnapshotChangeEventSource
|
||||||
|
protected Object[] rowToArray(Table table, ResultSet rs, ColumnUtils.ColumnArray columnArray) throws SQLException {
|
||||||
|
final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
|
||||||
|
for (int i = 0; i < columnArray.getColumns().length; i++) {
|
||||||
|
row[columnArray.getColumns()[i].position() - 1] = getColumnValue(rs, i + 1, columnArray.getColumns()[i],
|
||||||
|
table);
|
||||||
|
}
|
||||||
|
return row;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO Parmetrize the method and extract it to JdbcConnection
|
||||||
|
/**
|
||||||
|
* Allow per-connector query creation to override for best database
|
||||||
|
* performance depending on the table size.
|
||||||
|
*/
|
||||||
|
protected PreparedStatement readTableStatement(String sql) throws SQLException {
|
||||||
|
int fetchSize = connectorConfig.getSnapshotFetchSize();
|
||||||
|
PreparedStatement statement = jdbcConnection.connection().prepareStatement(sql);
|
||||||
|
if (isNonInitialChunk()) {
|
||||||
|
for (int i = 0; i < chunkEndPosition.length; i++) {
|
||||||
|
statement.setObject(i + 1, chunkEndPosition[i]);
|
||||||
|
statement.setObject(i + 1 + chunkEndPosition.length, chunkEndPosition[i]);
|
||||||
|
statement.setObject(i + 1 + 2 * chunkEndPosition.length, maximumKey[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
statement.setFetchSize(fetchSize);
|
||||||
|
return statement;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Timer getTableScanLogTimer() {
|
||||||
|
return Threads.timer(clock, RelationalSnapshotChangeEventSource.LOG_INTERVAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO Extract these two methods from *SnapshotChangeEventSource to JdbcValueConverters or JdbcConnection
|
||||||
|
protected Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
|
||||||
|
return getColumnValue(rs, columnIndex, column);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
protected Object getColumnValue(ResultSet rs, int columnIndex, Column column) throws SQLException {
|
||||||
|
return rs.getObject(columnIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object[] keyFromRow(Object[] row) {
|
||||||
|
if (row == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final List<Column> keyColumns = currentTable.primaryKeyColumns();
|
||||||
|
final Object[] key = new Object[keyColumns.size()];
|
||||||
|
for (int i = 0; i < keyColumns.size(); i++) {
|
||||||
|
key[i] = row[keyColumns.get(i).position() - 1];
|
||||||
|
}
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String arrayToSerializedString(Object[] array) {
|
||||||
|
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
|
||||||
|
oos.writeObject(array);
|
||||||
|
return HexConverter.convertToHexString(bos.toByteArray());
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object[] serializedStringToArray(String field, String serialized) {
|
||||||
|
try (final ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
|
||||||
|
ObjectInputStream ois = new ObjectInputStream(bis)) {
|
||||||
|
return (Object[]) ois.readObject();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new DebeziumException(String.format("Failed to deserialize '%s' with value '%s'", field, serialized), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String dataCollectionsToSnapshotAsString() {
|
||||||
|
// TODO Handle non-standard table ids containing dots, commas etc.
|
||||||
|
return dataCollectionsToSnapshot.stream()
|
||||||
|
.map(x -> x.toString())
|
||||||
|
.collect(Collectors.joining(","));
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> stringToDataCollections(String dataCollectionsStr) {
|
||||||
|
return Arrays.asList(dataCollectionsStr.split(","));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, ?> store(Map<String, ?> iOffset) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final Map<String, Object> offset = (Map<String, Object>) iOffset;
|
||||||
|
if (!tablesAvailable()) {
|
||||||
|
return offset;
|
||||||
|
}
|
||||||
|
offset.put(EVENT_PRIMARY_KEY, arrayToSerializedString(lastEventSentKey));
|
||||||
|
offset.put(TABLE_MAXIMUM_KEY, arrayToSerializedString(maximumKey));
|
||||||
|
offset.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY, dataCollectionsToSnapshotAsString());
|
||||||
|
return offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO Call on connector start
|
||||||
|
public void load(Map<String, ?> offsets) {
|
||||||
|
if (offsets == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final String lastEventSentKeyStr = (String) offsets.get(EVENT_PRIMARY_KEY);
|
||||||
|
chunkEndPosition = (lastEventSentKeyStr != null) ? serializedStringToArray(EVENT_PRIMARY_KEY, lastEventSentKeyStr) : null;
|
||||||
|
lastEventSentKey = null;
|
||||||
|
final String maximumKeyStr = (String) offsets.get(TABLE_MAXIMUM_KEY);
|
||||||
|
maximumKey = (maximumKeyStr != null) ? serializedStringToArray(TABLE_MAXIMUM_KEY, maximumKeyStr) : null;
|
||||||
|
final String dataCollectionsStr = (String) offsets.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY);
|
||||||
|
dataCollectionsToSnapshot.clear();
|
||||||
|
if (dataCollectionsStr != null) {
|
||||||
|
addDataCollectionNamesToSnapshot(stringToDataCollections(dataCollectionsStr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,32 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.pipeline.source.snapshot.incremental;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import io.debezium.pipeline.signal.Signal;
|
||||||
|
import io.debezium.pipeline.signal.Signal.Payload;
|
||||||
|
|
||||||
|
public class OpenIncrementalSnapshotWindow implements Signal.Action {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(OpenIncrementalSnapshotWindow.class);
|
||||||
|
|
||||||
|
public static final String NAME = "snapshot-window-open";
|
||||||
|
|
||||||
|
private IncrementalSnapshotChangeEventSource eventSource;
|
||||||
|
|
||||||
|
public OpenIncrementalSnapshotWindow(IncrementalSnapshotChangeEventSource eventSource) {
|
||||||
|
this.eventSource = eventSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean arrived(Payload signalPayload) {
|
||||||
|
eventSource.windowOpen();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -78,4 +78,12 @@ interface Loader {
|
|||||||
* @return transaction context
|
* @return transaction context
|
||||||
*/
|
*/
|
||||||
TransactionContext getTransactionContext();
|
TransactionContext getTransactionContext();
|
||||||
|
|
||||||
|
default void incrementalSnapshotWindow() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
default void incrementalSnapshotStop() {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ public abstract class RelationalSnapshotChangeEventSource extends AbstractSnapsh
|
|||||||
/**
|
/**
|
||||||
* Interval for showing a log statement with the progress while scanning a single table.
|
* Interval for showing a log statement with the progress while scanning a single table.
|
||||||
*/
|
*/
|
||||||
private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
|
public static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
|
||||||
|
|
||||||
private final RelationalDatabaseConnectorConfig connectorConfig;
|
private final RelationalDatabaseConnectorConfig connectorConfig;
|
||||||
private final OffsetContext previousOffset;
|
private final OffsetContext previousOffset;
|
||||||
|
@ -0,0 +1,68 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.pipeline.source.snapshot.incremental;
|
||||||
|
|
||||||
|
import org.fest.assertions.Assertions;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import io.debezium.config.CommonConnectorConfig;
|
||||||
|
import io.debezium.config.Configuration;
|
||||||
|
import io.debezium.connector.SourceInfoStructMaker;
|
||||||
|
import io.debezium.relational.Column;
|
||||||
|
import io.debezium.relational.Table;
|
||||||
|
import io.debezium.relational.TableId;
|
||||||
|
|
||||||
|
public class IncrementalSnapshotChangeEventSourceTest {
|
||||||
|
|
||||||
|
protected CommonConnectorConfig config() {
|
||||||
|
return new CommonConnectorConfig(
|
||||||
|
Configuration.create().with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal").build(),
|
||||||
|
"core", 0) {
|
||||||
|
@Override
|
||||||
|
protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getContextName() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getConnectorName() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuildQuery() {
|
||||||
|
final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(config(), null, null, null);
|
||||||
|
final Column pk1 = Column.editor().name("pk1").create();
|
||||||
|
final Column pk2 = Column.editor().name("pk2").create();
|
||||||
|
final Column val1 = Column.editor().name("val1").create();
|
||||||
|
final Column val2 = Column.editor().name("val2").create();
|
||||||
|
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2)
|
||||||
|
.addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create();
|
||||||
|
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM s1.table1 ORDER BY pk1, pk2 LIMIT 1024");
|
||||||
|
source.nextChunk(new Object[]{ 1, 5 });
|
||||||
|
source.setMaximumKey(new Object[]{ 10, 50 });
|
||||||
|
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo(
|
||||||
|
"SELECT * FROM s1.table1 WHERE pk1 >= ? AND pk2 >= ? AND NOT (pk1 = ? AND pk2 = ?) AND pk1 <= ? AND pk2 <= ? ORDER BY pk1, pk2 LIMIT 1024");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxQuery() {
|
||||||
|
final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(config(), null, null, null);
|
||||||
|
final Column pk1 = Column.editor().name("pk1").create();
|
||||||
|
final Column pk2 = Column.editor().name("pk2").create();
|
||||||
|
final Column val1 = Column.editor().name("val1").create();
|
||||||
|
final Column val2 = Column.editor().name("val2").create();
|
||||||
|
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2)
|
||||||
|
.addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create();
|
||||||
|
Assertions.assertThat(source.buildMaxPrimaryKeyQuery(table)).isEqualTo("SELECT * FROM s1.table1 ORDER BY pk1 DESC, pk2 DESC LIMIT 1");
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user