From 02873e8169372fc44842635b82cfeee40c17a8a2 Mon Sep 17 00:00:00 2001
From: preethi
Date: Fri, 29 Mar 2019 00:36:17 +0530
Subject: [PATCH] DBZ-1163 Enable postgres snapshot for tables without primary
 key

---
 .../postgresql/RecordsSnapshotProducer.java        |  4 +--
 .../postgresql/RecordsSnapshotProducerIT.java      | 36 +++++++++++++++++++
 .../test/resources/postgres_create_tables.ddl      |  2 +-
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
index e2ec51b3c..14c59ae93 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
@@ -46,6 +46,7 @@
 import io.debezium.util.Strings;
 import io.debezium.util.Threads;
 
+
 /**
  * Producer of {@link org.apache.kafka.connect.source.SourceRecord source records} from a database snapshot. Once completed,
  * this producer can optionally continue streaming records, using another {@link RecordsStreamProducer} instance.
@@ -402,8 +403,7 @@ protected void generateReadRecord(TableId tableId, Object[] rowData) {
         Object key = tableSchema.keyFromColumnData(rowData);
         Struct value = tableSchema.valueFromColumnData(rowData);
 
-        //note this is different than implementation in Streams producer. See DBZ-1163
-        if (key == null || value == null) {
+        if (value == null) {
             logger.trace("key: {}; value: {}; One is null", key, value);
             return;
         }
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java
index 637e348de..ba3ac151b 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java
@@ -13,6 +13,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,7 @@
 import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
 import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
 import io.debezium.connector.postgresql.spi.Snapshotter;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.After;
@@ -476,4 +478,38 @@ public void shouldGenerateSnapshotsForHstores() throws Exception {
         consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName));
     }
 
+    @Test
+    @FixFor("DBZ-1163")
+    public void shouldGenerateSnapshotForATableWithoutPrimaryKey() throws Exception {
+        final PostgresConnectorConfig config = new PostgresConnectorConfig(
+                TestHelper.defaultConfig()
+                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL)
+                        .build()
+        );
+        context = new PostgresTaskContext(
+                config,
+                TestHelper.getSchema(config),
+                PostgresTopicSelector.create(config)
+        );
+        snapshotProducer = buildNoStreamProducer(context, config);
+
+        final TestConsumer consumer = testConsumer(1, "public");
+
+        TestHelper.execute("insert into table_without_pk values(1, 1000)");
+
+        //then start the producer and validate all records are there
+        snapshotProducer.start(consumer, e -> {});
+        consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
+
+        List<SchemaAndValueField> schemaAndValueFields = Arrays.asList(
+                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1),
+                new SchemaAndValueField("val", Schema.OPTIONAL_INT32_SCHEMA, 1000));
+
+        consumer.process(record -> {
+            String actualTopicName = record.topic().replace(TestHelper.TEST_SERVER + ".", "");
+            assertEquals("public.table_without_pk", actualTopicName);
+            assertRecordSchemaAndValues(schemaAndValueFields, record, Envelope.FieldName.AFTER);
+        });
+    }
+
 }
diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl
index 18aa1f9a3..acda448d9 100644
--- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl
+++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl
@@ -33,7 +33,7 @@ CREATE TABLE hstore_table_mul (pk serial, hs hstore, PRIMARY KEY(pk));
 CREATE TABLE hstore_table_with_null (pk serial, hs hstore, PRIMARY KEY(pk));
 CREATE TABLE hstore_table_with_special (pk serial, hs hstore, PRIMARY KEY(pk));
 CREATE TABLE not_null_table (pk serial, val numeric(20,8), created_at timestamp not null, created_at_tz timestamptz not null, ctime time not null, ctime_tz timetz not null, cdate date not null, cmoney money not null, cbits bit(3) not null, PRIMARY KEY(pk));
-
+CREATE TABLE table_without_pk(id serial, val INTEGER);
 DROP SCHEMA IF EXISTS "Quoted_"" . Schema" CASCADE;
 CREATE SCHEMA "Quoted_"" . Schema";
 -- GRANT ALL ON ALL TABLES IN SCHEMA "Quoted_Schema" TO postgres;
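As an aside for readers of this patch: with the key-null guard removed, the snapshot producer no longer drops rows from tables that have no primary key; it emits them with a null key instead. Below is a minimal, self-contained sketch (plain Kafka Connect API, not Debezium connector code) of that record shape for table_without_pk. The class name NullKeySnapshotRecordSketch, the flattened value schema (no Debezium change-event envelope with before/after sections), and the placeholder source partition/offset maps are illustrative assumptions, not part of the connector.

import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class NullKeySnapshotRecordSketch {

    public static void main(String[] args) {
        // Value schema mirroring the columns of table_without_pk from the test DDL
        // (flattened here; the real connector wraps the row in a change-event envelope).
        Schema rowSchema = SchemaBuilder.struct()
                .name("test_server.public.table_without_pk.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("val", Schema.OPTIONAL_INT32_SCHEMA)
                .build();

        Struct row = new Struct(rowSchema)
                .put("id", 1)
                .put("val", 1000);

        // No primary key means no key columns: key schema and key are both null.
        // Before DBZ-1163 a null key made the snapshot producer skip the row entirely.
        SourceRecord record = new SourceRecord(
                Collections.singletonMap("server", "test_server"), // placeholder source partition
                Collections.singletonMap("lsn", 0L),               // placeholder source offset
                "test_server.public.table_without_pk",
                null,      // key schema
                null,      // key
                rowSchema,
                row);

        System.out.println(record.topic() + " key=" + record.key() + " value=" + record.value());
    }
}

One practical consequence of the null key: Kafka cannot co-partition successive changes to the same row, and such topics are unsuitable for log compaction, which is an inherent trade-off of capturing tables without a primary key.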