From 8a224e6588734a9a2ac17cf0a95c06ead9f21376 Mon Sep 17 00:00:00 2001 From: Bertrand Paquet Date: Fri, 21 Apr 2023 16:16:34 +0200 Subject: [PATCH] DBZ-6379: Fix handling of toasted hstore --- COPYRIGHT.txt | 1 + .../UnchangedToastedPlaceholder.java | 3 ++ ...hangedToastedReplicationMessageColumn.java | 4 ++ .../postgresql/RecordsStreamProducerIT.java | 47 +++++++++++++++++-- jenkins-jobs/scripts/config/Aliases.txt | 1 + 5 files changed, 51 insertions(+), 5 deletions(-) diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index 61f622df9..76c3dcc21 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -57,6 +57,7 @@ Barry LaFond Bartosz Miedlar Ben Hardesty Ben Williams +Bertrand Paquet Biel Garau Estarellas Bin Li Bingqin Zhou diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/UnchangedToastedPlaceholder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/UnchangedToastedPlaceholder.java index 82322d5b5..bfa5d1990 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/UnchangedToastedPlaceholder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/UnchangedToastedPlaceholder.java @@ -24,6 +24,7 @@ public class UnchangedToastedPlaceholder { private final Map placeholderValues = new HashMap(); private final byte[] toastPlaceholderBinary; private final String toastPlaceholderString; + private final Map toastPlaceholderHstore = new HashMap(); public UnchangedToastedPlaceholder(PostgresConnectorConfig connectorConfig) { toastPlaceholderBinary = connectorConfig.getUnavailableValuePlaceholder(); @@ -39,6 +40,8 @@ public UnchangedToastedPlaceholder(PostgresConnectorConfig connectorConfig) { } placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_INT_ARRAY_TOAST_VALUE, toastedIntArrayPlaceholder); placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_BIGINT_ARRAY_TOAST_VALUE, toastedLongArrayPlaceholder); + toastPlaceholderHstore.put(toastPlaceholderString, toastPlaceholderString); + placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_HSTORE_TOAST_VALUE, toastPlaceholderHstore); } public Optional getValue(Object obj) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/UnchangedToastedReplicationMessageColumn.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/UnchangedToastedReplicationMessageColumn.java index 760eea330..1a8116fa9 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/UnchangedToastedReplicationMessageColumn.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/UnchangedToastedReplicationMessageColumn.java @@ -25,6 +25,7 @@ public class UnchangedToastedReplicationMessageColumn extends AbstractReplicatio public static final Object UNCHANGED_TEXT_ARRAY_TOAST_VALUE = new Object(); public static final Object UNCHANGED_INT_ARRAY_TOAST_VALUE = new Object(); public static final Object UNCHANGED_BIGINT_ARRAY_TOAST_VALUE = new Object(); + public static final Object UNCHANGED_HSTORE_TOAST_VALUE = new Object(); private Object unchangedToastValue; @@ -63,6 +64,9 @@ private void setUnchangedToastValue(String typeWithModifiers) { case "_int8": unchangedToastValue = UNCHANGED_BIGINT_ARRAY_TOAST_VALUE; break; + case "hstore": + unchangedToastValue = UNCHANGED_HSTORE_TOAST_VALUE; + break; default: unchangedToastValue = UNCHANGED_TOAST_VALUE; } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 3c9272efa..5e19eab05 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -25,11 +25,7 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1768,6 +1764,47 @@ public void shouldHandleToastedJsonArrayColumn() throws Exception { Envelope.FieldName.AFTER); } + @Test + @FixFor("DBZ-6379") + public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception { + TestHelper.execute( + "DROP TABLE IF EXISTS test_toast_table;", + "CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY, text TEXT);"); + startConnector(config -> config.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.MAP)); + final String toastedValue = RandomStringUtils.randomAlphanumeric(100000); + String statement = "ALTER TABLE test_toast_table ADD COLUMN col hstore;" + + "INSERT INTO test_toast_table (id, col) values (10, 'a=>" + toastedValue + "');"; + consumer = testConsumer(1); + executeAndWait(statement); + + // after record should contain the toasted value + HashMap colValue = new HashMap(); + colValue.put("a", toastedValue); + assertRecordSchemaAndValues(Arrays.asList( + new SchemaAndValueField("col", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, + SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), colValue)), + consumer.remove(), + Envelope.FieldName.AFTER); + statement = "UPDATE test_toast_table SET text = 'text';"; + + consumer.expects(1); + executeAndWait(statement); + consumer.process(record -> { + assertWithTask(task -> { + Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false)); + assertEquals(Arrays.asList("id", "text", "col"), tbl.retrieveColumnNames()); + }); + }); + colValue.clear(); + colValue.put(DecoderDifferences.optionalToastedValuePlaceholder(), DecoderDifferences.optionalToastedValuePlaceholder()); + assertRecordSchemaAndValues(Arrays.asList( + new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "text"), + new SchemaAndValueField("col", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, + SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), colValue)), + consumer.remove(), + Envelope.FieldName.AFTER); + } + @Test @FixFor("DBZ-1029") public void shouldReceiveChangesForTableWithoutPrimaryKey() throws Exception { diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt index 8753127ae..71655cda7 100644 --- a/jenkins-jobs/scripts/config/Aliases.txt +++ b/jenkins-jobs/scripts/config/Aliases.txt @@ -200,3 +200,4 @@ caicancai,蔡灿材 eizners,Sergey Eizner tthorn,Thomas Thornton hiscat,My Lang Pangzi +bpaquet,Bertrand Paquet