DBZ-1916 Add tests for datatype.propagate.source.type

This commit is contained in:
Chris Cranford 2020-06-10 13:12:46 -04:00 committed by Gunnar Morling
parent b2dad7df3a
commit 64bff85ad2
3 changed files with 118 additions and 0 deletions

View File

@ -7,12 +7,16 @@
package io.debezium.connector.postgresql;
import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
import static io.debezium.connector.postgresql.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.postgresql.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.postgresql.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static io.debezium.connector.postgresql.TestHelper.topicName;
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT;
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
@ -38,6 +42,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@ -46,6 +51,7 @@
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -2485,6 +2491,50 @@ public void testHeartbeatActionQueryExecuted() throws Exception {
assertTrue(numOfHeartbeatActions > 0);
}
@Test
@FixFor({"DBZ-1916", "DBZ-1830"})
public void shouldPropagateSourceTypeByDatatype() throws Exception {
TestHelper.execute("DROP TABLE IF EXISTS test_table;");
TestHelper.execute("CREATE TABLE test_table (id SERIAL, c1 INT, c2 INT, c3a NUMERIC(5,2), c3b VARCHAR(128), f1 float(10), f2 decimal(8,4), primary key (id));");
startConnector(config -> config
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.FLOAT4"), false);
waitForStreamingToStart();
consumer = testConsumer(1);
executeAndWait("INSERT INTO test_table (id,c1,c2,c3a,c3b,f1,f2) values (1, 123, 456, 789.01, 'test', 1.228, 234.56);");
final SourceRecord record = assertRecordInserted("public.test_table", "id", 1);
final Field before = record.valueSchema().field("before");
// no type info requested as per given data types
assertThat(before.schema().field("id").schema().parameters()).isNull();
assertThat(before.schema().field("c1").schema().parameters()).isNull();
assertThat(before.schema().field("c2").schema().parameters()).isNull();
assertThat(before.schema().field("c3a").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "NUMERIC"),
entry(TYPE_LENGTH_PARAMETER_KEY, "5"),
entry(TYPE_SCALE_PARAMETER_KEY, "2"));
// variable width, name and length info
assertThat(before.schema().field("c3b").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR"),
entry(TYPE_LENGTH_PARAMETER_KEY, "128"));
assertThat(before.schema().field("f2").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "NUMERIC"),
entry(TYPE_LENGTH_PARAMETER_KEY, "8"),
entry(TYPE_SCALE_PARAMETER_KEY, "4"));
assertThat(before.schema().field("f1").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "FLOAT4"),
entry(TYPE_LENGTH_PARAMETER_KEY, "8"),
entry(TYPE_SCALE_PARAMETER_KEY, "8"));
}
private void assertHeartBeatRecordInserted() {
assertFalse("records not generated", consumer.isEmpty());

View File

@ -5,7 +5,11 @@
*/
package io.debezium.connector.sqlserver;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import static org.junit.Assert.assertNull;
import java.io.IOException;
@ -21,6 +25,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@ -1592,6 +1597,54 @@ public void useShortTableNamesForKeyMapper() throws Exception {
stopConnector();
}
@Test
@FixFor({"DBZ-1916", "DBZ-1830"})
public void shouldPropagateSourceTypeByDatatype() throws Exception {
connection.execute("CREATE TABLE dt_table (id int, c1 int, c2 int, c3a numeric(5,2), c3b varchar(128), f1 float(10), f2 decimal(8,4) primary key(id))");
TestHelper.enableTableCdc(connection, "dt_table");
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.dt_table")
.with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.REAL,.+\\.DECIMAL")
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted("sql_server", "server1");
connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1, 123, 456, 789.01, 'test', 1.228, 234.56)");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.dbo.dt_table");
final SourceRecord record = recordsForTopic.get(0);
final Field before = record.valueSchema().field("before");
assertThat(before.schema().field("id").schema().parameters()).isNull();
assertThat(before.schema().field("c1").schema().parameters()).isNull();
assertThat(before.schema().field("c2").schema().parameters()).isNull();
assertThat(before.schema().field("c3a").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "NUMERIC"),
entry(TYPE_LENGTH_PARAMETER_KEY, "5"),
entry(TYPE_SCALE_PARAMETER_KEY, "2"));
assertThat(before.schema().field("c3b").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR"),
entry(TYPE_LENGTH_PARAMETER_KEY, "128"));
assertThat(before.schema().field("f2").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "DECIMAL"),
entry(TYPE_LENGTH_PARAMETER_KEY, "8"),
entry(TYPE_SCALE_PARAMETER_KEY, "4"));
assertThat(before.schema().field("f1").schema().parameters()).includes(
entry(TYPE_NAME_PARAMETER_KEY, "REAL"),
entry(TYPE_LENGTH_PARAMETER_KEY, "24"));
stopConnector();
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}

View File

@ -60,6 +60,21 @@ public class TestHelper {
private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
private static final String CDC_WRAPPERS_DML;
/**
* Key for schema parameter used to store a source column's type name.
*/
public static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
/**
* Key for schema parameter used to store a source column's type length.
*/
public static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length";
/**
* Key for schema parameter used to store a source column's type scale.
*/
public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale";
static {
try {
ClassLoader classLoader = TestHelper.class.getClassLoader();