DBZ-1413 Updated tests

This commit is contained in:
Chris Cranford 2019-11-25 09:21:09 -05:00 committed by Gunnar Morling
parent ef9b4a7431
commit 46de58ae75
2 changed files with 91 additions and 10 deletions

View File

@ -15,6 +15,7 @@
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -30,6 +31,7 @@
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions; import org.fest.assertions.Assertions;
import org.junit.Before; import org.junit.Before;
import org.junit.ComparisonFailure;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestRule; import org.junit.rules.TestRule;
@ -38,6 +40,7 @@
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDatabaseVersionRule; import io.debezium.connector.postgresql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan; import io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan;
import io.debezium.data.Bits;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord; import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor; import io.debezium.doc.FixFor;
@ -485,7 +488,7 @@ public void shouldNotSnapshotMaterializedViews() throws Exception {
@Test @Test
@FixFor("DBZ-1413") @FixFor("DBZ-1413")
public void shouldGenerateSnapshotForDataTypeAlias() throws Exception { public void shouldSnapshotDomainTypeWithPropagatedSourceTypeAttributes() throws Exception {
TestHelper.dropAllSchemas(); TestHelper.dropAllSchemas();
TestHelper.execute("CREATE DOMAIN float83 AS numeric(8,3) DEFAULT 0.0;"); TestHelper.execute("CREATE DOMAIN float83 AS numeric(8,3) DEFAULT 0.0;");
TestHelper.execute("CREATE DOMAIN money2 AS MONEY DEFAULT 0.0;"); TestHelper.execute("CREATE DOMAIN money2 AS MONEY DEFAULT 0.0;");
@ -495,7 +498,7 @@ public void shouldGenerateSnapshotForDataTypeAlias() throws Exception {
buildNoStreamProducer(TestHelper.defaultConfig() buildNoStreamProducer(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE) .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.with("column.propagate.source.type", "public.alias_table.area")); .with("column.propagate.source.type", "public.alias_table.area,public.alias_table.a"));
final TestConsumer consumer = testConsumer(1, "public"); final TestConsumer consumer = testConsumer(1, "public");
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
@ -503,19 +506,69 @@ public void shouldGenerateSnapshotForDataTypeAlias() throws Exception {
// Specifying alias money2 results in JDBC type '2001' for 'salary2' // Specifying alias money2 results in JDBC type '2001' for 'salary2'
// Specifying money results in JDBC type '8' for 'salary' // Specifying money results in JDBC type '8' for 'salary'
consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", schemaAndValueForMoneyAliasType()))); List<SchemaAndValueField> expected = Arrays.asList(
}
private List<SchemaAndValueField> schemaAndValueForMoneyAliasType() {
return Arrays.asList(
new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), BigDecimal.valueOf(7.25)), new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), BigDecimal.valueOf(7.25)),
new SchemaAndValueField("salary2", Decimal.builder(2).optional().build(), BigDecimal.valueOf(8.25)), new SchemaAndValueField("salary2", Decimal.builder(2).optional().build(), BigDecimal.valueOf(8.25)),
new SchemaAndValueField("a", SchemaBuilder.float64().optional().build(), 12345.123), new SchemaAndValueField("a", SchemaBuilder.float64().optional()
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMERIC")
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8")
.parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "3")
.build(), 12345.123),
new SchemaAndValueField("area", SchemaBuilder.float64().optional() new SchemaAndValueField("area", SchemaBuilder.float64().optional()
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "FLOAT83") .parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "FLOAT83")
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "2147483647") .parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8")
.parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0") .parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "3")
.build(), 12345.123)); .build(), 12345.123));
consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", expected)));
}
@Test
@FixFor("DBZ-1413")
public void shouldSnapshotDomainAliasWithProperModifiers() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);");
TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value varbit2 NOT NULL, PRIMARY KEY(pk));");
TestHelper.execute("INSERT INTO alias_table (value) values (B'101');");
buildNoStreamProducer(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));
final TestConsumer consumer = testConsumer(1, "public");
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
List<SchemaAndValueField> expected = Collections.singletonList(
new SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{ 5, 0 }));
consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", expected)));
}
@Test(expected = ComparisonFailure.class)
@FixFor("DBZ-1413")
public void shouldNotSnapshotNestedDomainAliasTypeModifiersNotPropagated() throws Exception {
// The pgjdbc driver does not currently provide support for type modifier resolution
// when a domain type extends an existing domain type that extends a base type using
// explicit type modifiers.
TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);");
TestHelper.execute("CREATE DOMAIN varbit2b AS varbit2;");
TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value varbit2b NOT NULL, PRIMARY KEY (pk));");
TestHelper.execute("INSERT INTO alias_table (value) values (B'101');");
buildNoStreamProducer(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));
final TestConsumer consumer = testConsumer(1, "public");
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
// We would normally expect that the value column was created with a length of 3.
// However due to how the resolution works in the driver, it returns 2147483647.
// We probably want to avoid supporting this behavior for now?
List<SchemaAndValueField> expected = Collections.singletonList(
new SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{ 5, 0 }));
consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", expected)));
} }
private void buildNoStreamProducer(Configuration.Builder config) { private void buildNoStreamProducer(Configuration.Builder config) {

View File

@ -1579,6 +1579,34 @@ public void shouldStreamDomainAliasWithProperModifiers() throws Exception {
assertThat(consumer.isEmpty()).isTrue(); assertThat(consumer.isEmpty()).isTrue();
} }
@Test
@FixFor("DBZ-1413")
public void shouldStreamValuesForDomainTypeOfDomainType() throws Exception {
TestHelper.execute("CREATE DOMAIN numeric82 as numeric(8,2);");
TestHelper.execute("CREATE DOMAIN numericex as numeric82;");
TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value numericex, PRIMARY KEY (pk));");
startConnector(config -> config
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table"), false);
waitForStreamingToStart();
consumer = testConsumer(1);
executeAndWait("INSERT INTO alias_table (value) values (123.45);");
SourceRecord rec = assertRecordInserted("public.alias_table", PK_FIELD, 1);
assertSourceInfo(rec, "postgres", "public", "alias_table");
List<SchemaAndValueField> expected = Arrays.asList(
new SchemaAndValueField(PK_FIELD, SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("value", SpecialValueDecimal.builder(DecimalMode.DOUBLE, 8, 2).optional().build(), 123.45));
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
assertThat(consumer.isEmpty()).isTrue();
}
@Test @Test
@FixFor("DBZ-1413") @FixFor("DBZ-1413")
public void shouldStreamValuesForAliasLikeBaseTypes() throws Exception { public void shouldStreamValuesForAliasLikeBaseTypes() throws Exception {