DBZ-1163 Enable postgres snapshot for tables without primary key

preethi 2019-03-29 00:36:17 +05:30, committed by Gunnar Morling
parent 7742e66e11
commit 02873e8169
3 changed files with 39 additions and 3 deletions


@@ -46,6 +46,7 @@
import io.debezium.util.Strings;
import io.debezium.util.Threads;

/**
* Producer of {@link org.apache.kafka.connect.source.SourceRecord source records} from a database snapshot. Once completed,
* this producer can optionally continue streaming records, using another {@link RecordsStreamProducer} instance.
@@ -402,8 +403,7 @@ protected void generateReadRecord(TableId tableId, Object[] rowData) {
        Object key = tableSchema.keyFromColumnData(rowData);
        Struct value = tableSchema.valueFromColumnData(rowData);
        //note this is different than implementation in Streams producer. See DBZ-1163
-       if (key == null || value == null) {
+       if (value == null) {
            logger.trace("key: {}; value: {}; One is null", key, value);
            return;
        }
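
This guard is the heart of the change: TableSchema.keyFromColumnData returns null when a table defines no primary-key columns, so the old check made the snapshot silently drop every row of such tables. The contrast with the streaming side, which the in-code comment alludes to, can be sketched as follows (class and method names are invented for illustration; only the two boolean conditions come from the sources):

// Illustrative sketch only -- names are made up, the two conditions mirror
// the snapshot guard above and the streaming guard the comment refers to.
public class SnapshotVsStreamingGuard {

    // Snapshot producer after DBZ-1163: tolerate a null key (no-PK table),
    // skip the row only when there is no value to emit.
    static boolean skipSnapshotRow(Object key, Object value) {
        return value == null;
    }

    // Streaming producer, unchanged by this commit: a null key or a null
    // value both cause the record to be skipped.
    static boolean skipStreamedRow(Object key, Object value) {
        return key == null || value == null;
    }

    public static void main(String[] args) {
        Object value = new Object(); // stand-in for a populated Connect Struct
        System.out.println(skipSnapshotRow(null, value)); // false: row is emitted
        System.out.println(skipStreamedRow(null, value)); // true: row would be dropped
    }
}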


@@ -13,6 +13,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -24,6 +25,7 @@
import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.spi.Snapshotter;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
@@ -476,4 +478,38 @@ public void shouldGenerateSnapshotsForHstores() throws Exception {
        consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName));
    }
+    @Test
+    @FixFor("DBZ-1163")
+    public void shouldGenerateSnapshotForATableWithoutPrimaryKey() throws Exception {
+        final PostgresConnectorConfig config = new PostgresConnectorConfig(
+                TestHelper.defaultConfig()
+                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL)
+                        .build()
+        );
+        context = new PostgresTaskContext(
+                config,
+                TestHelper.getSchema(config),
+                PostgresTopicSelector.create(config)
+        );
+        snapshotProducer = buildNoStreamProducer(context, config);
+        final TestConsumer consumer = testConsumer(1, "public");
+        TestHelper.execute("insert into table_without_pk values(1, 1000)");
+        //then start the producer and validate all records are there
+        snapshotProducer.start(consumer, e -> {});
+        consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
+        List<SchemaAndValueField> schemaAndValueFields = Arrays.asList(
+                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1),
+                new SchemaAndValueField("val", Schema.OPTIONAL_INT32_SCHEMA, 1000));
+        consumer.process(record -> {
+            String actualTopicName = record.topic().replace(TestHelper.TEST_SERVER + ".", "");
+            assertEquals("public.table_without_pk", actualTopicName);
+            assertRecordSchemaAndValues(schemaAndValueFields, record, Envelope.FieldName.AFTER);
+        });
+    }
}
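
What makes the relaxed guard safe is that Kafka Connect accepts records whose key and key schema are both null; such records simply are not partitioned by key. A minimal sketch of the kind of record the new test expects, built by hand (the partition and offset maps and the server name are placeholders, and the real connector additionally wraps the row in a change-event envelope, which the test reads via Envelope.FieldName.AFTER):

import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class NullKeySnapshotRecordSketch {
    public static void main(String[] args) {
        // Value schema mirroring the test table: id (serial -> int32) and a nullable val.
        Schema valueSchema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("val", Schema.OPTIONAL_INT32_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema).put("id", 1).put("val", 1000);

        // Key schema and key are both null: legal in Connect, and exactly what
        // keyFromColumnData produces for a table without a primary key.
        SourceRecord record = new SourceRecord(
                Collections.singletonMap("server", "test_server"), // placeholder source partition
                Collections.singletonMap("lsn", 0L),               // placeholder source offset
                "test_server.public.table_without_pk",             // topic shape used by the test
                null,          // key schema
                null,          // key
                valueSchema,
                value);
        System.out.println(record);
    }
}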


@@ -33,7 +33,7 @@ CREATE TABLE hstore_table_mul (pk serial, hs hstore, PRIMARY KEY(pk));
CREATE TABLE hstore_table_with_null (pk serial, hs hstore, PRIMARY KEY(pk));
CREATE TABLE hstore_table_with_special (pk serial, hs hstore, PRIMARY KEY(pk));
CREATE TABLE not_null_table (pk serial, val numeric(20,8), created_at timestamp not null, created_at_tz timestamptz not null, ctime time not null, ctime_tz timetz not null, cdate date not null, cmoney money not null, cbits bit(3) not null, PRIMARY KEY(pk));
+CREATE TABLE table_without_pk(id serial, val INTEGER);
DROP SCHEMA IF EXISTS "Quoted_"" . Schema" CASCADE;
CREATE SCHEMA "Quoted_"" . Schema";
-- GRANT ALL ON ALL TABLES IN SCHEMA "Quoted_Schema" TO postgres;