Support for column rename

This commit is contained in:
Jiri Pechanec 2018-10-18 13:49:24 +02:00
parent f9e2f3571b
commit a39fe657e9
3 changed files with 100 additions and 17 deletions

View File

@ -16,7 +16,6 @@ public class ChangeTable {
private final Lsn startLsn;
private final int changeTableObjectId;
private Lsn stopLsn;
private ChangeTable nextVersionOfTable;
public ChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
super();
@ -58,14 +57,6 @@ public int getChangeTableObjectId() {
return changeTableObjectId;
}
public ChangeTable getNextVersionOfTable() {
return nextVersionOfTable;
}
public void setNextVersionOfTable(ChangeTable nextVersionOfTable) {
this.nextVersionOfTable = nextVersionOfTable;
}
@Override
public String toString() {
return "ChangeTable [captureInstance=" + captureInstance + ", sourceTableId=" + sourceTableId

View File

@ -106,7 +106,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final ChangeTable[] tables = getCdcTablesToQuery();
tablesSlot.set(tables);
for (ChangeTable table: tables) {
if (table.getStopLsn().isBetween(fromLsn, currentMaxLsn)) {
if (table.getStartLsn().isBetween(fromLsn, currentMaxLsn)) {
LOGGER.info("Schema will be changed for {}", table);
schemaChangeCheckpoints.add(table);
}
@ -152,7 +152,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
LOGGER.trace("Processing change {}", tableSmallestLsn);
if (!schemaChangeCheckpoints.isEmpty()) {
if (tableSmallestLsn.getRowLsn().compareTo(schemaChangeCheckpoints.peek().getStopLsn()) > 0) {
if (tableSmallestLsn.getRowLsn().compareTo(schemaChangeCheckpoints.peek().getStopLsn()) >= 0) {
migrateTable(schemaChangeCheckpoints);
}
}
@ -207,10 +207,9 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
private void migrateTable(final Queue<ChangeTable> schemaChangeCheckpoints)
throws InterruptedException, SQLException {
final ChangeTable oldTable = schemaChangeCheckpoints.poll();
final ChangeTable newTable = oldTable.getNextVersionOfTable();
LOGGER.info("Migrating schema from {} to {}", oldTable, newTable);
dispatcher.dispatchSchemaChangeEvent(oldTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(offsetContext, newTable, connection.getTableSchemaFromTable(newTable)));
final ChangeTable newTable = schemaChangeCheckpoints.poll();
LOGGER.info("Migrating schema to {}", newTable);
dispatcher.dispatchSchemaChangeEvent(newTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(offsetContext, newTable, connection.getTableSchemaFromTable(newTable)));
}
private ChangeTable[] processErrorFromChangeTableQuery(SQLException exception, ChangeTable[] currentChangeTables) throws Exception {
@ -246,12 +245,10 @@ private ChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedExce
if (captures.size() > 1) {
if (captures.get(0).getStartLsn().compareTo(captures.get(1).getStartLsn()) < 0) {
captures.get(0).setStopLsn(captures.get(1).getStartLsn());
captures.get(0).setNextVersionOfTable(captures.get(1));
tables.add(captures.get(1));
}
else {
captures.get(1).setStopLsn(captures.get(0).getStartLsn());
captures.get(1).setNextVersionOfTable(captures.get(0));
changeTable = captures.get(1);
tables.add(captures.get(0));
}

View File

@ -365,4 +365,99 @@ public void removeColumnFromTable() throws Exception {
);
});
}
@Test
public void renameColumn() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START_1 = 10;
final int ID_START_2 = 100;
final int ID_START_3 = 1000;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b')"
);
}
SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tableb.Value")
.field("id", Schema.INT32_SCHEMA)
.field("colb", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
});
// CDC must be disabled, otherwise rename fails
connection.disableTableCdc("tableb");
// Enable a second capture instance
connection.execute("exec sp_rename 'tableb.colb', 'newcolb';");
connection.enableTableCdc("tableb", "after_change");
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_2 + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a2')"
);
connection.execute(
"INSERT INTO tableb(id,newcolb) VALUES(" + id + ", 'b2')"
);
}
records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tableb.Value")
.field("id", Schema.INT32_SCHEMA)
.field("newcolb", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
});
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_3 + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a3')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b3')"
);
}
records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tableb.Value")
.field("id", Schema.INT32_SCHEMA)
.field("newcolb", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
});
}
}