DBZ-280 Expanding test

This commit is contained in:
Gunnar Morling 2017-06-08 22:17:29 +02:00
parent 616ba945bf
commit aa4f102ff0
3 changed files with 68 additions and 58 deletions

View File

@ -57,13 +57,13 @@
/**
* Base class for the integration tests for the different {@link RecordsProducer} instances
*
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public abstract class AbstractRecordsProducerTest {
protected static final Pattern INSERT_TABLE_MATCHING_PATTERN = Pattern.compile("insert into (\\w+).+", Pattern.CASE_INSENSITIVE);
protected static final String INSERT_CASH_TYPES_STMT = "INSERT INTO cash_table (csh) VALUES ('$1234.11')";
protected static final String INSERT_DATE_TIME_TYPES_STMT = "INSERT INTO time_table(ts, tz, date, ti, ttz, it) " +
"VALUES ('2016-11-04T13:51:30'::TIMESTAMP, '2016-11-04T13:51:30+02:00'::TIMESTAMPTZ, " +
@ -79,13 +79,13 @@ public abstract class AbstractRecordsProducerTest {
protected static final String INSERT_NUMERIC_TYPES_STMT = "INSERT INTO numeric_table (si, i, bi, d, n, r, db, ss, bs, b) " +
"VALUES (1, 123456, 1234567890123, 1.1, 22.22, 3.3, 4.44, 1, 123, true)";
protected static final String INSERT_TSTZRANGE_TYPES_STMT = "INSERT INTO tstzrange_table (t) " +
"VALUES ('[2017-06-05 11:29:12.549426+00,)')";
protected static final String INSERT_TSTZRANGE_TYPES_STMT = "INSERT INTO tstzrange_table (unbounded_exclusive_range, bounded_inclusive_range) " +
"VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')";
protected static final Set<String> ALL_STMTS = new HashSet<>(Arrays.asList(INSERT_NUMERIC_TYPES_STMT, INSERT_DATE_TIME_TYPES_STMT,
INSERT_BIN_TYPES_STMT, INSERT_GEOM_TYPES_STMT, INSERT_TEXT_TYPES_STMT,
INSERT_CASH_TYPES_STMT, INSERT_STRING_TYPES_STMT));
protected List<SchemaAndValueField> schemasAndValuesForNumericType() {
return Arrays.asList(new SchemaAndValueField("si", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 1),
new SchemaAndValueField("i", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 123456),
@ -98,7 +98,7 @@ protected List<SchemaAndValueField> schemasAndValuesForNumericType() {
new SchemaAndValueField("bs", Schema.INT64_SCHEMA, 123L),
new SchemaAndValueField("b", Schema.OPTIONAL_BOOLEAN_SCHEMA, Boolean.TRUE));
}
protected List<SchemaAndValueField> schemasAndValuesForStringTypes() {
return Arrays.asList(new SchemaAndValueField("vc", Schema.OPTIONAL_STRING_SCHEMA, "aa"),
new SchemaAndValueField("vcv", Schema.OPTIONAL_STRING_SCHEMA, "bb"),
@ -106,14 +106,14 @@ protected List<SchemaAndValueField> schemasAndValuesForStringTypes() {
new SchemaAndValueField("c", Schema.OPTIONAL_STRING_SCHEMA, "abc"),
new SchemaAndValueField("t", Schema.OPTIONAL_STRING_SCHEMA, "some text"));
}
protected List<SchemaAndValueField> schemasAndValuesForTextTypes() {
return Arrays.asList(new SchemaAndValueField("j", Json.builder().optional().build(), "{\"bar\": \"baz\"}"),
new SchemaAndValueField("jb", Json.builder().optional().build(), "{\"bar\": \"baz\"}"),
new SchemaAndValueField("x", Xml.builder().optional().build(), "<foo>bar</foo><foo>bar</foo>"),
new SchemaAndValueField("u", Uuid.builder().optional().build(), "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11"));
}
protected List<SchemaAndValueField> schemaAndValuesForGeomTypes() {
Schema pointSchema = Point.builder().optional().build();
return Collections.singletonList(new SchemaAndValueField("p", pointSchema, Point.createValue(pointSchema, 1, 1)));
@ -121,21 +121,29 @@ protected List<SchemaAndValueField> schemaAndValuesForGeomTypes() {
protected List<SchemaAndValueField> schemaAndValuesForTstzRangeTypes() {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSx");
Instant instant = dateTimeFormatter.parse("2017-06-05 11:29:12.549426+00", Instant::from);
Instant begin = dateTimeFormatter.parse("2017-06-05 11:29:12.549426+00", Instant::from);
Instant end = dateTimeFormatter.parse("2017-06-05 12:34:56.789012+00", Instant::from);
// Acknowledge timezone expectation of the system running the test
String systemTime = dateTimeFormatter.withZone(ZoneId.systemDefault()).format(instant);
String expectedField = String.format("[\"%s\",)",systemTime);
return Collections.singletonList(new SchemaAndValueField("t", Schema.OPTIONAL_STRING_SCHEMA, expectedField));
String beginSystemTime = dateTimeFormatter.withZone(ZoneId.systemDefault()).format(begin);
String endSystemTime = dateTimeFormatter.withZone(ZoneId.systemDefault()).format(end);
String expectedField1 = String.format("[\"%s\",)", beginSystemTime);
String expectedField2 = String.format("[\"%s\",\"%s\"]", beginSystemTime, endSystemTime);
return Arrays.asList(
new SchemaAndValueField("unbounded_exclusive_range", Schema.OPTIONAL_STRING_SCHEMA, expectedField1),
new SchemaAndValueField("bounded_inclusive_range", Schema.OPTIONAL_STRING_SCHEMA, expectedField2)
);
}
protected List<SchemaAndValueField> schemaAndValuesForBinTypes() {
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap(new byte[]{ 1, 2, 3})),
new SchemaAndValueField("bol", Schema.OPTIONAL_BOOLEAN_SCHEMA, false),
new SchemaAndValueField("bs", Bits.builder(2).optional().build(), new byte[] { 3, 0 }), // bitsets get converted from two's complement
new SchemaAndValueField("bv", Bits.builder(2).optional().build(), new byte[] { 0, 0 }));
}
protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
long expectedTs = NanoTimestamp.toEpochNanos(LocalDateTime.parse("2016-11-04T13:51:30"), null);
String expectedTz = "2016-11-04T11:51:30Z"; //timestamp is stored with TZ, should be read back with UTC
@ -143,7 +151,7 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay();
String expectedTtz = "11:51:30Z"; //time is stored with TZ, should be read back at GMT
double interval = MicroDuration.durationMicros(1, 2, 3, 4, 5, 0, PostgresValueConverter.DAYS_PER_MONTH_AVG);
return Arrays.asList(new SchemaAndValueField("ts", NanoTimestamp.builder().optional().build(), expectedTs),
new SchemaAndValueField("tz", ZonedTimestamp.builder().optional().build(), expectedTz),
new SchemaAndValueField("date", Date.builder().optional().build(), expectedDate),
@ -151,17 +159,17 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
new SchemaAndValueField("ttz", ZonedTime.builder().optional().build(), expectedTtz),
new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval));
}
protected List<SchemaAndValueField> schemaAndValuesForMoneyTypes() {
return Collections.singletonList(new SchemaAndValueField("csh", Decimal.builder(0).optional().build(),
BigDecimal.valueOf(1234.11d)));
}
protected Map<String, List<SchemaAndValueField>> schemaAndValuesByTableName() {
return ALL_STMTS.stream().collect(Collectors.toMap(AbstractRecordsProducerTest::tableNameFromInsertStmt,
this::schemasAndValuesForTable));
}
protected List<SchemaAndValueField> schemasAndValuesForTable(String insertTableStatement) {
switch (insertTableStatement) {
case INSERT_NUMERIC_TYPES_STMT:
@ -182,7 +190,7 @@ protected List<SchemaAndValueField> schemasAndValuesForTable(String insertTableS
throw new IllegalArgumentException("unknown statement:" + insertTableStatement);
}
}
protected void assertRecordSchemaAndValues(List<SchemaAndValueField> expectedSchemaAndValuesByColumn,
SourceRecord record,
String envelopeFieldName) {
@ -190,7 +198,7 @@ protected void assertRecordSchemaAndValues(List<SchemaAndValueField> expectedSch
assertNotNull("expected there to be content in Envelope under " + envelopeFieldName, content);
expectedSchemaAndValuesByColumn.forEach(schemaAndValueField -> schemaAndValueField.assertFor(content));
}
protected void assertRecordOffset(SourceRecord record, boolean expectSnapshot, boolean expectedLastSnapshotRecord) {
Map<String, ?> offset = record.sourceOffset();
assertNotNull(offset.get(SourceInfo.TXID_KEY));
@ -206,29 +214,29 @@ protected void assertRecordOffset(SourceRecord record, boolean expectSnapshot, b
assertNull("Last snapshot marker not expected, but found", lastSnapshotRecord);
}
}
protected static String tableNameFromInsertStmt(String statement) {
Matcher matcher = INSERT_TABLE_MATCHING_PATTERN.matcher(statement);
assertTrue("invalid statement: " + statement, matcher.matches());
return matcher.group(1);
}
protected static class SchemaAndValueField {
private final Object schema;
private final Object value;
private final String fieldName;
public SchemaAndValueField(String fieldName, Object schema, Object value) {
this.schema = schema;
this.value = value;
this.fieldName = fieldName;
}
protected void assertFor(Struct content) {
assertSchema(content);
assertValue(content);
}
private void assertValue(Struct content) {
if (value == null) {
assertNull(fieldName + " is present in the actual content", content.get(fieldName));
@ -243,7 +251,7 @@ private void assertValue(Struct content) {
assertEquals("Values don't match for " + fieldName, value, actualValue);
}
}
private void assertSchema(Struct content) {
if (schema == null) {
return;
@ -254,20 +262,20 @@ private void assertSchema(Struct content) {
assertEquals("Schema for " + field + " does not match the actual value", this.schema, field.schema());
}
}
protected TestConsumer testConsumer(int expectedRecordsCount) {
return new TestConsumer(expectedRecordsCount);
}
protected static class TestConsumer implements Consumer<SourceRecord> {
private ConcurrentLinkedQueue<SourceRecord> records;
private VariableLatch latch;
protected TestConsumer(int expectedRecordsCount) {
this.latch = new VariableLatch(expectedRecordsCount);
this.records = new ConcurrentLinkedQueue<>();
}
@Override
public void accept(SourceRecord record) {
if (latch.getCount() == 0) {
@ -276,28 +284,28 @@ public void accept(SourceRecord record) {
records.add(record);
latch.countDown();
}
protected void expects(int expectedRecordsCount) {
assert latch.getCount() == 0;
this.latch.countUp(expectedRecordsCount);
}
protected SourceRecord remove() {
return records.remove();
}
protected boolean isEmpty() {
return records.isEmpty();
}
protected void process(Consumer<SourceRecord> consumer) {
records.forEach(consumer);
}
protected void clear() {
records.clear();
}
protected void await(long timeout, TimeUnit unit) throws InterruptedException {
if (!latch.await(timeout, unit)) {
fail("Consumer expected " + latch.getCount() + " records, but received " + records.size());

View File

@ -15,6 +15,7 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.stream.IntStream;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@ -43,18 +44,18 @@
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class PostgresSchemaIT {
private static final String[] TEST_TABLES = new String[] { "public.numeric_table", "public.string_table", "public.cash_table",
"public.bitbin_table",
"public.time_table", "public.text_table", "public.geom_table", "public.tstzrange_table" };
private PostgresSchema schema;
@Before
public void before() throws SQLException {
TestHelper.dropAllSchemas();
TestHelper.dropAllSchemas();
}
@Test
public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl");
@ -82,10 +83,11 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception {
Json.builder().optional().build(), Json.builder().optional().build(), Xml.builder().optional().build(),
Uuid.builder().optional().build());
assertTableSchema("public.geom_table", "p", Point.builder().optional().build());
assertTableSchema("public.tstzrange_table", "t", Schema.OPTIONAL_STRING_SCHEMA);
assertTableSchema("public.tstzrange_table", "unbounded_exclusive_range, bounded_inclusive_range",
Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
}
}
@Test
public void shouldApplyFilters() throws Exception {
String statements = "CREATE SCHEMA s1; " +
@ -106,14 +108,14 @@ public void shouldApplyFilters() throws Exception {
assertTablesIncluded("s2.a", "s2.b");
assertTablesExcluded("s1.a", "s1.b");
}
config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(SCHEMA_BLACKLIST, "s.*").build());
schema = new PostgresSchema(config);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false);
assertTablesExcluded("s1.a", "s2.a", "s1.b", "s2.b");
}
config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.TABLE_BLACKLIST, "s1.A,s2.A").build());
schema = new PostgresSchema(config);
try (PostgresConnection connection = TestHelper.create()) {
@ -121,7 +123,7 @@ public void shouldApplyFilters() throws Exception {
assertTablesIncluded("s1.b", "s2.b");
assertTablesExcluded("s1.a", "s2.a");
}
config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(SCHEMA_BLACKLIST, "s2")
.with(PostgresConnectorConfig.TABLE_BLACKLIST, "s1.A")
@ -140,7 +142,7 @@ public void shouldApplyFilters() throws Exception {
assertColumnsExcluded("s1.a.aa", "s2.a.aa");
}
}
@Test
public void shouldDetectNewChangesAfterRefreshing() throws Exception {
String statements = "CREATE SCHEMA public;" +
@ -163,11 +165,11 @@ public void shouldDetectNewChangesAfterRefreshing() throws Exception {
assertTablesExcluded("public.table1");
assertTableSchema(tableId, "strcol", Schema.OPTIONAL_STRING_SCHEMA);
}
statements = "ALTER TABLE table2 ADD COLUMN vc VARCHAR(2);" +
"ALTER TABLE table2 ADD COLUMN si SMALLINT;" +
"ALTER TABLE table2 DROP COLUMN strcol;";
TestHelper.execute(statements);
try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, TableId.parse(tableId, false));
@ -178,7 +180,7 @@ public void shouldDetectNewChangesAfterRefreshing() throws Exception {
assertColumnsExcluded(tableId + ".strcol");
}
}
protected void assertKeySchema(String fullyQualifiedTableName, String fields, Schema... types) {
TableSchema tableSchema = schema.schemaFor(fullyQualifiedTableName);
Schema keySchema = tableSchema.keySchema();
@ -190,7 +192,7 @@ protected void assertTableSchema(String fullyQualifiedTableName, String fields,
Schema keySchema = tableSchema.valueSchema();
assertSchemaContent(fields.split(","), types, keySchema);
}
private void assertSchemaContent(String[] fields, Schema[] types, Schema keySchema) {
IntStream.range(0, fields.length).forEach(i -> {
String fieldName = fields[i].trim();
@ -199,7 +201,7 @@ private void assertSchemaContent(String[] fields, Schema[] types, Schema keySche
assertEquals("'" + fieldName + "' has incorrect schema.", types[i], field.schema());
});
}
protected void assertTablesIncluded(String... fullyQualifiedTableNames) {
Arrays.stream(fullyQualifiedTableNames).forEach(fullyQualifiedTableName -> {
TableSchema tableSchema = schema.schemaFor(fullyQualifiedTableName);
@ -208,18 +210,18 @@ protected void assertTablesIncluded(String... fullyQualifiedTableNames) {
assertThat(tableSchema.valueSchema().name()).isEqualTo(validFullName(fullyQualifiedTableName, ".Value"));
});
}
private String validFullName(String proposedName, String suffix) {
return AvroValidator.validFullname(TestHelper.TEST_SERVER + "." + proposedName + suffix);
}
protected void assertTablesExcluded(String... fullyQualifiedTableNames) {
Arrays.stream(fullyQualifiedTableNames).forEach(fullyQualifiedTableName -> {
assertThat(schema.tableFor(fullyQualifiedTableName)).isNull();
assertThat(schema.schemaFor(fullyQualifiedTableName)).isNull();
});
}
protected void assertColumnsExcluded(String...columnNames) {
Arrays.stream(columnNames).forEach(fqColumnName -> {
int lastDotIdx = fqColumnName.lastIndexOf(".");

View File

@ -8,4 +8,4 @@ CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VA
CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tz TIMESTAMPTZ, date DATE, ti TIME, ttz TIME WITH TIME ZONE, it INTERVAL, PRIMARY KEY(pk));
CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk));
CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk));
CREATE TABLE tstzrange_table (pk serial, t tstzrange, PRIMARY KEY(pk));
CREATE TABLE tstzrange_table (pk serial, unbounded_exclusive_range tstzrange, bounded_inclusive_range tstzrange, PRIMARY KEY(pk));