DBZ-4782 Flush SCN values upon schema changes

This commit is contained in:
Chris Cranford 2022-05-11 13:23:33 -04:00 committed by Jiri Pechanec
parent 65e97796e7
commit e1579f8738
2 changed files with 217 additions and 0 deletions

View File

@ -557,10 +557,38 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept
return;
}
final Scn commitScn = offsetContext.getCommitScn();
if (commitScn != null && commitScn.compareTo(row.getScn()) >= 0) {
LOGGER.trace("DDL: SQL '{}' skipped with {} (SCN) <= {} (commit SCN)", row.getRedoSql(), row.getScn(), commitScn);
return;
}
LOGGER.trace("DDL: '{}' {}", row.getRedoSql(), row);
if (row.getTableName() != null) {
counters.ddlCount++;
final TableId tableId = row.getTableId();
final int activeTransactions = getTransactionCache().size();
if (activeTransactions <= 1) {
boolean advanceLowerScnBoundary = true;
if (activeTransactions == 1) {
final String transactionId = getTransactionCache().keySet().iterator().next();
if (!transactionId.equals(row.getTransactionId())) {
// The row's transaction is not the current only active transaction.
// We should not advance the SCN boundaries.
advanceLowerScnBoundary = false;
}
}
if (advanceLowerScnBoundary) {
LOGGER.debug("Schema change advanced offset SCN to {}", row.getScn());
offsetContext.setScn(row.getScn());
}
}
// Should always advance the commit SCN point with schema changes
LOGGER.debug("Schema change advanced offset commit SCN to {}", row.getScn());
offsetContext.setCommitScn(row.getScn());
dispatcher.dispatchSchemaChangeEvent(partition,
tableId,
new OracleSchemaChangeEventEmitter(

View File

@ -1197,6 +1197,195 @@ public void shouldParseSchemaChangeOnTableWithRawDataType() throws Exception {
}
}
@Test
@FixFor("DBZ-4782")
public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestart() throws Exception {
TestHelper.dropTable(connection, "dbz4782");
try {
connection.execute("CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))");
TestHelper.streamTable(connection, "dbz4782");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("ALTER TABLE dbz4782 add data2 varchar2(50)");
// CREATE, ALTER
SourceRecords sourceRecords = consumeRecordsByTopic(2);
List<SourceRecord> records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME);
assertThat(records).hasSize(2);
assertSnapshotSchemaChange(records.get(0));
List<Struct> tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782");
assertStreamingSchemaChange(records.get(1));
tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782");
// Stop the connector
stopConnector();
// Restart connector and verify that we do not re-emit the ALTER table
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Wait for 20 seconds and assert that there are no available records
waitForAvailableRecords(20, TimeUnit.SECONDS);
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4782");
}
}
@Test
@FixFor("DBZ-4782")
public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestartWithFollowupDml() throws Exception {
TestHelper.dropTable(connection, "dbz4782");
try {
createTable("dbz4782", "CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("ALTER TABLE dbz4782 add data2 varchar2(50)");
// CREATE, ALTER
SourceRecords sourceRecords = consumeRecordsByTopic(2);
List<SourceRecord> records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME);
assertThat(records).hasSize(2);
assertSnapshotSchemaChange(records.get(0));
List<Struct> tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782");
assertStreamingSchemaChange(records.get(1));
tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782");
// Stop the connector
stopConnector();
// Restart connector and verify that we do not re-emit the ALTER table
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO dbz4782 values (1, 'data1', 'data2')");
sourceRecords = consumeRecordsByTopic(1);
records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782"));
assertThat(records).hasSize(1);
VerifyRecord.isValidInsert(records.get(0), "ID", 1);
// There should be no other records to consume
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4782");
}
}
@Test
@FixFor("DBZ-4782")
public void shouldNotResendSchemaChangeWithInprogressTransactionOnSecondTable() throws Exception {
TestHelper.dropTable(connection, "dbz4782a");
TestHelper.dropTable(connection, "dbz4782b");
try {
createTable("dbz4782a", "CREATE TABLE dbz4782a (id numeric(9,0) primary key, data varchar2(50))");
createTable("dbz4782b", "CREATE TABLE dbz4782b (id numeric(9,0) primary key, data varchar2(50))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782[A|B]")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Start in-progress transaction for dbz4728b
try (OracleConnection connection2 = TestHelper.testConnection()) {
// Perform in-progress operation on second connection & alter the other table in primary connection
connection2.executeWithoutCommitting("INSERT INTO dbz4782b values (2, 'connection2')");
connection.execute("ALTER TABLE dbz4782a add data2 varchar2(50)");
// CREATEx2, ALTER (INSERT isn't here yet, its in progress)
SourceRecords sourceRecords = consumeRecordsByTopic(3);
List<SourceRecord> records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME);
assertThat(records).hasSize(3);
assertSnapshotSchemaChange(records.get(0));
List<Struct> tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782A");
assertSnapshotSchemaChange(records.get(1));
tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782B");
assertStreamingSchemaChange(records.get(2));
tableChanges = ((Struct) records.get(2).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782A");
// Stop the connector
stopConnector();
// Now commit the in-progress transaction while connector is down
connection2.commit();
// Restart the connector and verify we don't re-emit the ALTER table; however that we do
// capture the in-progress transaction correctly when it is committed.
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO dbz4782a values (1, 'data1', 'data2')");
sourceRecords = consumeRecordsByTopic(2);
sourceRecords.allRecordsInOrder().forEach(System.out::println);
records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782A"));
assertThat(records).hasSize(1);
VerifyRecord.isValidInsert(records.get(0), "ID", 1);
records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782B"));
assertThat(records).hasSize(1);
VerifyRecord.isValidInsert(records.get(0), "ID", 2);
}
// There should be no other records to consume
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4782a");
TestHelper.dropTable(connection, "dbz4782b");
}
}
private static String getTableIdString(String schemaName, String tableName) {
return new TableId(TestHelper.getDatabaseName(), schemaName, tableName).toDoubleQuotedString();
}