From 864fc7ee2833b47383eb9123c3b4433318d45321 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Mon, 15 Jun 2020 16:29:17 -0400 Subject: [PATCH] DBZ-1916 Add tests for datatype.propagate.source.type --- .../connector/db2/Db2ConnectorIT.java | 63 ++++++++++++++- .../connector/db2/util/TestHelper.java | 15 ++++ .../connector/oracle/OracleConnectorIT.java | 79 +++++++++++++++++++ .../connector/oracle/util/TestHelper.java | 15 ++++ 4 files changed, 171 insertions(+), 1 deletion(-) diff --git a/debezium-connector-db2/src/test/java/io/debezium/connector/db2/Db2ConnectorIT.java b/debezium-connector-db2/src/test/java/io/debezium/connector/db2/Db2ConnectorIT.java index e18cc4946..b9d2bbaab 100644 --- a/debezium-connector-db2/src/test/java/io/debezium/connector/db2/Db2ConnectorIT.java +++ b/debezium-connector-db2/src/test/java/io/debezium/connector/db2/Db2ConnectorIT.java @@ -5,7 +5,11 @@ */ package io.debezium.connector.db2; +import static io.debezium.connector.db2.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY; +import static io.debezium.connector.db2.util.TestHelper.TYPE_NAME_PARAMETER_KEY; +import static io.debezium.connector.db2.util.TestHelper.TYPE_SCALE_PARAMETER_KEY; import static org.fest.assertions.Assertions.assertThat; +import static org.fest.assertions.MapAssert.entry; import static org.junit.Assert.assertNull; import java.sql.SQLException; @@ -14,6 +18,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -50,11 +55,13 @@ public void before() throws SQLException { "CREATE TABLE tableb (id int not null, colb varchar(30), primary key (id))", "CREATE TABLE masked_hashed_column_table (id int not null, name varchar(255), name2 varchar(255), name3 varchar(20), primary key (id))", "CREATE TABLE truncated_column_table (id int not null, name varchar(20), primary key (id))", + "CREATE TABLE dt_table (id int not null, c1 int, c2 int, c3a numeric(5,2), c3b varchar(128), f1 float(10), f2 decimal(8,4), primary key(id))", "INSERT INTO tablea VALUES(1, 'a')"); TestHelper.enableTableCdc(connection, "TABLEA"); TestHelper.enableTableCdc(connection, "TABLEB"); TestHelper.enableTableCdc(connection, "MASKED_HASHED_COLUMN_TABLE"); TestHelper.enableTableCdc(connection, "TRUNCATED_COLUMN_TABLE"); + TestHelper.enableTableCdc(connection, "DT_TABLE"); initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.DB_HISTORY_PATH); Testing.Print.enable(); @@ -64,11 +71,12 @@ public void before() throws SQLException { public void after() throws SQLException { if (connection != null) { TestHelper.disableDbCdc(connection); + TestHelper.disableTableCdc(connection, "DT_TABLE"); TestHelper.disableTableCdc(connection, "TRUNCATED_COLUMN_TABLE"); TestHelper.disableTableCdc(connection, "MASKED_HASHED_COLUMN_TABLE"); TestHelper.disableTableCdc(connection, "TABLEB"); TestHelper.disableTableCdc(connection, "TABLEA"); - connection.execute("DROP TABLE tablea", "DROP TABLE tableb", "DROP TABLE masked_hashed_column_table", "DROP TABLE truncated_column_table"); + connection.execute("DROP TABLE tablea", "DROP TABLE tableb", "DROP TABLE masked_hashed_column_table", "DROP TABLE truncated_column_table", "DROP TABLE dt_table"); connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER"); connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION"); connection.execute("DELETE FROM ASNCDC.IBMQREP_TABVERSION"); @@ -787,6 +795,59 @@ public void shouldRewriteIdentityKey() throws Exception { stopConnector(); } + @Test + @FixFor({"DBZ-1916", "DBZ-1830"}) + public void shouldPropagateSourceTypeByDatatype() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.DECIMAL,.+\\.REAL") + .build(); + + start(Db2Connector.class, config); + + // Wait for snapshot completion + consumeRecordsByTopic(1); + + // Wait for snapshot completion + consumeRecordsByTopic(1); + + TestHelper.enableDbCdc(connection); + connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"); + TestHelper.refreshAndWait(connection); + + connection.setAutoCommit(false); + connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)"); + + final SourceRecords records = consumeRecordsByTopic(1); + + List recordsForTopic = records.recordsForTopic("testdb.DB2INST1.DT_TABLE"); + assertThat(recordsForTopic).hasSize(1); + + final Field before = recordsForTopic.get(0).valueSchema().field("before"); + + assertThat(before.schema().field("ID").schema().parameters()).isNull(); + assertThat(before.schema().field("C1").schema().parameters()).isNull(); + assertThat(before.schema().field("C2").schema().parameters()).isNull(); + + assertThat(before.schema().field("C3A").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "DECIMAL"), + entry(TYPE_LENGTH_PARAMETER_KEY, "5"), + entry(TYPE_SCALE_PARAMETER_KEY, "2")); + + assertThat(before.schema().field("C3B").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR"), + entry(TYPE_LENGTH_PARAMETER_KEY, "128")); + + assertThat(before.schema().field("F2").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "DECIMAL"), + entry(TYPE_LENGTH_PARAMETER_KEY, "8"), + entry(TYPE_SCALE_PARAMETER_KEY, "4")); + + assertThat(before.schema().field("F1").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "REAL"), + entry(TYPE_LENGTH_PARAMETER_KEY, "24")); + } + private void assertRecord(Struct record, List expected) { expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); } diff --git a/debezium-connector-db2/src/test/java/io/debezium/connector/db2/util/TestHelper.java b/debezium-connector-db2/src/test/java/io/debezium/connector/db2/util/TestHelper.java index b0f55fdcd..93d2f791d 100644 --- a/debezium-connector-db2/src/test/java/io/debezium/connector/db2/util/TestHelper.java +++ b/debezium-connector-db2/src/test/java/io/debezium/connector/db2/util/TestHelper.java @@ -44,6 +44,21 @@ public class TestHelper { public static final String TEST_DATABASE = "testdb"; public static final int WAIT_FOR_CDC = 3 * 1000; + /** + * Key for schema parameter used to store a source column's type name. + */ + public static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type"; + + /** + * Key for schema parameter used to store a source column's type length. + */ + public static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length"; + + /** + * Key for schema parameter used to store a source column's type scale. + */ + public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale"; + private static final String STATEMENTS_PLACEHOLDER = "#"; private static final String ENABLE_DB_CDC = "VALUES ASNCDC.ASNCDCSERVICES('start','asncdc')"; diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 63dbbb702..0f5f2fdc3 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -5,8 +5,12 @@ */ package io.debezium.connector.oracle; +import static io.debezium.connector.oracle.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY; +import static io.debezium.connector.oracle.util.TestHelper.TYPE_NAME_PARAMETER_KEY; +import static io.debezium.connector.oracle.util.TestHelper.TYPE_SCALE_PARAMETER_KEY; import static junit.framework.TestCase.assertEquals; import static org.fest.assertions.Assertions.assertThat; +import static org.fest.assertions.MapAssert.entry; import java.math.BigDecimal; import java.sql.SQLException; @@ -16,6 +20,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.junit.AfterClass; @@ -52,6 +57,7 @@ public static void beforeClass() throws SQLException { TestHelper.dropTable(connection, "debezium.customer"); TestHelper.dropTable(connection, "debezium.masked_hashed_column_table"); TestHelper.dropTable(connection, "debezium.truncated_column_table"); + TestHelper.dropTable(connection, "debezium.dt_table"); String ddl = "create table debezium.customer (" + " id numeric(9,0) not null, " + @@ -86,6 +92,21 @@ public static void beforeClass() throws SQLException { connection.execute(ddl3); connection.execute("GRANT SELECT ON debezium.truncated_column_table to " + TestHelper.CONNECTOR_USER); connection.execute("ALTER TABLE debezium.truncated_column_table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + + String ddl4 = "create table dt_table (" + + " id numeric(9,0) not null, " + + " c1 int, " + + " c2 int, " + + " c3a numeric(5,2), " + + " c3b varchar(128), " + + " f1 float(10), " + + " f2 decimal(8,4), " + + " primary key (id)" + + ")"; + + connection.execute(ddl4); + connection.execute("GRANT SELECT ON debezium.dt_table to " + TestHelper.CONNECTOR_USER); + connection.execute("ALTER TABLE debezium.dt_table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); } @AfterClass @@ -100,6 +121,7 @@ public void before() throws SQLException { connection.execute("delete from debezium.customer"); connection.execute("delete from debezium.masked_hashed_column_table"); connection.execute("delete from debezium.truncated_column_table"); + connection.execute("delete from debezium.dt_table"); setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.DB_HISTORY_PATH); @@ -713,6 +735,63 @@ private void shouldRewriteIdentityKey(boolean useDatabaseName) throws Exception stopConnector(); } + @Test + @FixFor({"DBZ-1916", "DBZ-1830"}) + public void shouldPropagateSourceTypeByDatatype() throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with("datatype.propagate.source.type", ".+\\.NUMBER,.+\\.VARCHAR2,.+\\.FLOAT") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)"); + connection.execute("COMMIT"); + + final SourceRecords records = consumeRecordsByTopic(1); + + List recordsForTopic = records.recordsForTopic("server1.DEBEZIUM.DT_TABLE"); + assertThat(recordsForTopic).hasSize(1); + + final Field before = recordsForTopic.get(0).valueSchema().field("before"); + + assertThat(before.schema().field("ID").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"), + entry(TYPE_LENGTH_PARAMETER_KEY, "9"), + entry(TYPE_SCALE_PARAMETER_KEY, "0")); + + assertThat(before.schema().field("C1").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"), + entry(TYPE_LENGTH_PARAMETER_KEY, "38"), + entry(TYPE_SCALE_PARAMETER_KEY, "0")); + + assertThat(before.schema().field("C2").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"), + entry(TYPE_LENGTH_PARAMETER_KEY, "38"), + entry(TYPE_SCALE_PARAMETER_KEY, "0")); + + assertThat(before.schema().field("C3A").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"), + entry(TYPE_LENGTH_PARAMETER_KEY, "5"), + entry(TYPE_SCALE_PARAMETER_KEY, "2")); + + assertThat(before.schema().field("C3B").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR2"), + entry(TYPE_LENGTH_PARAMETER_KEY, "128")); + + assertThat(before.schema().field("F2").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"), + entry(TYPE_LENGTH_PARAMETER_KEY, "8"), + entry(TYPE_SCALE_PARAMETER_KEY, "4")); + + assertThat(before.schema().field("F1").schema().parameters()).includes( + entry(TYPE_NAME_PARAMETER_KEY, "FLOAT"), + entry(TYPE_LENGTH_PARAMETER_KEY, "10")); + } + private void verifyHeartbeatRecord(SourceRecord heartbeat) { assertEquals("__debezium-heartbeat.server1", heartbeat.topic()); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index 6d4862a20..58ccb96a7 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -32,6 +32,21 @@ public class TestHelper { public static final String SERVER_NAME = "server1"; + /** + * Key for schema parameter used to store a source column's type name. + */ + public static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type"; + + /** + * Key for schema parameter used to store a source column's type length. + */ + public static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length"; + + /** + * Key for schema parameter used to store a source column's type scale. + */ + public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale"; + private static JdbcConfiguration defaultJdbcConfig() { return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) .withDefault(JdbcConfiguration.HOSTNAME, "localhost")