DBZ-4272 Incrementally snapshot after schema change without NPE

This commit is contained in:
Chris Cranford 2021-11-12 14:30:48 -05:00 committed by Gunnar Morling
parent 687b79787f
commit 3a8d1ff838
3 changed files with 36 additions and 1 deletions

View File

@ -125,4 +125,9 @@ protected String valueFieldName() {
protected String pkFieldName() {
return "PK";
}
@Override
protected String alterTableStatement(String tableName) {
return "ALTER TABLE " + tableName + " ADD col3 INTEGER DEFAULT 0";
}
}

View File

@ -34,7 +34,7 @@ public static MappedColumns toMap(Table table) {
public static ColumnArray toArray(ResultSet resultSet, Table table) throws SQLException {
ResultSetMetaData metaData = resultSet.getMetaData();
Column[] columns = new Column[metaData.getColumnCount()];
Column[] columns = new Column[Math.min(table.columns().size(), metaData.getColumnCount())];
int greatestColumnPosition = 0;
for (int i = 0; i < columns.length; i++) {
columns[i] = table.columnWithName(metaData.getColumnName(i + 1));

View File

@ -28,6 +28,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
@ -53,6 +54,10 @@ public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector>
protected abstract Configuration.Builder config();
protected String alterTableStatement(String tableName) {
return "ALTER TABLE " + tableName + " add col3 int default 0";
}
protected String tableDataCollectionId() {
return tableName();
}
@ -404,6 +409,31 @@ public void snapshotOnlyWithRestart() throws Exception {
}
}
@Test
@FixFor("DBZ-4272")
public void snapshotProceededBySchemaChange() throws Exception {
Testing.Print.enable();
populateTable();
startConnector();
waitForConnectorToStart();
// Initiate a schema change to the table immediately before the adhoc-snapshot
try (JdbcConnection connection = databaseConnection()) {
connection.execute(alterTableStatement(tableName()));
}
// Some connectors, such as PostgreSQL won't be notified of the previous schema change
// until a DML event occurs, but regardless the incremental snapshot should succeed.
sendAdHocSnapshotSignal();
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}
@Override
protected int getMaximumEnqueuedRecordCount() {
return ROW_COUNT * 3;