DBZ-3401 Add existing schema history tests

This commit is contained in:
Chris Cranford 2024-02-17 20:06:08 -05:00 committed by Chris Cranford
parent 0a0e499cdc
commit 17faf65802

View File

@ -11,15 +11,24 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -27,9 +36,13 @@
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
@ -37,8 +50,21 @@
import io.debezium.data.VariableScaleDecimal;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.KafkaConnectUtil;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.Table;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlParserListener;
import io.debezium.relational.ddl.DdlParserListener.TableCreatedEvent;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryMetrics;
import io.debezium.relational.history.TableChanges;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.util.Collect;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
@ -77,7 +103,6 @@ public void afterEach() throws Exception {
}
// todo:
// add test for having old non-attribute populated schema history and status=2 triggered
// add clob, blob, and xml support
@Test
@ -664,6 +689,225 @@ public void shouldStreamSchemaChangeWithDataChangeTemporalDataTypesAsConnect() t
-187_506_210_000L);
}
@Test
@FixFor("DBZ-3401")
public void shouldStreamOfflineSchemaChangeWithExistingLegacySchemaHistory() throws Exception {
TestHelper.dropTable(connection, "dbz3401");
try {
LogInterceptor logInterceptor = new LogInterceptor(BaseSourceTask.class);
final String columnName = "C1";
final String columnType = "varchar2(50)";
final QueryValue insertValue = QueryValue.ofBind("test");
final QueryValue updateValue = QueryValue.ofBind("updated");
final String expectedInsert = "test";
final String expectedUpdate = "updated";
// create table & stream it
createAndStreamTable(columnName, columnType);
// Create schema history
createSchemaHistoryForDdl(String.format(
"CREATE TABLE dbz3401 (id numeric(9,0) primary key, %s %s)", columnName, columnType));
// Create Offsets
createOffsetBasedOnCurrentScn();
Configuration config = configureAndStartConnector(false);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
assertThat(logInterceptor.containsMessage("No previous offsets found"))
.as("Existing offsets were not found but expected")
.isFalse();
assertNoRecordsToConsume();
stopConnector();
// Do offline actions
insertRowWithoutCommit(columnName, insertValue, 1);
connection.commit();
connection.execute("ALTER TABLE dbz3401 ADD " + columnName + "2 " + columnType);
insertRowOffline(columnName, insertValue, 2);
connection.execute("ALTER TABLE dbz3401 DROP COLUMN " + columnName + "2");
updateRowOffline(columnName, updateValue, 2);
connection.execute("ALTER TABLE dbz3401 ADD " + columnName + "2 " + columnType);
connection.execute("DELETE FROM dbz3401 WHERE ID = 2");
connection.execute("ALTER TABLE dbz3401 DROP COLUMN " + columnName + "2");
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
final int expected = 4;
SourceRecords records = consumeRecordsByTopic(expected);
List<SourceRecord> tableRecords = records.recordsForTopic(topicName("DEBEZIUM", "DBZ3401"));
// Insert (before schema change)
Struct after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get(columnName)).isEqualTo(expectedInsert);
// Insert
after = ((Struct) tableRecords.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get(columnName)).isEqualTo(expectedInsert);
assertThat(after.get(columnName + "2")).isEqualTo(expectedInsert);
// Update
after = ((Struct) tableRecords.get(2).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get(columnName)).isEqualTo(expectedUpdate);
assertThat(after.schema().field(columnName + "2")).isNull();
// Delete
Struct before = ((Struct) tableRecords.get(3).value()).getStruct(Envelope.FieldName.BEFORE);
after = ((Struct) tableRecords.get(3).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(before.get("ID")).isEqualTo(2);
assertThat(before.get(columnName)).isEqualTo(expectedUpdate);
assertThat(before.get(columnName + "2")).isNull();
assertThat(after).isNull();
}
finally {
// Shutdown the connector explicitly
stopConnector();
// drop the table in case of a failure
TestHelper.dropTable(connection, "dbz3401");
// cleanup state from multiple invocations
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
Testing.Files.delete(OFFSET_STORE_PATH);
}
}
@Test
@FixFor("DBZ-3401")
public void shouldStreamSchemaChangeWithExistingLegacySchemaHistory() throws Exception {
TestHelper.dropTable(connection, "dbz3401");
try {
LogInterceptor logInterceptor = new LogInterceptor(BaseSourceTask.class);
final String columnName = "C1";
final String columnType = "varchar2(50)";
final QueryValue insertValue = QueryValue.ofBind("test");
final QueryValue updateValue = QueryValue.ofBind("updated");
final String expectedInsert = "test";
final String expectedUpdate = "updated";
createAndStreamTable(columnName, columnType);
// Create schema history
createSchemaHistoryForDdl(String.format(
"CREATE TABLE dbz3401 (id numeric(9,0) primary key, %s %s)", columnName, columnType));
// Create Offsets
createOffsetBasedOnCurrentScn();
configureAndStartConnector(false);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
assertThat(logInterceptor.containsMessage("No previous offsets found"))
.as("Existing offsets were not found but expected")
.isFalse();
// insert streaming record
insertRowWithoutCommit(columnName, insertValue, 1);
// add a new column to trigger a schema change
connection.execute("ALTER TABLE dbz3401 add C2 varchar2(50)");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> tableRecords = records.recordsForTopic(topicName("DEBEZIUM", "DBZ3401"));
assertThat(tableRecords).hasSize(1);
Struct after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get(columnName.toUpperCase())).isEqualTo(expectedInsert);
assertThat(after.schema().field("C2")).isNull(); // field was added after insert
// update streaming record
updateRowWithoutCommit(columnName, updateValue, 1);
// add a new column to trigger a schema change
connection.execute("ALTER TABLE dbz3401 add C3 varchar2(50)");
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic(topicName("DEBEZIUM", "DBZ3401"));
assertThat(tableRecords).hasSize(1);
after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get(columnName.toUpperCase())).isEqualTo(expectedUpdate);
assertThat(after.get("C2")).isNull();
assertThat(after.schema().field("C3")).isNull(); // field was added after update
// delete streaming record
connection.executeWithoutCommitting("DELETE FROM dbz3401 where id = 1");
connection.execute("ALTER TABLE dbz3401 add C4 varchar2(50)");
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic(topicName("DEBEZIUM", "DBZ3401"));
assertThat(tableRecords).hasSize(1);
Struct before = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.BEFORE);
assertThat(before.get("ID")).isEqualTo(1);
assertThat(before.get(columnName.toUpperCase())).isEqualTo(expectedUpdate);
assertThat(before.get("C2")).isNull();
assertThat(before.get("C3")).isNull();
after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after).isNull();
// Perform DML and then DDL (drop table within same scope)
insertRowWithoutCommit(columnName, insertValue, 2);
// This test case does not use PURGE so that the table gets pushed into the Oracle RECYCLEBIN
// LogMiner materializes table name as "ORCLPDB1.DEBEZIUM.BIN$<base64>==$0"
connection.execute("DROP TABLE dbz3401");
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic(topicName("DEBEZIUM", "DBZ3401"));
assertThat(tableRecords).hasSize(1);
after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after).isNotNull();
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get(columnName.toUpperCase())).isEqualTo(expectedInsert);
// Now lets test re-creating the table in-flight
// This should automatically capture the schema object details
createAndStreamTable(columnName, columnType);
// Perform DML and then DDL (drop table within same scope)
insertRowWithoutCommit(columnName, insertValue, 3);
// This test case uses PURGE.
// LogMiner materializes table name as "ORCLPDB1.UNKNOWN.OBJ# <num>"
connection.execute("DROP TABLE dbz3401 PURGE");
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic(topicName("DEBEZIUM", "DBZ3401"));
assertThat(tableRecords).hasSize(1);
after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after).isNotNull();
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get(columnName.toUpperCase())).isEqualTo(expectedInsert);
stopConnector();
}
finally {
// Shutdown the connector explicitly
stopConnector();
// drop the table in case of a failure
TestHelper.dropTable(connection, "dbz3401");
// cleanup state from multiple invocations
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
Testing.Files.delete(OFFSET_STORE_PATH);
}
}
@SuppressWarnings("SameParameterValue")
private static String toTimestamp(int year, int month, int day, int hour, int min, int sec, int nanos, int precision) {
String nanoSeconds = Strings.justify(Strings.Justify.RIGHT, String.valueOf(nanos), precision, '0');
@ -878,6 +1122,91 @@ private void streamOfflineSchemaChanges(String columnType, QueryValue insertValu
}
}
private void createOffsetBasedOnCurrentScn() throws Exception {
final Scn currentScn;
try (OracleConnection admin = TestHelper.adminConnection()) {
currentScn = admin.getCurrentScn();
}
final Converter keyConverter = KafkaConnectUtil.converterForOffsetStore();
final Converter valueConverter = KafkaConnectUtil.converterForOffsetStore();
final Map<String, String> embeddedConfig = TestHelper.defaultConfig().build().asMap(EmbeddedEngineConfig.ALL_FIELDS);
embeddedConfig.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, keyConverter.getClass().getName());
embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, valueConverter.getClass().getName());
System.out.println(embeddedConfig);
final OffsetBackingStore store = KafkaConnectUtil.fileOffsetBackingStore();
store.configure(new TestWorkerConfig(embeddedConfig));
store.start();
try {
final Map<String, Object> partition = Map.of("server", TestHelper.SERVER_NAME);
final Map<String, Object> offsets = Map.of(
"snapshot", true,
"scn", currentScn.toString(),
"snapshot_completed", true);
final OffsetStorageWriter writer = new OffsetStorageWriter(store, "testing-connector", keyConverter, valueConverter);
writer.offset(partition, offsets);
writer.beginFlush();
Future<Void> flush = writer.doFlush((error, result) -> {
// do nothing
});
assertThat(flush).isNotNull();
// wait for flush
flush.get();
}
finally {
store.stop();
}
}
private void createSchemaHistoryForDdl(String ddlText) {
final SchemaHistory schemaHistory = new FileSchemaHistory();
schemaHistory.configure(Configuration.create()
.with(FileSchemaHistory.FILE_PATH, TestHelper.SCHEMA_HISTORY_PATH.toString())
.build(),
null,
SchemaHistoryMetrics.NOOP,
true);
schemaHistory.start();
final String databaseName = TestHelper.getDatabaseName().toUpperCase();
final String schemaName = TestHelper.SCHEMA_USER.toUpperCase();
final Map<String, Object> source = Collect.linkMapOf("server", TestHelper.SERVER_NAME);
final Map<String, Object> position = Collect.linkMapOf(
"commit_scn", "1001:1:",
"snapshot_scn", "1001",
"scn", "1001",
"snapshot_completed", true);
OracleDdlParser parser = new OracleDdlParser();
DdlChanges ddlChanges = parser.getDdlChanges();
Tables tables = new Tables();
ddlChanges.reset();
parser.setCurrentDatabase(databaseName);
parser.setCurrentSchema(schemaName);
parser.parse(ddlText, tables);
ddlChanges.getEventsByDatabase((String dbName, List<DdlParserListener.Event> events) -> {
events.forEach(event -> {
if (event instanceof TableCreatedEvent) {
final TableCreatedEvent createEvent = (TableCreatedEvent) event;
final Table table = tables.forTable(createEvent.tableId());
final TableChanges changes = new TableChanges().create(table);
schemaHistory.record(source, position, databaseName, schemaName, ddlText, changes, Instant.now());
}
});
});
}
private Struct varScaleDecimal(String value) {
return fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal(value));
}
@ -1042,4 +1371,22 @@ public Object getValue() {
return value;
}
}
// Taken from EmbeddedEngine
protected static class TestWorkerConfig extends WorkerConfig {
private static final ConfigDef CONFIG;
static {
ConfigDef config = baseConfigDef();
Field.group(config, "file", EmbeddedEngineConfig.OFFSET_STORAGE_FILE_FILENAME);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_TOPIC);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_PARTITIONS);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR);
CONFIG = config;
}
protected TestWorkerConfig(Map<String, String> props) {
super(CONFIG, props);
}
}
}