DBZ-4196 Support schema changes during incremental snapshot

This commit is contained in:
Kate 2021-10-23 16:29:13 -04:00 committed by Jiri Pechanec
parent 3569cb4602
commit 44073cf7d8
15 changed files with 611 additions and 44 deletions

View File

@ -988,7 +988,8 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
ENABLE_TIME_ADJUSTER,
BINARY_HANDLING_MODE,
ROW_COUNT_FOR_STREAMING_RESULT_SETS,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES)
.events(
INCLUDE_SQL_QUERY,
TABLE_IGNORE_BUILTIN,
@ -1020,6 +1021,11 @@ public boolean supportsOperationFiltering() {
return true;
}
@Override
protected boolean supportsSchemaChangesDuringIncrementalSnapshot() {
return true;
}
private final Configuration config;
private final SnapshotMode snapshotMode;
private final SnapshotLockingMode snapshotLockingMode;

View File

@ -73,7 +73,7 @@
* <p>A window can be opened and closed right away by the same event. This can happen when a high watermark is an empty set, which means there were no binlog events during the chunk select. The chunk will get inserted right after the low watermark, and no events will be deduplicated from the chunk.</p>
* <br/>
* <b>No updates for included tables</b>
* <p>It's important to receive binlog events for the Backfill to make progress.All binlog events are checked against the low and high watermarks, including the events from the tables that aren't included in the connector. This guarantees that the window processing mode gets updated even when none of the tables included in the connector are getting binlog events.</p>
* <p>It's important to receive binlog events for the incremental snapshot to make progress. All binlog events are checked against the low and high watermarks, including the events from the tables that aren't included in the connector. This guarantees that the window processing mode gets updated even when none of the tables included in the connector are getting binlog events.</p>
*/
public class MySqlReadOnlyIncrementalSnapshotChangeEventSource<T extends DataCollectionId> extends AbstractIncrementalSnapshotChangeEventSource<T> {
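
To make the watermark handling described in the Javadoc above easier to follow, here is a hypothetical sketch of the per-event check; the WindowState type and its method names are invented for illustration and are not part of this commit. Every binlog event, including events from tables the connector does not capture, is compared against the low and high watermarks, which is what keeps the window processing moving forward.

public class WatermarkWindowSketch {

    /** Invented abstraction over the buffered chunk window; not a Debezium type. */
    interface WindowState {
        boolean isOpen();
        boolean reachedLowWatermark(String gtid);
        boolean reachedHighWatermark(String gtid);
        void open();
        void deduplicateChunkRow(Object eventKey);
        void closeAndEmitChunk();
    }

    /** Called for every binlog event, even those from tables that are not captured. */
    static void onBinlogEvent(String gtid, Object eventKey, WindowState window) {
        if (!window.isOpen() && window.reachedLowWatermark(gtid)) {
            window.open(); // low watermark reached: the deduplication window starts
        }
        if (window.isOpen() && window.reachedHighWatermark(gtid)) {
            // High watermark reached: emit what is left of the buffered chunk. With an empty
            // high watermark this happens on the same event that opened the window, so the
            // chunk is inserted right after the low watermark without any deduplication.
            window.closeAndEmitChunk();
            return;
        }
        if (window.isOpen()) {
            window.deduplicateChunkRow(eventKey); // a streamed change supersedes the snapshot row
        }
    }
}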

View File

@ -14,11 +14,11 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotWithSchemaChangesSupportTest;
import io.debezium.relational.TableId;
import io.debezium.util.Testing;
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<MySqlConnector> {
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotWithSchemaChangesSupportTest<MySqlConnector> {
protected static final String SERVER_NAME = "is_test";
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "incremental_snapshot-test").withDbHistoryPath(DB_HISTORY_PATH);
@ -49,6 +49,7 @@ protected Configuration.Builder config() {
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
.with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
.with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true)
.with(MySqlConnector.IMPLEMENTATION_PROP, "new");
}
@ -76,4 +77,48 @@ protected String tableName() {
protected String signalTableName() {
return TableId.parse(DATABASE.qualifiedTableName("debezium_signal")).toQuotedString('`');
}
@Override
protected String tableName(String table) {
return TableId.parse(DATABASE.qualifiedTableName(table)).toQuotedString('`');
}
@Override
protected String alterColumnStatement(String table, String column, String type) {
return String.format("ALTER TABLE %s MODIFY COLUMN %s %s", table, column, type);
}
@Override
protected String alterColumnSetNotNullStatement(String table, String column, String type) {
return String.format("ALTER TABLE %s MODIFY COLUMN %s %s NOT NULL", table, column, type);
}
@Override
protected String alterColumnDropNotNullStatement(String table, String column, String type) {
return String.format("ALTER TABLE %s MODIFY COLUMN %s %s NULL", table, column, type);
}
@Override
protected String alterColumnSetDefaultStatement(String table, String column, String type, String defaultValue) {
return String.format("ALTER TABLE %s MODIFY COLUMN %s %s DEFAULT %s", table, column, type, defaultValue);
}
@Override
protected String alterColumnDropDefaultStatement(String table, String column, String type) {
return String.format("ALTER TABLE %s MODIFY COLUMN %s %s", table, column, type);
}
@Override
protected void executeRenameTable(JdbcConnection connection, String newTable) throws SQLException {
connection.setAutoCommit(false);
String query = String.format("RENAME TABLE %s to %s, %s to %s", tableName(), "old_table", newTable, tableName());
logger.info(query);
connection.executeWithoutCommitting(query);
connection.commit();
}
@Override
protected String createTableStatement(String newTable, String copyTable) {
return String.format("CREATE TABLE %s LIKE %s", newTable, copyTable);
}
}

View File

@ -15,6 +15,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
@ -174,6 +175,7 @@ public void inserts4Pks() throws Exception {
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()),
DATABASE.topicForTable("a4"),
null);
for (int i = 0; i < expectedRecordCount; i++) {
@ -195,6 +197,7 @@ public void insertsWithoutPks() throws Exception {
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()),
DATABASE.topicForTable("a42"),
null);
for (int i = 0; i < expectedRecordCount; i++) {

View File

@ -9,6 +9,7 @@
import java.sql.SQLException;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.After;
@ -123,6 +124,7 @@ public void inserts4Pks() throws Exception {
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()),
"test_server.s1.a4",
null);
for (int i = 0; i < expectedRecordCount; i++) {
@ -144,6 +146,7 @@ public void insertsWithoutPks() throws Exception {
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()),
"test_server.s1.a42",
null);
for (int i = 0; i < expectedRecordCount; i++) {

View File

@ -347,7 +347,9 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
SOURCE_TIMESTAMP_MODE,
MAX_TRANSACTIONS_PER_ITERATION,
BINARY_HANDLING_MODE,
INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE)
INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES)
.excluding(
SCHEMA_WHITELIST,
SCHEMA_INCLUDE_LIST,
@ -462,6 +464,11 @@ public boolean supportsOperationFiltering() {
return true;
}
@Override
protected boolean supportsSchemaChangesDuringIncrementalSnapshot() {
return true;
}
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
switch (version) {

View File

@ -6,6 +6,7 @@
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import java.util.Arrays;
import org.junit.After;
import org.junit.Before;
@ -16,10 +17,10 @@
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipTestRule;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotWithSchemaChangesSupportTest;
import io.debezium.util.Testing;
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<SqlServerConnector> {
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotWithSchemaChangesSupportTest<SqlServerConnector> {
private SqlServerConnection connection;
@ -72,15 +73,64 @@ protected String tableName() {
return "testDB.dbo.a";
}
@Override
protected String tableName(String table) {
return "testDB.dbo." + table;
}
@Override
protected String signalTableName() {
return "dbo.debezium_signal";
}
@Override
protected String alterColumnStatement(String table, String column, String type) {
return String.format("ALTER TABLE %s ALTER COLUMN %s %s", table, column, type);
}
@Override
protected String alterColumnSetNotNullStatement(String table, String column, String type) {
return String.format("ALTER TABLE %s ALTER COLUMN %s %s NOT NULL", table, column, type);
}
@Override
protected String alterColumnDropNotNullStatement(String table, String column, String type) {
return String.format("ALTER TABLE %s ALTER COLUMN %s %s NULL", table, column, type);
}
@Override
protected String alterColumnSetDefaultStatement(String table, String column, String type, String defaultValue) {
return String.format("ALTER TABLE %s ADD CONSTRAINT df_%s DEFAULT %s FOR %s", table, column, defaultValue, column);
}
@Override
protected String alterColumnDropDefaultStatement(String table, String column, String type) {
return String.format("ALTER TABLE %s DROP CONSTRAINT df_%s", table, column);
}
@Override
protected void executeRenameTable(JdbcConnection connection, String newTable) throws SQLException {
TestHelper.disableTableCdc(connection, "a");
connection.setAutoCommit(false);
logger.info(String.format("exec sp_rename '%s', '%s'", tableName(), "old_table"));
connection.executeWithoutCommitting(String.format("exec sp_rename '%s', '%s'", tableName(), "old_table"));
logger.info(String.format("exec sp_rename '%s', '%s'", tableName(newTable), "a"));
connection.executeWithoutCommitting(String.format("exec sp_rename '%s', '%s'", tableName(newTable), "a"));
TestHelper.enableTableCdc(connection, "a", "a", Arrays.asList("pk", "aa", "c"));
connection.commit();
}
@Override
protected String createTableStatement(String newTable, String copyTable) {
return String.format("CREATE TABLE %s (pk int primary key, aa int)", newTable);
}
@Override
protected Builder config() {
return TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB.dbo.debezium_signal");
.with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB.dbo.debezium_signal")
.with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250)
.with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true);
}
}

View File

@ -358,7 +358,7 @@ public static void enableTableCdc(SqlServerConnection connection, String tableNa
* the source table columns that are to be included in the change table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public static void enableTableCdc(SqlServerConnection connection, String tableName, String captureName, List<String> captureColumnList) throws SQLException {
public static void enableTableCdc(JdbcConnection connection, String tableName, String captureName, List<String> captureColumnList) throws SQLException {
Objects.requireNonNull(tableName);
Objects.requireNonNull(captureName);
Objects.requireNonNull(captureColumnList);
@ -374,7 +374,7 @@ public static void enableTableCdc(SqlServerConnection connection, String tableNa
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public static void disableTableCdc(SqlServerConnection connection, String name) throws SQLException {
public static void disableTableCdc(JdbcConnection connection, String name) throws SQLException {
Objects.requireNonNull(name);
String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name);
connection.execute(disableCdcForTableStmt);

View File

@ -341,6 +341,18 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
.withDefault(1024)
.withValidation(Field::isNonNegativeInteger);
public static final Field INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES = Field.create("incremental.snapshot.allow.schema.changes")
.withDisplayName("Allow schema changes during incremental snapshot if supported.")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Detect schema change during an incremental snapshot and re-select a current chunk to avoid locking DDLs. " +
"Note that changes to a primary key are not supported and can cause incorrect results if performed during an incremental snapshot. " +
"Another limitation is that if a schema change affects only columns' default values, " +
"then the change won't be detected until the DDL is processed from the binlog stream. " +
"This doesn't affect the snapshot events' values, but the schema of snapshot events may have outdated defaults.")
.withDefault(Boolean.FALSE);
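
As a usage illustration (not part of this diff), the new flag can be set next to the existing chunk-size option; this mirrors the MySQL integration-test configuration later in this commit, with the remaining connector properties (hostname, credentials, table filters, and so on) omitted.

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;

// Opt in to chunk re-selection on detected schema changes; the option defaults to false.
Configuration config = Configuration.create()
        .with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
        .with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true)
        .build();

The flag only takes effect for connectors whose configuration class overrides supportsSchemaChangesDuringIncrementalSnapshot() to return true (MySQL and SQL Server in this commit); isIncrementalSnapshotSchemaChangesEnabled() requires both the override and the configured value.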
public static final Field SNAPSHOT_MODE_TABLES = Field.create("snapshot.include.collection.list")
.withDisplayName("Snapshot mode include data collection")
.withType(Type.LIST)
@ -496,6 +508,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
private final Duration retriableRestartWait;
private final int snapshotFetchSize;
private final int incrementalSnapshotChunkSize;
private final boolean incrementalSnapshotAllowSchemaChanges;
private final int snapshotMaxThreads;
private final Integer queryFetchSize;
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
@ -524,6 +537,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS);
this.queryFetchSize = config.getInteger(QUERY_FETCH_SIZE);
this.incrementalSnapshotChunkSize = config.getInteger(INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
this.incrementalSnapshotAllowSchemaChanges = config.getBoolean(INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES);
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config);
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
@ -645,6 +659,14 @@ public boolean supportsOperationFiltering() {
return false;
}
protected boolean supportsSchemaChangesDuringIncrementalSnapshot() {
return false;
}
public boolean isIncrementalSnapshotSchemaChangesEnabled() {
return supportsSchemaChangesDuringIncrementalSnapshot() && incrementalSnapshotAllowSchemaChanges;
}
@SuppressWarnings("unchecked")
private List<CustomConverter<SchemaBuilder, ConvertedField>> getCustomConverters() {
final String converterNameList = config.getString(CUSTOM_CONVERTERS);

View File

@ -7,7 +7,10 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -151,6 +154,10 @@ protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key)
protected abstract void emitWindowClose() throws SQLException, InterruptedException;
protected String buildChunkQuery(Table table) {
return buildChunkQuery(table, connectorConfig.getIncrementalSnashotChunkSize());
}
protected String buildChunkQuery(Table table, int limit) {
String condition = null;
// Add condition when this is not the first query
if (context.isNonInitialChunk()) {
@ -166,7 +173,7 @@ protected String buildChunkQuery(Table table) {
.map(Column::name)
.collect(Collectors.joining(", "));
return jdbcConnection.buildSelectWithRowLimits(table.id(),
connectorConfig.getIncrementalSnashotChunkSize(),
limit,
"*",
Optional.ofNullable(condition),
orderBy);
@ -252,18 +259,15 @@ protected void readChunk() throws InterruptedException {
context.startNewChunk();
emitWindowOpen();
while (context.snapshotRunning()) {
if (isTableInvalid()) {
continue;
}
if (connectorConfig.isIncrementalSnapshotSchemaChangesEnabled() && !schemaHistoryIsUpToDate()) {
// Schema has changed since the previous window.
// Closing the current window and repeating schema verification within the following window.
break;
}
final TableId currentTableId = (TableId) context.currentDataCollectionId();
currentTable = databaseSchema.tableFor(currentTableId);
if (currentTable == null) {
LOGGER.warn("Schema not found for table '{}', known tables {}", currentTableId, databaseSchema.tableIds());
nextDataCollection();
continue;
}
if (getKeyMapper().getKeyKolumns(currentTable).isEmpty()) {
LOGGER.warn("Incremental snapshot for table '{}' skipped cause the table has no primary keys", currentTableId);
nextDataCollection();
continue;
}
if (!context.maximumKey().isPresent()) {
context.maximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
if (!rs.next()) {
@ -284,14 +288,19 @@ protected void readChunk() throws InterruptedException {
context.maximumKey().orElse(new Object[0]));
}
}
createDataEventsForTable();
if (window.isEmpty()) {
LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
currentTableId);
tableScanCompleted();
nextDataCollection();
if (createDataEventsForTable()) {
if (window.isEmpty()) {
LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
currentTableId);
tableScanCompleted();
nextDataCollection();
}
else {
break;
}
}
else {
context.revertChunk();
break;
}
}
@ -308,6 +317,66 @@ protected void readChunk() throws InterruptedException {
}
}
private boolean isTableInvalid() {
final TableId currentTableId = (TableId) context.currentDataCollectionId();
currentTable = databaseSchema.tableFor(currentTableId);
if (currentTable == null) {
LOGGER.warn("Schema not found for table '{}', known tables {}", currentTableId, databaseSchema.tableIds());
nextDataCollection();
return true;
}
if (getKeyMapper().getKeyKolumns(currentTable).isEmpty()) {
LOGGER.warn("Incremental snapshot for table '{}' skipped cause the table has no primary keys", currentTableId);
nextDataCollection();
return true;
}
return false;
}
/**
* Verifies that the in-memory representation of the table's schema is up to date with the table's schema in the database.
* <p>
* Verification is a two-step process:
* <ol>
* <li>Save the table's schema from the database to the context
* <li>Verify the schema hasn't changed in the following window; if it has changed, repeat the process
* </ol>
* The two-step process allows the connector to wait for the DDL event to arrive in the binlog stream and update the in-memory representation of the table's schema.
* <p>
* Verification is done at the beginning of the incremental snapshot and on every schema change during the snapshotting.
*/
private boolean schemaHistoryIsUpToDate() {
if (context.isSchemaVerificationPassed()) {
return true;
}
verifySchemaUnchanged();
return context.isSchemaVerificationPassed();
}
/**
* Verifies that the table's schema in the database has not changed since it was captured in the previous window.
*/
private void verifySchemaUnchanged() {
Table tableSchemaInDatabase = readSchema();
if (context.getSchema() != null) {
context.setSchemaVerificationPassed(context.getSchema().equals(tableSchemaInDatabase));
}
context.setSchema(tableSchemaInDatabase);
}
private Table readSchema() {
final String selectStatement = buildChunkQuery(currentTable, 0);
LOGGER.debug("Reading schema for table '{}' using select statement: '{}'", currentTable.id(), selectStatement);
try (PreparedStatement statement = readTableChunkStatement(selectStatement);
ResultSet rs = statement.executeQuery()) {
return getTable(rs);
}
catch (SQLException e) {
throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed", e);
}
}
private void nextDataCollection() {
context.nextDataCollection();
if (!context.snapshotRunning()) {
@ -341,7 +410,7 @@ protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String p
/**
* Dispatches the data change events for the records of a single table.
*/
private void createDataEventsForTable() {
private boolean createDataEventsForTable() {
long exportStart = clock.currentTimeInMillis();
LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), context.tablesToBeSnapshottedCount());
@ -353,7 +422,9 @@ private void createDataEventsForTable() {
try (PreparedStatement statement = readTableChunkStatement(selectStatement);
ResultSet rs = statement.executeQuery()) {
if (checkSchemaChanges(rs)) {
return false;
}
final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, currentTable);
long rows = 0;
Timer logTimer = getTableScanLogTimer();
@ -396,6 +467,43 @@ private void createDataEventsForTable() {
catch (SQLException e) {
throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed", e);
}
return true;
}
private boolean checkSchemaChanges(ResultSet rs) throws SQLException {
if (!connectorConfig.isIncrementalSnapshotSchemaChangesEnabled()) {
return false;
}
Table schema = getTable(rs);
if (!schema.equals(context.getSchema())) {
context.setSchemaVerificationPassed(false);
Table oldSchema = context.getSchema();
context.setSchema(schema);
LOGGER.info("Schema has changed during the incremental snapshot: Old Schema: {} New Schema: {}", oldSchema, schema);
return true;
}
return false;
}
private Table getTable(ResultSet rs) throws SQLException {
final ResultSetMetaData metaData = rs.getMetaData();
List<Column> columns = new ArrayList<>();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
Column column = Column.editor()
.name(metaData.getColumnName(i))
.jdbcType(metaData.getColumnType(i))
.type(metaData.getColumnTypeName(i))
.optional(metaData.isNullable(i) > 0)
.length(metaData.getPrecision(i))
.scale(metaData.getScale(i))
.create();
columns.add(column);
}
Collections.sort(columns);
return Table.editor()
.tableId(currentTable.id())
.addColumns(columns)
.create();
}
private void incrementTableRowsScanned(long rows) {

View File

@ -24,6 +24,7 @@
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.HexConverter;
@ -73,6 +74,10 @@ public class AbstractIncrementalSnapshotContext<T> implements IncrementalSnapsho
*/
private Object[] maximumKey;
private Table schema;
private boolean schemaVerificationPassed;
public AbstractIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
}
@ -211,6 +216,8 @@ private void resetChunk() {
lastEventKeySent = null;
chunkEndPosition = null;
maximumKey = null;
schema = null;
schemaVerificationPassed = false;
}
public void revertChunk() {
@ -244,6 +251,27 @@ public Optional<Object[]> maximumKey() {
return Optional.ofNullable(maximumKey);
}
@Override
public Table getSchema() {
return schema;
}
@Override
public void setSchema(Table schema) {
this.schema = schema;
}
@Override
public boolean isSchemaVerificationPassed() {
return schemaVerificationPassed;
}
@Override
public void setSchemaVerificationPassed(boolean schemaVerificationPassed) {
this.schemaVerificationPassed = schemaVerificationPassed;
LOGGER.info("Incremental snapshot's schema verification passed = {}, schema = {}", schemaVerificationPassed, schema);
}
@Override
public String toString() {
return "IncrementalSnapshotContext [windowOpened=" + windowOpened + ", chunkEndPosition="

View File

@ -9,6 +9,8 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.relational.Table;
public interface IncrementalSnapshotContext<T> {
T currentDataCollectionId();
@ -46,4 +48,12 @@ public interface IncrementalSnapshotContext<T> {
Map<String, Object> store(Map<String, Object> offset);
void revertChunk();
void setSchema(Table schema);
Table getSchema();
boolean isSchemaVerificationPassed();
void setSchemaVerificationPassed(boolean schemaVerificationPassed);
}

View File

@ -165,7 +165,7 @@ public boolean equals(Object obj) {
if (obj instanceof Column) {
Column that = (Column) obj;
return this.name().equalsIgnoreCase(that.name()) &&
this.typeExpression().equalsIgnoreCase(that.typeExpression()) &&
Strings.equalsIgnoreCase(this.typeExpression(), that.typeExpression()) &&
this.typeName().equalsIgnoreCase(that.typeName()) &&
this.jdbcType() == that.jdbcType() &&
Strings.equalsIgnoreCase(this.charsetName(), that.charsetName()) &&

View File

@ -95,30 +95,31 @@ protected void populate4PkTable(JdbcConnection connection, String tableName) thr
}
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
return consumeMixedWithIncrementalSnapshot(recordCount, x -> true, null);
return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), x -> true, null);
}
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount,
Predicate<Map.Entry<Integer, Integer>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer)
protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount, Function<SourceRecord, V> valueConverter,
Predicate<Map.Entry<Integer, V>> dataCompleted,
Consumer<List<SourceRecord>> recordConsumer)
throws InterruptedException {
return consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(pkFieldName()), topicName(), recordConsumer);
return consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(pkFieldName()), valueConverter, topicName(), recordConsumer);
}
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(
int recordCount,
Predicate<Map.Entry<Integer, Integer>> dataCompleted,
Function<Struct, Integer> idCalculator,
String topicName,
Consumer<List<SourceRecord>> recordConsumer)
protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount,
Predicate<Map.Entry<Integer, V>> dataCompleted,
Function<Struct, Integer> idCalculator,
Function<SourceRecord, V> valueConverter,
String topicName,
Consumer<List<SourceRecord>> recordConsumer)
throws InterruptedException {
final Map<Integer, Integer> dbChanges = new HashMap<>();
final Map<Integer, V> dbChanges = new HashMap<>();
int noRecords = 0;
for (;;) {
final SourceRecords records = consumeRecordsByTopic(1);
final List<SourceRecord> dataRecords = records.recordsForTopic(topicName);
if (records.allRecordsInOrder().isEmpty()) {
noRecords++;
Assertions.assertThat(noRecords).describedAs("Too many no data record results")
Assertions.assertThat(noRecords).describedAs(String.format("Too many no data record results, %d < %d", dbChanges.size(), recordCount))
.isLessThanOrEqualTo(MAXIMUM_NO_RECORDS_CONSUMES);
continue;
}
@ -128,7 +129,7 @@ protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(
}
dataRecords.forEach(record -> {
final int id = idCalculator.apply((Struct) record.key());
final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
final V value = valueConverter.apply(record);
dbChanges.put(id, value);
});
if (recordConsumer != null) {
@ -145,6 +146,23 @@ protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(
return dbChanges;
}
protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), x -> true, null);
}
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, Integer>> dataCompleted,
Consumer<List<SourceRecord>> recordConsumer)
throws InterruptedException {
return consumeMixedWithIncrementalSnapshot(recordCount, record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), dataCompleted,
recordConsumer);
}
protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, SourceRecord>> dataCompleted,
Consumer<List<SourceRecord>> recordConsumer)
throws InterruptedException {
return consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), dataCompleted, recordConsumer);
}
protected String valueFieldName() {
return "aa";
}

View File

@ -0,0 +1,267 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.source.snapshot.incremental;
import static org.apache.kafka.connect.data.Schema.Type.INT32;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
public abstract class AbstractIncrementalSnapshotWithSchemaChangesSupportTest<T extends SourceConnector> extends AbstractIncrementalSnapshotTest<T> {
protected abstract String tableName(String table);
protected abstract String alterColumnStatement(String table, String column, String type);
protected abstract String alterColumnSetNotNullStatement(String table, String column, String type);
protected abstract String alterColumnDropNotNullStatement(String table, String column, String type);
protected abstract String alterColumnSetDefaultStatement(String table, String column, String type, String defaultValue);
protected abstract String alterColumnDropDefaultStatement(String table, String column, String type);
protected abstract void executeRenameTable(JdbcConnection connection, String newTable) throws SQLException;
protected abstract String createTableStatement(String newTable, String copyTable);
@Test
public void schemaChanges() throws Exception {
Print.enable();
populateTable();
startConnector();
sendAdHocSnapshotSignal();
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(true);
connection.execute(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName(), ROW_COUNT + 11, ROW_COUNT + 10));
connection.execute(alterColumnStatement(tableName(), "aa", "VARCHAR(5)"));
connection.execute(String.format("INSERT INTO %s (pk, aa) VALUES (%s, '%s')", tableName(), ROW_COUNT + 10, ROW_COUNT + 9));
for (int i = 0; i < 9 && !Thread.interrupted(); i += 3) {
connection.execute(String.format("ALTER TABLE %s ADD c INT", tableName()));
connection.execute(String.format("INSERT INTO %s (pk, aa, c) VALUES (%s, '%s', %s)", tableName(), i + ROW_COUNT + 1, i + ROW_COUNT, 1));
connection.execute(alterColumnStatement(tableName(), "c", "VARCHAR(5)"));
connection.execute(String.format("INSERT INTO %s (pk, aa, c) VALUES (%s, '%s', '%s')", tableName(), i + ROW_COUNT + 2, i + ROW_COUNT + 1, "1"));
connection.execute(String.format("ALTER TABLE %s DROP COLUMN c", tableName()));
connection.execute(String.format("INSERT INTO %s (pk, aa) VALUES (%s, '%s')", tableName(), i + ROW_COUNT + 3, i + ROW_COUNT + 2));
}
}
final int expectedRecordCount = ROW_COUNT + 11;
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < expectedRecordCount; i++) {
assertTrue(String.format("missing PK %d", i + 1), dbChanges.containsKey(i + 1));
SourceRecord record = dbChanges.get(i + 1);
final Schema.Type valueType = record.valueSchema().field("after").schema().field(valueFieldName()).schema().type();
if (valueType == INT32) {
final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
assertEquals(i, value);
}
else {
String value = ((Struct) record.value()).getStruct("after").getString(valueFieldName());
assertEquals(Integer.toString(i), value);
}
}
}
@Test
public void renameTable() throws Exception {
Print.enable();
populateTable();
final String newTable = "new_table";
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(true);
connection.execute(createTableStatement(tableName(newTable), tableName()));
connection.execute(String.format("INSERT INTO %s SELECT * FROM %s", tableName(newTable), tableName()));
connection.execute(String.format("ALTER TABLE %s ADD c varchar(5)", tableName(newTable)));
}
startConnector();
sendAdHocSnapshotSignal();
final int updatedRowCount = 10;
final AtomicInteger recordCounter = new AtomicInteger();
final AtomicBoolean tableRenamed = new AtomicBoolean();
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(ROW_COUNT, x -> true,
x -> {
if (recordCounter.addAndGet(x.size()) > 10 && !tableRenamed.get()) {
try (JdbcConnection connection = databaseConnection()) {
executeRenameTable(connection, newTable);
connection.executeWithoutCommitting(
String.format("UPDATE %s SET c = 'c' WHERE pk >= %s", tableName(), ROW_COUNT - updatedRowCount));
connection.commit();
}
catch (SQLException e) {
throw new RuntimeException(e);
}
tableRenamed.set(true);
}
});
for (int i = 0; i < ROW_COUNT - updatedRowCount; i++) {
assertTrue(dbChanges.containsKey(i + 1));
SourceRecord record = dbChanges.get(i + 1);
final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
assertEquals(i, value);
if (((Struct) record.value()).schema().field("c") != null) {
final String c = ((Struct) record.value()).getStruct("after").getString("c");
assertNull(c);
}
}
for (int i = ROW_COUNT - updatedRowCount; i < ROW_COUNT; i++) {
assertTrue(dbChanges.containsKey(i + 1));
SourceRecord record = dbChanges.get(i + 1);
final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
assertEquals(i, value);
final String c = ((Struct) record.value()).getStruct("after").getString("c");
assertEquals("c", c);
}
}
@Test
public void columnNullabilityChanges() throws Exception {
Print.enable();
populateTable();
final Configuration config = config().build();
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignal();
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
connection.execute(alterColumnSetNotNullStatement(tableName(), "aa", "INTEGER"));
connection.commit();
connection.execute(alterColumnDropNotNullStatement(tableName(), "aa", "INTEGER"));
connection.commit();
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, null)",
tableName(),
i + ROW_COUNT + 1));
}
connection.commit();
}
final int expectedRecordCount = ROW_COUNT * 2;
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < ROW_COUNT; i++) {
SourceRecord record = dbChanges.get(i + 1);
final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
assertEquals(i, value);
}
for (int i = ROW_COUNT; i < 2 * ROW_COUNT; i++) {
SourceRecord record = dbChanges.get(i + 1);
final Integer value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
assertNull(value);
}
}
@Test
public void columnDefaultChanges() throws Exception {
Print.enable();
populateTable();
final Configuration config = config().build();
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignal();
final int expectedRecordCount = ROW_COUNT * 4;
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
connection.execute(alterColumnSetDefaultStatement(tableName(), "aa", "INTEGER", "-6"));
connection.commit();
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk) VALUES (%s)",
tableName(),
i + ROW_COUNT + 1));
}
connection.commit();
connection.executeWithoutCommitting(alterColumnDropDefaultStatement(tableName(), "aa", "INTEGER"));
connection.executeWithoutCommitting(alterColumnSetDefaultStatement(tableName(), "aa", "INTEGER", "-9"));
connection.commit();
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk) VALUES (%s)",
tableName(),
i + 2 * ROW_COUNT + 1));
}
connection.commit();
connection.executeWithoutCommitting(alterColumnDropDefaultStatement(tableName(), "aa", "INTEGER"));
connection.commit();
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk) VALUES (%s)",
tableName(),
i + 3 * ROW_COUNT + 1));
}
connection.commit();
}
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < ROW_COUNT; i++) {
SourceRecord record = dbChanges.get(i + 1);
final Struct after = ((Struct) record.value()).getStruct("after");
final int value = after.getInt32(valueFieldName());
assertEquals(i, value);
}
for (int i = ROW_COUNT; i < 2 * ROW_COUNT; i++) {
SourceRecord record = dbChanges.get(i + 1);
final Struct after = ((Struct) record.value()).getStruct("after");
final Integer value = after.getInt32(valueFieldName());
assertNotNull("value is null at pk=" + (i + 1), value);
assertEquals(String.format("value is %d at pk = %d, expected -6", value, i + 1), -6, value, 0);
}
for (int i = 2 * ROW_COUNT; i < 3 * ROW_COUNT; i++) {
SourceRecord record = dbChanges.get(i + 1);
final Struct after = ((Struct) record.value()).getStruct("after");
final Integer value = after.getInt32(valueFieldName());
assertNotNull("value is null at pk=" + (i + 1), value);
assertEquals(String.format("value is %d at pk = %d, expected -9", value, i + 1), -9, value, 0);
}
for (int i = 3 * ROW_COUNT; i < 4 * ROW_COUNT; i++) {
SourceRecord record = dbChanges.get(i + 1);
final Struct after = ((Struct) record.value()).getStruct("after");
final Integer value = after.getInt32(valueFieldName());
assertNull("value is not null at pk=" + (i + 1), value);
}
}
}