DBZ-1916 Add tests for datatype.propagate.source.type

This commit is contained in:
Chris Cranford 2020-06-15 16:29:17 -04:00 committed by Gunnar Morling
parent 27e0b22e48
commit 864fc7ee28
4 changed files with 171 additions and 1 deletions

View File

@ -5,7 +5,11 @@
*/
package io.debezium.connector.db2;
import static io.debezium.connector.db2.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.db2.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.db2.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import static org.junit.Assert.assertNull;
import java.sql.SQLException;
@ -14,6 +18,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@ -50,11 +55,13 @@ public void before() throws SQLException {
"CREATE TABLE tableb (id int not null, colb varchar(30), primary key (id))",
"CREATE TABLE masked_hashed_column_table (id int not null, name varchar(255), name2 varchar(255), name3 varchar(20), primary key (id))",
"CREATE TABLE truncated_column_table (id int not null, name varchar(20), primary key (id))",
"CREATE TABLE dt_table (id int not null, c1 int, c2 int, c3a numeric(5,2), c3b varchar(128), f1 float(10), f2 decimal(8,4), primary key(id))",
"INSERT INTO tablea VALUES(1, 'a')");
TestHelper.enableTableCdc(connection, "TABLEA");
TestHelper.enableTableCdc(connection, "TABLEB");
TestHelper.enableTableCdc(connection, "MASKED_HASHED_COLUMN_TABLE");
TestHelper.enableTableCdc(connection, "TRUNCATED_COLUMN_TABLE");
TestHelper.enableTableCdc(connection, "DT_TABLE");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
Testing.Print.enable();
@ -64,11 +71,12 @@ public void before() throws SQLException {
public void after() throws SQLException {
if (connection != null) {
TestHelper.disableDbCdc(connection);
TestHelper.disableTableCdc(connection, "DT_TABLE");
TestHelper.disableTableCdc(connection, "TRUNCATED_COLUMN_TABLE");
TestHelper.disableTableCdc(connection, "MASKED_HASHED_COLUMN_TABLE");
TestHelper.disableTableCdc(connection, "TABLEB");
TestHelper.disableTableCdc(connection, "TABLEA");
connection.execute("DROP TABLE tablea", "DROP TABLE tableb", "DROP TABLE masked_hashed_column_table", "DROP TABLE truncated_column_table");
connection.execute("DROP TABLE tablea", "DROP TABLE tableb", "DROP TABLE masked_hashed_column_table", "DROP TABLE truncated_column_table", "DROP TABLE dt_table");
connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION");
connection.execute("DELETE FROM ASNCDC.IBMQREP_TABVERSION");
@ -787,6 +795,59 @@ public void shouldRewriteIdentityKey() throws Exception {
stopConnector();
}
@Test
@FixFor({"DBZ-1916", "DBZ-1830"})
public void shouldPropagateSourceTypeByDatatype() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(Db2ConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.DECIMAL,.+\\.REAL")
.build();
start(Db2Connector.class, config);
// Wait for snapshot completion
consumeRecordsByTopic(1);
// Wait for snapshot completion
consumeRecordsByTopic(1);
TestHelper.enableDbCdc(connection);
connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
TestHelper.refreshAndWait(connection);
connection.setAutoCommit(false);
connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)");
final SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic("testdb.DB2INST1.DT_TABLE");
assertThat(recordsForTopic).hasSize(1);
final Field before = recordsForTopic.get(0).valueSchema().field("before");
assertThat(before.schema().field("ID").schema().parameters()).isNull();
assertThat(before.schema().field("C1").schema().parameters()).isNull();
assertThat(before.schema().field("C2").schema().parameters()).isNull();
assertThat(before.schema().field("C3A").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "DECIMAL"),
entry(TYPE_LENGTH_PARAMETER_KEY, "5"),
entry(TYPE_SCALE_PARAMETER_KEY, "2"));
assertThat(before.schema().field("C3B").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR"),
entry(TYPE_LENGTH_PARAMETER_KEY, "128"));
assertThat(before.schema().field("F2").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "DECIMAL"),
entry(TYPE_LENGTH_PARAMETER_KEY, "8"),
entry(TYPE_SCALE_PARAMETER_KEY, "4"));
assertThat(before.schema().field("F1").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "REAL"),
entry(TYPE_LENGTH_PARAMETER_KEY, "24"));
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}

View File

@ -44,6 +44,21 @@ public class TestHelper {
public static final String TEST_DATABASE = "testdb";
public static final int WAIT_FOR_CDC = 3 * 1000;
/**
* Key for schema parameter used to store a source column's type name.
*/
public static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
/**
* Key for schema parameter used to store a source column's type length.
*/
public static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length";
/**
* Key for schema parameter used to store a source column's type scale.
*/
public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale";
private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String ENABLE_DB_CDC = "VALUES ASNCDC.ASNCDCSERVICES('start','asncdc')";

View File

@ -5,8 +5,12 @@
*/
package io.debezium.connector.oracle;
import static io.debezium.connector.oracle.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.oracle.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.oracle.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static junit.framework.TestCase.assertEquals;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import java.math.BigDecimal;
import java.sql.SQLException;
@ -16,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
@ -52,6 +57,7 @@ public static void beforeClass() throws SQLException {
TestHelper.dropTable(connection, "debezium.customer");
TestHelper.dropTable(connection, "debezium.masked_hashed_column_table");
TestHelper.dropTable(connection, "debezium.truncated_column_table");
TestHelper.dropTable(connection, "debezium.dt_table");
String ddl = "create table debezium.customer (" +
" id numeric(9,0) not null, " +
@ -86,6 +92,21 @@ public static void beforeClass() throws SQLException {
connection.execute(ddl3);
connection.execute("GRANT SELECT ON debezium.truncated_column_table to " + TestHelper.CONNECTOR_USER);
connection.execute("ALTER TABLE debezium.truncated_column_table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
String ddl4 = "create table dt_table (" +
" id numeric(9,0) not null, " +
" c1 int, " +
" c2 int, " +
" c3a numeric(5,2), " +
" c3b varchar(128), " +
" f1 float(10), " +
" f2 decimal(8,4), " +
" primary key (id)" +
")";
connection.execute(ddl4);
connection.execute("GRANT SELECT ON debezium.dt_table to " + TestHelper.CONNECTOR_USER);
connection.execute("ALTER TABLE debezium.dt_table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
}
@AfterClass
@ -100,6 +121,7 @@ public void before() throws SQLException {
connection.execute("delete from debezium.customer");
connection.execute("delete from debezium.masked_hashed_column_table");
connection.execute("delete from debezium.truncated_column_table");
connection.execute("delete from debezium.dt_table");
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
@ -713,6 +735,63 @@ private void shouldRewriteIdentityKey(boolean useDatabaseName) throws Exception
stopConnector();
}
@Test
@FixFor({"DBZ-1916", "DBZ-1830"})
public void shouldPropagateSourceTypeByDatatype() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with("datatype.propagate.source.type", ".+\\.NUMBER,.+\\.VARCHAR2,.+\\.FLOAT")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)");
connection.execute("COMMIT");
final SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.DEBEZIUM.DT_TABLE");
assertThat(recordsForTopic).hasSize(1);
final Field before = recordsForTopic.get(0).valueSchema().field("before");
assertThat(before.schema().field("ID").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"),
entry(TYPE_LENGTH_PARAMETER_KEY, "9"),
entry(TYPE_SCALE_PARAMETER_KEY, "0"));
assertThat(before.schema().field("C1").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"),
entry(TYPE_LENGTH_PARAMETER_KEY, "38"),
entry(TYPE_SCALE_PARAMETER_KEY, "0"));
assertThat(before.schema().field("C2").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"),
entry(TYPE_LENGTH_PARAMETER_KEY, "38"),
entry(TYPE_SCALE_PARAMETER_KEY, "0"));
assertThat(before.schema().field("C3A").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"),
entry(TYPE_LENGTH_PARAMETER_KEY, "5"),
entry(TYPE_SCALE_PARAMETER_KEY, "2"));
assertThat(before.schema().field("C3B").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR2"),
entry(TYPE_LENGTH_PARAMETER_KEY, "128"));
assertThat(before.schema().field("F2").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "NUMBER"),
entry(TYPE_LENGTH_PARAMETER_KEY, "8"),
entry(TYPE_SCALE_PARAMETER_KEY, "4"));
assertThat(before.schema().field("F1").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "FLOAT"),
entry(TYPE_LENGTH_PARAMETER_KEY, "10"));
}
private void verifyHeartbeatRecord(SourceRecord heartbeat) {
assertEquals("__debezium-heartbeat.server1", heartbeat.topic());

View File

@ -32,6 +32,21 @@ public class TestHelper {
public static final String SERVER_NAME = "server1";
/**
* Key for schema parameter used to store a source column's type name.
*/
public static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
/**
* Key for schema parameter used to store a source column's type length.
*/
public static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length";
/**
* Key for schema parameter used to store a source column's type scale.
*/
public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale";
private static JdbcConfiguration defaultJdbcConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDefault(JdbcConfiguration.HOSTNAME, "localhost")