DBZ-3473 Extract incremental snapshot context

Jiri Pechanec 2021-04-30 14:26:39 +02:00
parent 07371c085b
commit 37626c0dfe
10 changed files with 301 additions and 196 deletions
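In short: the state that an incremental snapshot needs in order to survive a connector restart (the queue of tables still to be snapshotted, the key at which the current chunk ends, the maximum key of the table being processed, and the open/closed state of the deduplication window) moves out of IncrementalSnapshotChangeEventSource into the new IncrementalSnapshotContext class, which is owned by each connector's OffsetContext and written into the Kafka Connect offsets. A minimal sketch of that round-trip, using only the API added in this commit; the class name, table name, and key values below are made up for illustration:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
    import io.debezium.relational.TableId;

    public class OffsetRoundTripSketch {
        public static void main(String[] args) {
            // state accumulated while a snapshot is running
            IncrementalSnapshotContext<TableId> context = new IncrementalSnapshotContext<>();
            context.addDataCollectionNamesToSnapshot(Collections.singletonList("s1.table1"));
            context.nextChunk(new Object[]{ 42 }); // pretend the first chunk ended at pk = 42

            // store() adds the incremental_snapshot_* entries next to whatever is already in the offset map
            Map<String, Object> offset = context.store(new HashMap<>());

            // after a restart the same state is rebuilt from the persisted offset
            IncrementalSnapshotContext<TableId> restored = IncrementalSnapshotContext.load(offset, TableId.class);
            System.out.println(restored);
        }
    }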

PostgresOffsetContext.java

@@ -21,6 +21,7 @@
 import io.debezium.connector.postgresql.connection.Lsn;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.spi.OffsetState;
+import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
 import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.pipeline.txmetadata.TransactionContext;
 import io.debezium.relational.TableId;
@@ -43,10 +44,12 @@ public class PostgresOffsetContext implements OffsetContext {
     private Lsn lastCommitLsn;
     private Lsn streamingStoppingLsn = null;
     private final TransactionContext transactionContext;
+    private final IncrementalSnapshotContext<TableId> incrementalSnapshotContext;

     private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn, Lsn lastCompletelyProcessedLsn, Lsn lastCommitLsn, Long txId, Instant time,
                                   boolean snapshot,
-                                  boolean lastSnapshotRecord, TransactionContext transactionContext) {
+                                  boolean lastSnapshotRecord, TransactionContext transactionContext,
+                                  IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
         partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
         sourceInfo = new SourceInfo(connectorConfig);
@@ -63,6 +66,7 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn,
             sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE);
         }
         this.transactionContext = transactionContext;
+        this.incrementalSnapshotContext = incrementalSnapshotContext;
     }

     @Override
@@ -95,7 +99,7 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn,
         if (lastCommitLsn != null) {
             result.put(LAST_COMMIT_LSN_KEY, lastCommitLsn.asLong());
         }
-        return sourceInfo.isSnapshot() ? result : transactionContext.store(result);
+        return sourceInfo.isSnapshot() ? result : incrementalSnapshotContext.store(transactionContext.store(result));
     }

     @Override
@@ -213,7 +217,7 @@ public OffsetContext load(Map<String, ?> offset) {
             final boolean snapshot = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE);
             final boolean lastSnapshotRecord = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE);
             return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, lastCommitLsn, txId, useconds, snapshot, lastSnapshotRecord,
-                    TransactionContext.load(offset));
+                    TransactionContext.load(offset), IncrementalSnapshotContext.load(offset, TableId.class));
         }
     }
@@ -245,7 +249,8 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne
                     clock.currentTimeAsInstant(),
                     false,
                     false,
-                    new TransactionContext());
+                    new TransactionContext(),
+                    new IncrementalSnapshotContext<>());
         }
         catch (SQLException e) {
             throw new ConnectException("Database processing error", e);
@@ -277,7 +282,12 @@ public TransactionContext getTransactionContext() {
     }

     @Override
-    public void incrementalSnapshotWindow() {
+    public void incrementalSnapshotEvents() {
         sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
     }
+
+    @Override
+    public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
+        return incrementalSnapshotContext;
+    }
 }

EventDispatcher.java

@@ -232,7 +232,7 @@ public void changeRecord(DataCollectionSchema schema,
             if (neverSkip || !skippedOperations.contains(operation)) {
                 transactionMonitor.dataEvent(dataCollectionId, offset, key, value);
                 eventListener.onEvent(dataCollectionId, offset, key, value);
-                incrementalSnapshotChangeEventSource.processMessage(dataCollectionId, key);
+                incrementalSnapshotChangeEventSource.processMessage(dataCollectionId, key, offset);
                 streamingReceiver.changeRecord(schema, operation, key, value, offset, headers);
             }
         }
@@ -377,7 +377,7 @@ public void changeRecord(DataCollectionSchema dataCollectionSchema,
             String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
             SourceRecord record = new SourceRecord(offsetContext.getPartition(),
-                    incrementalSnapshotChangeEventSource.store(offsetContext.getOffset()),
+                    offsetContext.getOffset(),
                     topicName, null,
                     keySchema, key,
                     dataCollectionSchema.getEnvelopeSchema().schema(),
@@ -477,7 +477,7 @@ public void changeRecord(DataCollectionSchema dataCollectionSchema,
             SourceRecord record = new SourceRecord(
                     offsetContext.getPartition(),
-                    incrementalSnapshotChangeEventSource.store(offsetContext.getOffset()),
+                    offsetContext.getOffset(),
                     topicName, null,
                     keySchema, key,
                     dataCollectionSchema.getEnvelopeSchema().schema(), value,

ExecuteSnapshot.java

@@ -35,9 +35,9 @@ public enum SnapshotType {
         INCREMENTAL
     }

-    private final IncrementalSnapshotChangeEventSource eventSource;
+    private final IncrementalSnapshotChangeEventSource<?> eventSource;

-    public ExecuteSnapshot(IncrementalSnapshotChangeEventSource eventSource) {
+    public ExecuteSnapshot(IncrementalSnapshotChangeEventSource<?> eventSource) {
         this.eventSource = eventSource;
     }
@@ -60,7 +60,7 @@ public boolean arrived(Payload signalPayload) {
         LOGGER.info("Requested '{}' snapshot of data collections '{}'", type, dataCollections);
         switch (type) {
             case INCREMENTAL:
-                eventSource.addDataCollectionNamesToSnapshot(dataCollections);
+                eventSource.addDataCollectionNamesToSnapshot(dataCollections, signalPayload.offsetContext);
                 break;
         }
         return true;

Signal.java

@@ -107,7 +107,7 @@ public Signal(CommonConnectorConfig connectorConfig, EventDispatcher<? extends D
         }
         final IncrementalSnapshotChangeEventSource eventSource = eventDispatcher != null ? eventDispatcher.getIncrementalSnapshotChangeEventSource() : null;
         registerSignalAction(ExecuteSnapshot.NAME, new ExecuteSnapshot(eventSource));
-        registerSignalAction(OpenIncrementalSnapshotWindow.NAME, new OpenIncrementalSnapshotWindow(eventSource));
+        registerSignalAction(OpenIncrementalSnapshotWindow.NAME, new OpenIncrementalSnapshotWindow());
         registerSignalAction(CloseIncrementalSnapshotWindow.NAME, new CloseIncrementalSnapshotWindow(eventSource));
     }

CloseIncrementalSnapshotWindow.java

@@ -17,15 +17,15 @@ public class CloseIncrementalSnapshotWindow implements Signal.Action {
     public static final String NAME = "snapshot-window-close";

-    private IncrementalSnapshotChangeEventSource eventSource;
+    private IncrementalSnapshotChangeEventSource<?> eventSource;

-    public CloseIncrementalSnapshotWindow(IncrementalSnapshotChangeEventSource eventSource) {
+    public CloseIncrementalSnapshotWindow(IncrementalSnapshotChangeEventSource<?> eventSource) {
         this.eventSource = eventSource;
     }

     @Override
     public boolean arrived(Payload signalPayload) {
-        eventSource.windowClosed(signalPayload.offsetContext);
+        eventSource.closeWindow(signalPayload.offsetContext);
         return true;
     }

IncrementalSnapshotChangeEventSource.java

@@ -5,21 +5,13 @@
  */
 package io.debezium.pipeline.source.snapshot.incremental;

-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -45,7 +37,6 @@
 import io.debezium.schema.DatabaseSchema;
 import io.debezium.util.Clock;
 import io.debezium.util.ColumnUtils;
-import io.debezium.util.HexConverter;
 import io.debezium.util.Strings;
 import io.debezium.util.Threads;
 import io.debezium.util.Threads.Timer;
@@ -55,11 +46,6 @@ public class IncrementalSnapshotChangeEventSource<T extends DataCollectionId> {
     private static final Logger LOGGER = LoggerFactory.getLogger(IncrementalSnapshotChangeEventSource.class);

-    public static final String INCREMENTAL_SNAPSHOT_KEY = "incremental_snapshot";
-    public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY = INCREMENTAL_SNAPSHOT_KEY + "_collections";
-    public static final String EVENT_PRIMARY_KEY = INCREMENTAL_SNAPSHOT_KEY + "_primary_key";
-    public static final String TABLE_MAXIMUM_KEY = INCREMENTAL_SNAPSHOT_KEY + "_maximum_key";
-
     // TODO Metrics
     // Need to discuss and decide if
     // - SnapshotChangeEventSourceMetricsMXBean would be extended with window metrics
@@ -76,19 +62,9 @@ public class IncrementalSnapshotChangeEventSource<T extends DataCollectionId> {
     private final RelationalDatabaseSchema databaseSchema;
     private final EventDispatcher<T> dispatcher;
-    private boolean windowOpened = false;
-    private Object[] chunkEndPosition;
     private Table currentTable;
-    // TODO Extract into a separate IncrementalSnapshotContext
-    // TODO After extracting add into source info optional block incrementalSnapshotWindow{String from, String to}
-    // State to be stored and recovered from offsets
-    private final Queue<T> dataCollectionsToSnapshot = new LinkedList<>();
-    // The PK of the last record that was passed to Kafka Connect
-    // In case of connector restart the start of the first window will be populated from it
-    private Object[] lastEventSentKey;
-    // The largest PK in the table at the start of snapshot
-    private Object[] maximumKey;
+    private IncrementalSnapshotContext<T> context = null;

     public IncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection,
                                                 DatabaseSchema<?> databaseSchema, EventDispatcher<T> dispatcher) {
@@ -100,28 +76,24 @@ public IncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcCo
         this.dispatcher = dispatcher;
     }

-    public void windowOpen() {
-        LOGGER.info("Opening window for incremental snapshot batch");
-        windowOpened = true;
-    }
-
-    public void windowClosed(OffsetContext offsetContext) {
+    @SuppressWarnings("unchecked")
+    public void closeWindow(OffsetContext offsetContext) {
+        context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
         try {
-            LOGGER.info("Closing window for incremental snapshot chunk");
-            windowOpened = false;
+            context.closeWindow();
             // TODO There is an issue here
             // Events are emitted with tx log coordinates of the CloseIncrementalSnapshotWindow
             // These means that if the connector is restarted in the middle of emptying the buffer
             // then the rest of the buffer might not be resent or even the snapshotting restarted
             // as there is no more of events.
             // Most probably could be solved by injecting a sequence of windowOpen/Closed upon the start
-            offsetContext.incrementalSnapshotWindow();
+            offsetContext.incrementalSnapshotEvents();
             for (Object[] row : window.values()) {
-                sendEnvent(offsetContext, row);
+                sendEvent(offsetContext, row);
             }
             offsetContext.postSnapshotCompletion();
             window.clear();
-            populateWindow();
+            readChunk();
         }
         catch (InterruptedException e) {
             // TODO Auto-generated catch block
@@ -129,11 +101,11 @@ public void windowClosed(OffsetContext offsetContext) {
         }
     }

-    protected void sendEnvent(OffsetContext offsetContext, Object[] row) throws InterruptedException {
-        lastEventSentKey = keyFromRow(row);
-        offsetContext.event((T) currentDataCollectionId(), clock.currentTimeAsInstant());
-        dispatcher.dispatchSnapshotEvent((T) currentDataCollectionId(),
-                getChangeRecordEmitter(currentDataCollectionId(), offsetContext, row),
+    protected void sendEvent(OffsetContext offsetContext, Object[] row) throws InterruptedException {
+        context.sendEvent(keyFromRow(row));
+        offsetContext.event((T) context.currentDataCollectionId(), clock.currentTimeAsInstant());
+        dispatcher.dispatchSnapshotEvent((T) context.currentDataCollectionId(),
+                getChangeRecordEmitter(context.currentDataCollectionId(), offsetContext, row),
                 dispatcher.getIncrementalSnapshotChangeEventReceiver());
     }
@@ -146,12 +118,14 @@ protected ChangeRecordEmitter getChangeRecordEmitter(T dataCollectionId, OffsetC
         return new SnapshotChangeRecordEmitter(offsetContext, row, clock);
     }

-    public void processMessage(DataCollectionId dataCollectionId, Object key) {
+    @SuppressWarnings("unchecked")
+    public void processMessage(DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) {
+        context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
         LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window);
-        if (!windowOpened || window.isEmpty()) {
+        if (!context.deduplicationNeeded() || window.isEmpty()) {
             return;
         }
-        if (!currentDataCollectionId().equals(dataCollectionId)) {
+        if (!context.currentDataCollectionId().equals(dataCollectionId)) {
             return;
         }
         if (key instanceof Struct) {
@@ -161,10 +135,6 @@ public void processMessage(DataCollectionId dataCollectionId, Object key) {
         }
     }

-    protected T currentDataCollectionId() {
-        return dataCollectionsToSnapshot.peek();
-    }
-
     private void emitWindowOpen() throws SQLException {
         jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
             x.setString(1, UUID.randomUUID().toString());
@@ -186,7 +156,7 @@ protected String buildChunkQuery(Table table) {
         sql.append(table.id().toString());
         // Add condition when this is not the first query
-        if (isNonInitialChunk()) {
+        if (context.isNonInitialChunk()) {
             // Window boundaries
             sql.append(" WHERE ");
             addKeyColumnsToCondition(table, sql, " >= ?");
@@ -204,10 +174,6 @@ protected String buildChunkQuery(Table table) {
         return sql.toString();
     }

-    private boolean isNonInitialChunk() {
-        return chunkEndPosition != null;
-    }
-
     protected String buildMaxPrimaryKeyQuery(Table table) {
         final StringBuilder sql = new StringBuilder("SELECT * FROM ");
         sql.append(table.id().toString());
@@ -219,58 +185,37 @@ protected String buildMaxPrimaryKeyQuery(Table table) {
         return sql.toString();
     }

-    public void nextChunk(Object[] end) {
-        chunkEndPosition = end;
-    }
-
-    private void resetChunk() {
-        chunkEndPosition = null;
-        maximumKey = null;
-    }
-
-    protected boolean tablesAvailable() {
-        return !dataCollectionsToSnapshot.isEmpty();
-    }
-
-    protected void setMaximumKey(Object[] key) {
-        maximumKey = key;
-    }
-
-    private boolean hasMaximumKey() {
-        return maximumKey != null;
-    }
-
-    private void populateWindow() throws InterruptedException {
-        if (!tablesAvailable()) {
+    private void readChunk() throws InterruptedException {
+        if (!context.snapshotRunning()) {
             return;
         }
         try {
             emitWindowOpen();
-            while (tablesAvailable()) {
-                final TableId currentTableId = (TableId) currentDataCollectionId();
+            while (context.snapshotRunning()) {
+                final TableId currentTableId = (TableId) context.currentDataCollectionId();
                 currentTable = databaseSchema.tableFor(currentTableId);
-                if (!hasMaximumKey()) {
-                    setMaximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
+                if (!context.maximumKey().isPresent()) {
+                    context.maximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
                         if (!rs.next()) {
                             return null;
                         }
                         return keyFromRow(rowToArray(currentTable, rs, ColumnUtils.toArray(rs, currentTable)));
                     }));
-                    if (!hasMaximumKey()) {
+                    if (!context.maximumKey().isPresent()) {
                         LOGGER.info(
                                 "No maximum key returned by the query, incremental snapshotting of table '{}' finished as it is empty",
                                 currentTableId);
-                        nextDataCollection();
+                        context.nextDataCollection();
                         continue;
                     }
                     LOGGER.info("Incremental snapshot for table '{}' will end at position {}", currentTableId,
-                            maximumKey);
+                            context.maximumKey());
                 }
-                createDataEventsForTable(dataCollectionsToSnapshot.size());
+                createDataEventsForTable();
                 if (window.isEmpty()) {
                     LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
                             currentTableId);
-                    nextDataCollection();
+                    context.nextDataCollection();
                 }
                 else {
                     break;
@@ -283,20 +228,17 @@ private void populateWindow() throws InterruptedException {
         }
     }

-    protected T nextDataCollection() {
-        resetChunk();
-        return dataCollectionsToSnapshot.poll();
-    }
-
-    private void addTablesIdsToSnapshot(List<T> dataCollectionIds) {
-        boolean shouldPopulateWindow = false;
-        if (!tablesAvailable()) {
-            shouldPopulateWindow = true;
+    @SuppressWarnings("unchecked")
+    public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, OffsetContext offsetContext) {
+        context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
+        boolean shouldReadChunk = false;
+        if (!context.snapshotRunning()) {
+            shouldReadChunk = true;
         }
-        dataCollectionsToSnapshot.addAll(dataCollectionIds);
-        if (shouldPopulateWindow) {
+        context.addDataCollectionNamesToSnapshot(dataCollectionIds);
+        if (shouldReadChunk) {
             try {
-                populateWindow();
+                readChunk();
             }
             catch (InterruptedException e) {
                 // TODO Auto-generated catch block
@@ -305,11 +247,6 @@ private void addTablesIdsToSnapshot(List<T> dataCollectionIds) {
         }
     }

-    @SuppressWarnings("unchecked")
-    public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds) {
-        addTablesIdsToSnapshot(dataCollectionIds.stream().map(x -> (T) TableId.parse(x)).collect(Collectors.toList()));
-    }
-
     protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) {
         for (Iterator<Column> i = table.primaryKeyColumns().iterator(); i.hasNext();) {
             final Column key = i.next();
@@ -323,13 +260,13 @@ protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String p
     /**
      * Dispatches the data change events for the records of a single table.
      */
-    private void createDataEventsForTable(int tableCount) throws InterruptedException {
+    private void createDataEventsForTable() throws InterruptedException {
         long exportStart = clock.currentTimeInMillis();
-        LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), tableCount);
+        LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), context.tablesToBeSnapshottedCount());

         final String selectStatement = buildChunkQuery(currentTable);
         LOGGER.debug("\t For table '{}' using select statement: '{}', key: '{}', maximum key: '{}'", currentTable.id(),
-                selectStatement, chunkEndPosition, maximumKey);
+                selectStatement, context.chunkEndPosititon(), context.maximumKey().get());

         final TableSchema tableSchema = databaseSchema.schemaFor(currentTable.id());
@@ -354,9 +291,9 @@ private void createDataEventsForTable(int tableCount) throws InterruptedExceptio
                 }
                 lastRow = row;
             }
-            nextChunk(keyFromRow(lastRow));
+            context.nextChunk(keyFromRow(lastRow));
             if (lastRow != null) {
-                LOGGER.debug("\t Next window will resume from '{}'", chunkEndPosition);
+                LOGGER.debug("\t Next window will resume from '{}'", context.chunkEndPosititon());
             }

             LOGGER.debug("\t Finished exporting {} records for window of table table '{}'; total duration '{}'", rows,
@@ -377,7 +314,7 @@ protected Object[] rowToArray(Table table, ResultSet rs, ColumnUtils.ColumnArray
         return row;
     }

-    // TODO Parmetrize the method and extract it to JdbcConnection
+    // TODO Parameterize the method and extract it to JdbcConnection
     /**
      * Allow per-connector query creation to override for best database
      * performance depending on the table size.
@@ -385,7 +322,9 @@ protected Object[] rowToArray(Table table, ResultSet rs, ColumnUtils.ColumnArray
     protected PreparedStatement readTableStatement(String sql) throws SQLException {
         int fetchSize = connectorConfig.getSnapshotFetchSize();
         PreparedStatement statement = jdbcConnection.connection().prepareStatement(sql);
-        if (isNonInitialChunk()) {
+        if (context.isNonInitialChunk()) {
+            final Object[] maximumKey = context.maximumKey().get();
+            final Object[] chunkEndPosition = context.chunkEndPosititon();
             for (int i = 0; i < chunkEndPosition.length; i++) {
                 statement.setObject(i + 1, chunkEndPosition[i]);
                 statement.setObject(i + 1 + chunkEndPosition.length, chunkEndPosition[i]);
@@ -422,65 +361,7 @@ private Object[] keyFromRow(Object[] row) {
         return key;
     }

-    private String arrayToSerializedString(Object[] array) {
-        try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-            oos.writeObject(array);
-            return HexConverter.convertToHexString(bos.toByteArray());
-        }
-        catch (IOException e) {
-            e.printStackTrace();
-        }
-        return null;
-    }
-
-    private Object[] serializedStringToArray(String field, String serialized) {
-        try (final ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
-                ObjectInputStream ois = new ObjectInputStream(bis)) {
-            return (Object[]) ois.readObject();
-        }
-        catch (Exception e) {
-            throw new DebeziumException(String.format("Failed to deserialize '%s' with value '%s'", field, serialized), e);
-        }
-    }
-
-    private String dataCollectionsToSnapshotAsString() {
-        // TODO Handle non-standard table ids containing dots, commas etc.
-        return dataCollectionsToSnapshot.stream()
-                .map(x -> x.toString())
-                .collect(Collectors.joining(","));
-    }
-
-    private List<String> stringToDataCollections(String dataCollectionsStr) {
-        return Arrays.asList(dataCollectionsStr.split(","));
-    }
-
-    public Map<String, ?> store(Map<String, ?> iOffset) {
-        @SuppressWarnings("unchecked")
-        final Map<String, Object> offset = (Map<String, Object>) iOffset;
-        if (!tablesAvailable()) {
-            return offset;
-        }
-        offset.put(EVENT_PRIMARY_KEY, arrayToSerializedString(lastEventSentKey));
-        offset.put(TABLE_MAXIMUM_KEY, arrayToSerializedString(maximumKey));
-        offset.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY, dataCollectionsToSnapshotAsString());
-        return offset;
-    }
-
-    // TODO Call on connector start
-    public void load(Map<String, ?> offsets) {
-        if (offsets == null) {
-            return;
-        }
-        final String lastEventSentKeyStr = (String) offsets.get(EVENT_PRIMARY_KEY);
-        chunkEndPosition = (lastEventSentKeyStr != null) ? serializedStringToArray(EVENT_PRIMARY_KEY, lastEventSentKeyStr) : null;
-        lastEventSentKey = null;
-        final String maximumKeyStr = (String) offsets.get(TABLE_MAXIMUM_KEY);
-        maximumKey = (maximumKeyStr != null) ? serializedStringToArray(TABLE_MAXIMUM_KEY, maximumKeyStr) : null;
-        final String dataCollectionsStr = (String) offsets.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY);
-        dataCollectionsToSnapshot.clear();
-        if (dataCollectionsStr != null) {
-            addDataCollectionNamesToSnapshot(stringToDataCollections(dataCollectionsStr));
-        }
+    protected void setContext(IncrementalSnapshotContext<T> context) {
+        this.context = context;
     }
 }

IncrementalSnapshotContext.java (new file)

@@ -0,0 +1,208 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.source.snapshot.incremental;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.relational.TableId;
import io.debezium.util.HexConverter;
/**
* A class describing current state of incremental snapshot
*
* @author Jiri Pechanec
*
*/
@NotThreadSafe
public class IncrementalSnapshotContext<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(IncrementalSnapshotContext.class);
public static final String INCREMENTAL_SNAPSHOT_KEY = "incremental_snapshot";
public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY = INCREMENTAL_SNAPSHOT_KEY + "_collections";
public static final String EVENT_PRIMARY_KEY = INCREMENTAL_SNAPSHOT_KEY + "_primary_key";
public static final String TABLE_MAXIMUM_KEY = INCREMENTAL_SNAPSHOT_KEY + "_maximum_key";
/**
* @code(true) if window is opened and deduplication should be executed
*/
private boolean windowOpened = false;
/**
* The last primary key in chunk that is now in process.
*/
private Object[] chunkEndPosition;
// TODO After extracting add into source info optional block
// incrementalSnapshotWindow{String from, String to}
// State to be stored and recovered from offsets
private final Queue<T> dataCollectionsToSnapshot = new LinkedList<>();
/**
* The PK of the last record that was passed to Kafka Connect. In case of
* connector restart the start of the first chunk will be populated from it.
*/
private Object[] lastEventKeySent;
/**
* The largest PK in the table at the start of snapshot.
*/
private Object[] maximumKey;
public void openWindow() {
LOGGER.debug("Opening window for incremental snapshot chunk");
windowOpened = true;
}
public void closeWindow() {
LOGGER.debug("Closing window for incremental snapshot chunk");
windowOpened = false;
}
public boolean deduplicationNeeded() {
return windowOpened;
}
private String arrayToSerializedString(Object[] array) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(array);
return HexConverter.convertToHexString(bos.toByteArray());
}
catch (IOException e) {
e.printStackTrace();
}
return null;
}
private Object[] serializedStringToArray(String field, String serialized) {
try (final ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
ObjectInputStream ois = new ObjectInputStream(bis)) {
return (Object[]) ois.readObject();
}
catch (Exception e) {
throw new DebeziumException(String.format("Failed to deserialize '%s' with value '%s'", field, serialized),
e);
}
}
private String dataCollectionsToSnapshotAsString() {
// TODO Handle non-standard table ids containing dots, commas etc.
return dataCollectionsToSnapshot.stream().map(x -> x.toString()).collect(Collectors.joining(","));
}
private List<String> stringToDataCollections(String dataCollectionsStr) {
return Arrays.asList(dataCollectionsStr.split(","));
}
protected boolean snapshotRunning() {
return !dataCollectionsToSnapshot.isEmpty();
}
public Map<String, Object> store(Map<String, Object> offset) {
if (!snapshotRunning()) {
return offset;
}
offset.put(EVENT_PRIMARY_KEY, arrayToSerializedString(lastEventKeySent));
offset.put(TABLE_MAXIMUM_KEY, arrayToSerializedString(maximumKey));
offset.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY, dataCollectionsToSnapshotAsString());
return offset;
}
private void addTablesIdsToSnapshot(List<T> dataCollectionIds) {
dataCollectionsToSnapshot.addAll(dataCollectionIds);
}
@SuppressWarnings("unchecked")
public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds) {
addTablesIdsToSnapshot(dataCollectionIds.stream().map(x -> (T) TableId.parse(x)).collect(Collectors.toList()));
}
public static <U> IncrementalSnapshotContext<U> load(Map<String, ?> offsets, Class<U> clazz) {
final IncrementalSnapshotContext<U> context = new IncrementalSnapshotContext<>();
final String lastEventSentKeyStr = (String) offsets.get(EVENT_PRIMARY_KEY);
context.chunkEndPosition = (lastEventSentKeyStr != null)
? context.serializedStringToArray(EVENT_PRIMARY_KEY, lastEventSentKeyStr)
: null;
context.lastEventKeySent = null;
final String maximumKeyStr = (String) offsets.get(TABLE_MAXIMUM_KEY);
context.maximumKey = (maximumKeyStr != null) ? context.serializedStringToArray(TABLE_MAXIMUM_KEY, maximumKeyStr)
: null;
final String dataCollectionsStr = (String) offsets.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY);
context.dataCollectionsToSnapshot.clear();
if (dataCollectionsStr != null) {
context.addDataCollectionNamesToSnapshot(context.stringToDataCollections(dataCollectionsStr));
}
return context;
}
public void sendEvent(Object[] key) {
lastEventKeySent = key;
}
public T currentDataCollectionId() {
return dataCollectionsToSnapshot.peek();
}
public int tablesToBeSnapshottedCount() {
return dataCollectionsToSnapshot.size();
}
public void nextChunk(Object[] end) {
chunkEndPosition = end;
}
public Object[] chunkEndPosititon() {
return chunkEndPosition;
}
private void resetChunk() {
chunkEndPosition = null;
maximumKey = null;
}
public boolean isNonInitialChunk() {
return chunkEndPosition != null;
}
public T nextDataCollection() {
resetChunk();
return dataCollectionsToSnapshot.poll();
}
public void maximumKey(Object[] key) {
maximumKey = key;
}
public Optional<Object[]> maximumKey() {
return Optional.ofNullable(maximumKey);
}
@Override
public String toString() {
return "IncrementalSnapshotContext [windowOpened=" + windowOpened + ", chunkEndPosition="
+ Arrays.toString(chunkEndPosition) + ", dataCollectionsToSnapshot=" + dataCollectionsToSnapshot
+ ", lastEventKeySent=" + Arrays.toString(lastEventKeySent) + ", maximumKey="
+ Arrays.toString(maximumKey) + "]";
}
}
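The deduplication window flag that used to be a plain boolean field on the event source now lives in this context, so the window signal actions (CloseIncrementalSnapshotWindow above, OpenIncrementalSnapshotWindow below) reach it through the OffsetContext instead of holding a reference to the event source. A short illustrative fragment of the transitions driven by the snapshot-window-open and snapshot-window-close signals (imports as in the sketch near the top of this page):

    IncrementalSnapshotContext<TableId> context = new IncrementalSnapshotContext<>();
    context.openWindow();   // snapshot-window-open signal arrived
    // context.deduplicationNeeded() == true: streamed events are now checked against the buffered chunk
    context.closeWindow();  // snapshot-window-close signal arrived
    // context.deduplicationNeeded() == false: the buffered rows can be flushed and the next chunk read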

OpenIncrementalSnapshotWindow.java

@@ -17,15 +17,12 @@ public class OpenIncrementalSnapshotWindow implements Signal.Action {
     public static final String NAME = "snapshot-window-open";

-    private IncrementalSnapshotChangeEventSource eventSource;
-
-    public OpenIncrementalSnapshotWindow(IncrementalSnapshotChangeEventSource eventSource) {
-        this.eventSource = eventSource;
+    public OpenIncrementalSnapshotWindow() {
     }

     @Override
     public boolean arrived(Payload signalPayload) {
-        eventSource.windowOpen();
+        signalPayload.offsetContext.getIncrementalSnapshotContext().openWindow();
         return true;
     }

OffsetContext.java

@@ -11,6 +11,8 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;

+import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
+import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
 import io.debezium.pipeline.txmetadata.TransactionContext;
 import io.debezium.schema.DataCollectionId;
@@ -79,11 +81,16 @@ interface Loader {
      */
     TransactionContext getTransactionContext();

-    default void incrementalSnapshotWindow() {
+    default void incrementalSnapshotEvents() {
     }

-    default void incrementalSnapshotStop() {
-    }
+    /**
+     * Provide a context used by {@link IncrementalSnapshotChangeEventSource} so persist its internal state into offsets to survive
+     * between restarts.
+     *
+     * @return incremental snapshot context
+     */
+    default IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
+        return null;
+    };
 }

IncrementalSnapshotChangeEventSourceTest.java

@@ -41,6 +41,8 @@ public String getConnectorName() {
     @Test
     public void testBuildQuery() {
         final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(config(), null, null, null);
+        final IncrementalSnapshotContext<TableId> context = new IncrementalSnapshotContext<>();
+        source.setContext(context);
         final Column pk1 = Column.editor().name("pk1").create();
         final Column pk2 = Column.editor().name("pk2").create();
         final Column val1 = Column.editor().name("val1").create();
@@ -48,8 +50,8 @@ public void testBuildQuery() {
         final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2)
                 .addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create();
         Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM s1.table1 ORDER BY pk1, pk2 LIMIT 1024");
-        source.nextChunk(new Object[]{ 1, 5 });
-        source.setMaximumKey(new Object[]{ 10, 50 });
+        context.nextChunk(new Object[]{ 1, 5 });
+        context.maximumKey(new Object[]{ 10, 50 });
         Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo(
                 "SELECT * FROM s1.table1 WHERE pk1 >= ? AND pk2 >= ? AND NOT (pk1 = ? AND pk2 = ?) AND pk1 <= ? AND pk2 <= ? ORDER BY pk1, pk2 LIMIT 1024");
     }