DBZ-1017 Record schema evolution into history correctly
This commit is contained in:
parent
33a39fe111
commit
ed71835cd2
@ -48,7 +48,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
|
||||
tables().overwriteTable(table);
|
||||
|
||||
TableChanges tableChanges = null;
|
||||
if (schemaChange.getType() == SchemaChangeEventType.CREATE && schemaChange.isFromSnapshot()) {
|
||||
if (schemaChange.getType() == SchemaChangeEventType.CREATE) {
|
||||
tableChanges = new TableChanges();
|
||||
tableChanges.create(table);
|
||||
}
|
||||
|
@ -393,6 +393,79 @@ public void removeColumnFromTable() throws Exception {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void readHistoryAfterRestart() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 1;
|
||||
final int TABLES = 2;
|
||||
final int ID_START_1 = 10;
|
||||
final int ID_START_2 = 100;
|
||||
final int ID_START_3 = 1000;
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
|
||||
.build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START_1 + i;
|
||||
connection.execute(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a')"
|
||||
);
|
||||
connection.execute(
|
||||
"INSERT INTO tableb VALUES(" + id + ", 'b')"
|
||||
);
|
||||
}
|
||||
|
||||
SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
|
||||
|
||||
// Enable a second capture instance
|
||||
connection.execute("ALTER TABLE dbo.tableb DROP COLUMN colb");
|
||||
TestHelper.enableTableCdc(connection, "tableb", "after_change");
|
||||
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START_2 + i;
|
||||
connection.execute(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a2')"
|
||||
);
|
||||
connection.execute(
|
||||
"INSERT INTO tableb VALUES(" + id + ")"
|
||||
);
|
||||
}
|
||||
records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
|
||||
|
||||
stopConnector();
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START_3 + i;
|
||||
connection.execute(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a3')"
|
||||
);
|
||||
connection.execute(
|
||||
"INSERT INTO tableb VALUES(" + id + ")"
|
||||
);
|
||||
}
|
||||
records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
|
||||
records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
|
||||
assertSchemaMatchesStruct(
|
||||
(Struct)((Struct)record.value()).get("after"),
|
||||
SchemaBuilder.struct()
|
||||
.optional()
|
||||
.name("server1.testDB.dbo.tableb.Value")
|
||||
.field("id", Schema.INT32_SCHEMA)
|
||||
.build()
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void renameColumn() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 5;
|
||||
|
Loading…
Reference in New Issue
Block a user