DBZ-8089 Reset column scale on MODIFY ddl statements

This commit is contained in:
Chris Cranford 2024-07-23 13:54:26 -04:00 committed by Chris Cranford
parent 2851beadf5
commit 2603a96a48
2 changed files with 78 additions and 26 deletions

View File

@ -68,6 +68,11 @@ public void enterPrimary_key_clause(PlSqlParser.Primary_key_clauseContext ctx) {
@Override
public void enterModify_col_properties(PlSqlParser.Modify_col_propertiesContext ctx) {
// Scale should always get unset
// It should be parsed by the data type resolver, if its applicable
// This standardizes the handling of scale when data types shift from one to another
columnEditor.unsetScale();
resolveColumnDataType(ctx);
if (ctx.DEFAULT() != null) {
columnEditor.defaultValueExpression(ctx.column_default_value().getText());

View File

@ -29,12 +29,13 @@
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.olr.OpenLogReplicatorStreamingChangeEventSource;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.TableId;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
@ -214,7 +215,7 @@ public void shouldStreamAlterTableAddColumnSchemaChange() throws Exception {
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -242,7 +243,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(3);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo("Test2");
@ -285,7 +286,7 @@ public void shouldStreamAlterTableAddMultipleColumnsSchemaChange() throws Except
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -313,7 +314,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(4);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo("Test2");
@ -357,7 +358,7 @@ public void shouldStreamAlterTableRenameColumnSchemaChange() throws Exception {
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -385,7 +386,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.schema().field("DATA")).isNull();
assertThat(after.get("ID")).isEqualTo(2);
@ -428,7 +429,7 @@ public void shouldStreamAlterTableDropColumnSchemaChange() throws Exception {
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -456,7 +457,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(1);
assertThat(after.schema().field("DATA")).isNull();
assertThat(after.get("ID")).isEqualTo(2);
@ -498,7 +499,7 @@ public void shouldStreamAlterTableDropMultipleColumnsSchemaChange() throws Excep
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(3);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA1")).isEqualTo("Test");
@ -527,7 +528,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(1);
assertThat(after.schema().field("DATA1")).isNull();
assertThat(after.schema().field("DATA2")).isNull();
@ -570,7 +571,7 @@ public void shouldStreamAlterTableRenameTableSchemaChange() throws Exception {
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -597,7 +598,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEB")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEB");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo("Test2");
@ -640,7 +641,7 @@ public void shouldNotStreamAfterTableRenameToExcludedName() throws Exception {
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -713,7 +714,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -755,7 +756,7 @@ public void shouldStreamAlterTableChangeColumnNullability() throws Exception {
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -784,7 +785,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isNull();
@ -829,7 +830,7 @@ public void shouldStreamAlterTableChangeColumnPrecisionAndScale() throws Excepti
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo(BigDecimal.valueOf(12345.67));
@ -860,7 +861,7 @@ record = records.recordsForTopic(TestHelper.SERVER_NAME).get(0);
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 2);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.schema().fields()).hasSize(2);
assertThat(after.get("ID")).isEqualTo(2);
assertThat(after.get("DATA")).isEqualTo(BigDecimal.valueOf(234567.891));
@ -907,7 +908,7 @@ public void shouldStreamDropTable() throws Exception {
record = records.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
VerifyRecord.isValidInsert(record, "ID", 1);
assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -1159,7 +1160,7 @@ public void shouldParseSchemaChangeWithoutErrorOnFilteredTableWithRawDataType()
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "DBZ4037A"))).hasSize(1);
SourceRecord record = records.recordsForTopic(topicName("DEBEZIUM", "DBZ4037A")).get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test");
@ -1205,7 +1206,7 @@ public void shouldParseSchemaChangeOnTableWithRawDataType() throws Exception {
assertThat(records.recordsForTopic(topicName("DEBEZIUM", "DBZ4037"))).hasSize(1);
SourceRecord record = records.recordsForTopic(topicName("DEBEZIUM", "DBZ4037")).get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) record.value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo(ByteBuffer.wrap("Test".getBytes()));
assertThat(after.get("DATA2")).isEqualTo(ByteBuffer.wrap("T".getBytes()));
@ -1448,7 +1449,7 @@ public void shouldOnlyCaptureSchemaChangesForIncludedTables() throws Exception {
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
assertThat(tableRecords).hasSize(1);
VerifyRecord.isValidRead(tableRecords.get(0), "ID", 1);
Struct after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) tableRecords.get(0).value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("A");
@ -1478,7 +1479,7 @@ public void shouldOnlyCaptureSchemaChangesForIncludedTables() throws Exception {
tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
assertThat(tableRecords).hasSize(1);
VerifyRecord.isValidInsert(tableRecords.get(0), "ID", 3);
after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) tableRecords.get(0).value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("DATA")).isEqualTo("A3");
assertThat(after.get("DATA2")).isEqualTo("D1");
@ -1537,7 +1538,7 @@ public void shouldCaptureSchemaChangesForAllTablesRegardlessOfIncludeList() thro
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
assertThat(tableRecords).hasSize(1);
VerifyRecord.isValidRead(tableRecords.get(0), "ID", 1);
Struct after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
Struct after = ((Struct) tableRecords.get(0).value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("A");
@ -1573,7 +1574,7 @@ public void shouldCaptureSchemaChangesForAllTablesRegardlessOfIncludeList() thro
tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
assertThat(tableRecords).hasSize(1);
VerifyRecord.isValidInsert(tableRecords.get(0), "ID", 3);
after = ((Struct) tableRecords.get(0).value()).getStruct(Envelope.FieldName.AFTER);
after = ((Struct) tableRecords.get(0).value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(3);
assertThat(after.get("DATA")).isEqualTo("A3");
assertThat(after.get("DATA2")).isEqualTo("D1");
@ -1587,6 +1588,52 @@ public void shouldCaptureSchemaChangesForAllTablesRegardlessOfIncludeList() thro
}
}
@Test
@FixFor("DBZ-8089")
public void shouldResetScaleWhenColumnDataTypeIsModified() throws Exception {
TestHelper.dropTable(connection, "dbz8089");
try {
// When NUMBER column is set, it's constructed with a scale of 0.
// Later when the column is modified to be a FLOAT, the scale should be reset to null rather than left as 0.
connection.execute("CREATE TABLE dbz8089 (id numeric(9,0) primary key, salary numeric(*,0))");
TestHelper.streamTable(connection, "dbz8089");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8089")
.with(OracleConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO dbz8089 values (1, 12.36)");
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.allRecordsInOrder()).hasSize(1);
Struct after = ((Struct) records.allRecordsInOrder().get(0).value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1.0);
assertThat(after.get("SALARY")).isEqualTo(12.0); // 12.36 will be truncated
// Previous the scale was not being reset from the original NUMERIC data type
// This meant the scale was left as 0 rather than null, so the value converters would attempt unnecessary adjustments.
// By adjusting the DDL parser to reset the scale to null, this mimics how the table's schema is parsed before snapshots.
connection.execute("ALTER TABLE dbz8089 modify salary float");
connection.execute("INSERT INTO dbz8089 values (2, 12.36)");
records = consumeRecordsByTopic(1);
assertThat(records.allRecordsInOrder()).hasSize(1);
after = ((Struct) records.allRecordsInOrder().get(0).value()).getStruct(FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(2.0);
assertThat(after.get("SALARY")).isEqualTo(12.36); // should be a double
}
finally {
stopConnector();
TestHelper.dropTable(connection, "dbz8089");
}
}
private static String getTableIdString(String schemaName, String tableName) {
return new TableId(TestHelper.getDatabaseName(), schemaName, tableName).toDoubleQuotedString();
}