diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java index e2ec51b3c..14c59ae93 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java @@ -46,6 +46,7 @@ import io.debezium.util.Strings; import io.debezium.util.Threads; + /** * Producer of {@link org.apache.kafka.connect.source.SourceRecord source records} from a database snapshot. Once completed, * this producer can optionally continue streaming records, using another {@link RecordsStreamProducer} instance. @@ -402,8 +403,7 @@ protected void generateReadRecord(TableId tableId, Object[] rowData) { Object key = tableSchema.keyFromColumnData(rowData); Struct value = tableSchema.valueFromColumnData(rowData); - //note this is different than implementation in Streams producer. See DBZ-1163 - if (key == null || value == null) { + if (value == null) { logger.trace("key: {}; value: {}; One is null", key, value); return; } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index 637e348de..ba3ac151b 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -13,6 +13,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -24,6 +25,7 @@ import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter; import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter; import io.debezium.connector.postgresql.spi.Snapshotter; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; @@ -476,4 +478,38 @@ public void shouldGenerateSnapshotsForHstores() throws Exception { consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName)); } + @Test + @FixFor("DBZ-1163") + public void shouldGenerateSnapshotForATableWithoutPrimaryKey() throws Exception { + final PostgresConnectorConfig config = new PostgresConnectorConfig( + TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL) + .build() + ); + context = new PostgresTaskContext( + config, + TestHelper.getSchema(config), + PostgresTopicSelector.create(config) + ); + snapshotProducer = buildNoStreamProducer(context, config); + + final TestConsumer consumer = testConsumer(1, "public"); + + TestHelper.execute("insert into table_without_pk values(1, 1000)"); + + //then start the producer and validate all records are there + snapshotProducer.start(consumer, e -> {}); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + List schemaAndValueFields = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), + new SchemaAndValueField("val", Schema.OPTIONAL_INT32_SCHEMA, 1000)); + + consumer.process(record -> { + String actualTopicName = record.topic().replace(TestHelper.TEST_SERVER + ".", ""); + assertEquals("public.table_without_pk", actualTopicName); + assertRecordSchemaAndValues(schemaAndValueFields, record, Envelope.FieldName.AFTER); + }); + } + } diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl index 18aa1f9a3..acda448d9 100644 --- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl +++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl @@ -33,7 +33,7 @@ CREATE TABLE hstore_table_mul (pk serial, hs hstore, PRIMARY KEY(pk)); CREATE TABLE hstore_table_with_null (pk serial, hs hstore, PRIMARY KEY(pk)); CREATE TABLE hstore_table_with_special (pk serial, hs hstore, PRIMARY KEY(pk)); CREATE TABLE not_null_table (pk serial, val numeric(20,8), created_at timestamp not null, created_at_tz timestamptz not null, ctime time not null, ctime_tz timetz not null, cdate date not null, cmoney money not null, cbits bit(3) not null, PRIMARY KEY(pk)); - +CREATE TABLE table_without_pk(id serial, val INTEGER); DROP SCHEMA IF EXISTS "Quoted_"" . Schema" CASCADE; CREATE SCHEMA "Quoted_"" . Schema"; -- GRANT ALL ON ALL TABLES IN SCHEMA "Quoted_Schema" TO postgres;