DBZ-3009 Fix tests compatibility w/Xstream

This commit is contained in:
Chris Cranford 2021-02-19 05:07:10 -05:00 committed by Gunnar Morling
parent 461b784974
commit 514c513f96

View File

@ -272,7 +272,10 @@ private void shouldApplySchemaAndTableInclusionConfiguration(boolean useLegacyOp
connection.execute("COMMIT"); connection.execute("COMMIT");
} }
SourceRecords records = consumeRecordsByTopic(includeDdlChanges ? 3 : 2); // Xstream binds a single schema to be captured, so changes in debezium2.table2 aren't captured
// LogMiner supports schemas based on schema configurations, so debezium2.table2 is captured,
// however due to includeDdlChanges = false, debezium.table3 isn't performed.
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).hasSize(1); assertThat(testTableRecords).hasSize(1);
@ -286,12 +289,17 @@ private void shouldApplySchemaAndTableInclusionConfiguration(boolean useLegacyOp
assertThat(testTableRecords).isNull(); assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM2.TABLE2"); testTableRecords = records.recordsForTopic("server1.DEBEZIUM2.TABLE2");
assertThat(testTableRecords).hasSize(1); if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.XSTREAM)) {
assertThat(testTableRecords).isNull();
}
else {
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(1); assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text2-1"); assertThat(after.get("NAME")).isEqualTo("Text2-1");
}
if (includeDdlChanges) { if (includeDdlChanges) {
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
@ -377,9 +385,11 @@ private void shouldApplySchemaAndTableExclusionsConfiguration(boolean useLegacyO
} }
boolean includeDdlChanges = true; boolean includeDdlChanges = true;
boolean isLogMiner = false;
if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) { if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) {
// LogMiner currently does not support DDL changes during streaming phase // LogMiner currently does not support DDL changes during streaming phase
includeDdlChanges = false; includeDdlChanges = false;
isLogMiner = true;
} }
Configuration config = TestHelper.defaultConfig() Configuration config = TestHelper.defaultConfig()
@ -413,7 +423,10 @@ private void shouldApplySchemaAndTableExclusionsConfiguration(boolean useLegacyO
connection.execute("COMMIT"); connection.execute("COMMIT");
} }
SourceRecords records = consumeRecordsByTopic(1); // Xstream binds a single schema to be captured, so changes in debezium2.table2 aren't captured
// LogMiner supports schemas based on schema configurations, so debezium2.table2 is captured,
// however due to includeDdlChanges = false, debezium.table3 isn't performed.
SourceRecords records = consumeRecordsByTopic(!isLogMiner ? 0 : 1);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1"); List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE1");
assertThat(testTableRecords).isNull(); assertThat(testTableRecords).isNull();
@ -422,21 +435,21 @@ private void shouldApplySchemaAndTableExclusionsConfiguration(boolean useLegacyO
assertThat(testTableRecords).isNull(); assertThat(testTableRecords).isNull();
testTableRecords = records.recordsForTopic("server1.DEBEZIUM2.TABLE2"); testTableRecords = records.recordsForTopic("server1.DEBEZIUM2.TABLE2");
assertThat(testTableRecords).hasSize(1); if (!isLogMiner) {
assertThat(testTableRecords).isNull();
}
else {
assertThat(testTableRecords).hasSize(1);
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1); VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 1);
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(1); assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("NAME")).isEqualTo("Text2-1"); assertThat(after.get("NAME")).isEqualTo("Text2-1");
}
if (includeDdlChanges) { if (includeDdlChanges) {
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3"); testTableRecords = records.recordsForTopic("server1.DEBEZIUM.TABLE3");
assertThat(testTableRecords).hasSize(1); assertThat(testTableRecords).isNull();
VerifyRecord.isValidInsert(testTableRecords.get(0), "ID", 3);
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("NAME")).isEqualTo("Text-3");
} }
} }