DBZ-812 Changed schema read from the source table; schema evolution works for changes without new records in the batch

This commit is contained in:
Jiri Pechanec 2018-10-18 10:58:17 +02:00
parent cf74b28ac0
commit 70a93769eb
2 changed files with 31 additions and 12 deletions

View File

@ -99,7 +99,9 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final Lsn fromLsn = lastProcessedLsn.isAvailable() ? connection.incrementLsn(lastProcessedLsn)
: lastProcessedLsn;
schemaChangeCheckpoints.clear();
while (!schemaChangeCheckpoints.isEmpty()) {
migrateTable(schemaChangeCheckpoints);
}
if (!connection.listOfNewChangeTables(fromLsn, currentMaxLsn).isEmpty()) {
final ChangeTable[] tables = getCdcTablesToQuery();
tablesSlot.set(tables);
@ -151,10 +153,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
LOGGER.trace("Processing change {}", tableSmallestLsn);
if (!schemaChangeCheckpoints.isEmpty()) {
if (tableSmallestLsn.getRowLsn().compareTo(schemaChangeCheckpoints.peek().getStopLsn()) > 0) {
final ChangeTable oldTable = schemaChangeCheckpoints.poll();
final ChangeTable newTable = oldTable.getNextVersionOfTable();
LOGGER.info("Migrating schema from {} to {}", oldTable, newTable);
dispatcher.dispatchSchemaChangeEvent(oldTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(offsetContext, newTable, connection.getTableSchemaFromChangeTable(newTable)));
migrateTable(schemaChangeCheckpoints);
}
}
final TableId tableId = tableSmallestLsn.getChangeTable().getSourceTableId();
@ -206,6 +205,14 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
}
private void migrateTable(final Queue<ChangeTable> schemaChangeCheckpoints)
throws InterruptedException, SQLException {
final ChangeTable oldTable = schemaChangeCheckpoints.poll();
final ChangeTable newTable = oldTable.getNextVersionOfTable();
LOGGER.info("Migrating schema from {} to {}", oldTable, newTable);
dispatcher.dispatchSchemaChangeEvent(oldTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(offsetContext, newTable, connection.getTableSchemaFromTable(newTable)));
}
private ChangeTable[] processErrorFromChangeTableQuery(SQLException exception, ChangeTable[] currentChangeTables) throws Exception {
final Matcher m = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(exception.getMessage());
if (m.matches()) {

View File

@ -169,7 +169,16 @@ public void removeTable() throws Exception {
}
@Test
public void addColumnToTable() throws Exception {
public void addColumnToTableEndOfBatch() throws Exception {
addColumnToTable(true);
}
@Test
public void addColumnToTableMiddleOfBatch() throws Exception {
addColumnToTable(false);
}
public void addColumnToTable(boolean pauseAfterCaptureChange) throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START_1 = 10;
@ -208,8 +217,11 @@ public void addColumnToTable() throws Exception {
});
// Enable a second capture instance
connection.execute("ALTER TABLE dbo.tableb ADD newcol INT");
connection.execute("ALTER TABLE dbo.tableb ADD newcol INT NOT NULL DEFAULT 0");
connection.enableTableCdc("tableb", "after_change");
if (pauseAfterCaptureChange) {
Thread.sleep(5_000);
}
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_2 + i;
@ -223,16 +235,16 @@ public void addColumnToTable() throws Exception {
records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
// TODO - Optional flag is lost here as it is not carrie dover to the CDC table
records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tableb.Value")
.field("id", Schema.OPTIONAL_INT32_SCHEMA)
.field("id", Schema.INT32_SCHEMA)
.field("colb", Schema.OPTIONAL_STRING_SCHEMA)
.field("newcol", Schema.OPTIONAL_INT32_SCHEMA)
.field("newcol", Schema.INT32_SCHEMA)
.build()
);
});
@ -255,9 +267,9 @@ public void addColumnToTable() throws Exception {
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tableb.Value")
.field("id", Schema.OPTIONAL_INT32_SCHEMA)
.field("id", Schema.INT32_SCHEMA)
.field("colb", Schema.OPTIONAL_STRING_SCHEMA)
.field("newcol", Schema.OPTIONAL_INT32_SCHEMA)
.field("newcol", Schema.INT32_SCHEMA)
.build()
);
});