From 88521371cc4827eed5de68bb1c4695fb8a3665ae Mon Sep 17 00:00:00 2001 From: Ilia Bogdanov Date: Mon, 14 Jan 2019 12:00:24 +0300 Subject: [PATCH] DBZ-1029 insertion test update, code cleanup after review --- .../postgresql/RecordsStreamProducer.java | 2 +- .../postgresql/RecordsStreamProducerIT.java | 31 +++++++++---------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java index 40552a288..207bc33ae 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java @@ -388,7 +388,7 @@ protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Blocki Object key = tableSchema.keyFromColumnData(oldRowData); Struct value = tableSchema.valueFromColumnData(oldRowData); if (value == null) { - logger.warn("ignoring message from delete transaction for table '{}' because it does not have a primary key defined and replica identity for the table is not FULL", tableId); + logger.warn("ignoring delete message for table '{}' because it does not have a primary key defined and replica identity for the table is not FULL", tableId); return; } Schema keySchema = tableSchema.keySchema(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index f396ba263..2a7ba9dde 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -219,34 +219,31 @@ public void shouldReceiveChangesForInsertsWithArrayTypes() throws Exception { } @Test - public void shouldReceiveChangesForInsertsDependingOnReplicaIdentity() throws Exception { + @FixFor("DBZ-1029") + public void shouldReceiveChangesForInsertsIndependentOfReplicaIdentity() { // insert statement should not be affected by replica identity settings in any way consumer = testConsumer(1); recordsProducer.start(consumer, blackHole); - String statement = "ALTER TABLE test_table REPLICA IDENTITY DEFAULT;" + - "INSERT INTO test_table (text) VALUES ('pk_and_default');"; - executeAndWait(statement); - assertRecordInserted("public.test_table", PK_FIELD, 2); + TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;"); + String statement = "INSERT INTO test_table (text) VALUES ('pk_and_default');"; + assertInsert(statement, 2, Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "pk_and_default"))); consumer.expects(1); - statement = "ALTER TABLE test_table REPLICA IDENTITY FULL;" + - "INSERT INTO test_table (text) VALUES ('pk_and_full');"; - executeAndWait(statement); - assertRecordInserted("public.test_table", PK_FIELD, 3); + TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL;"); + statement = "INSERT INTO test_table (text) VALUES ('pk_and_full');"; + assertInsert(statement, 3, Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "pk_and_full"))); consumer.expects(1); - statement = "ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;" + - "INSERT INTO test_table (pk, text) VALUES (4, 'no_pk_and_full');"; - executeAndWait(statement); - assertRecordInserted("public.test_table", PK_FIELD, 4); + TestHelper.execute("ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;"); + statement = "INSERT INTO test_table (pk, text) VALUES (4, 'no_pk_and_full');"; + assertInsert(statement, 4, Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "no_pk_and_full"))); consumer.expects(1); - statement = "ALTER TABLE test_table REPLICA IDENTITY DEFAULT;" + - "INSERT INTO test_table (pk, text) VALUES (5, 'no_pk_and_default');"; - executeAndWait(statement); - assertRecordInserted("public.test_table", PK_FIELD, 5); + TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;"); + statement = "INSERT INTO test_table (pk, text) VALUES (5, 'no_pk_and_default');"; + assertInsert(statement, 5, Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "no_pk_and_default"))); } @Test