DBZ-6379: Fix handling of toasted hstore

This commit is contained in:
Bertrand Paquet 2023-04-21 16:16:34 +02:00 committed by Jiri Pechanec
parent e6e19fb578
commit 8a224e6588
5 changed files with 51 additions and 5 deletions

View File

@ -57,6 +57,7 @@ Barry LaFond
Bartosz Miedlar
Ben Hardesty
Ben Williams
Bertrand Paquet
Biel Garau Estarellas
Bin Li
Bingqin Zhou

View File

@ -24,6 +24,7 @@ public class UnchangedToastedPlaceholder {
private final Map<Object, Object> placeholderValues = new HashMap<Object, Object>();
private final byte[] toastPlaceholderBinary;
private final String toastPlaceholderString;
private final Map toastPlaceholderHstore = new HashMap<String, String>();
public UnchangedToastedPlaceholder(PostgresConnectorConfig connectorConfig) {
toastPlaceholderBinary = connectorConfig.getUnavailableValuePlaceholder();
@ -39,6 +40,8 @@ public UnchangedToastedPlaceholder(PostgresConnectorConfig connectorConfig) {
}
placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_INT_ARRAY_TOAST_VALUE, toastedIntArrayPlaceholder);
placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_BIGINT_ARRAY_TOAST_VALUE, toastedLongArrayPlaceholder);
toastPlaceholderHstore.put(toastPlaceholderString, toastPlaceholderString);
placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_HSTORE_TOAST_VALUE, toastPlaceholderHstore);
}
public Optional<Object> getValue(Object obj) {

View File

@ -25,6 +25,7 @@ public class UnchangedToastedReplicationMessageColumn extends AbstractReplicatio
public static final Object UNCHANGED_TEXT_ARRAY_TOAST_VALUE = new Object();
public static final Object UNCHANGED_INT_ARRAY_TOAST_VALUE = new Object();
public static final Object UNCHANGED_BIGINT_ARRAY_TOAST_VALUE = new Object();
public static final Object UNCHANGED_HSTORE_TOAST_VALUE = new Object();
private Object unchangedToastValue;
@ -63,6 +64,9 @@ private void setUnchangedToastValue(String typeWithModifiers) {
case "_int8":
unchangedToastValue = UNCHANGED_BIGINT_ARRAY_TOAST_VALUE;
break;
case "hstore":
unchangedToastValue = UNCHANGED_HSTORE_TOAST_VALUE;
break;
default:
unchangedToastValue = UNCHANGED_TOAST_VALUE;
}

View File

@ -25,11 +25,7 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -1768,6 +1764,47 @@ public void shouldHandleToastedJsonArrayColumn() throws Exception {
Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-6379")
public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY, text TEXT);");
startConnector(config -> config.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.MAP));
final String toastedValue = RandomStringUtils.randomAlphanumeric(100000);
String statement = "ALTER TABLE test_toast_table ADD COLUMN col hstore;"
+ "INSERT INTO test_toast_table (id, col) values (10, 'a=>" + toastedValue + "');";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
HashMap colValue = new HashMap();
colValue.put("a", toastedValue);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("col", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA,
SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), colValue)),
consumer.remove(),
Envelope.FieldName.AFTER);
statement = "UPDATE test_toast_table SET text = 'text';";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "text", "col"), tbl.retrieveColumnNames());
});
});
colValue.clear();
colValue.put(DecoderDifferences.optionalToastedValuePlaceholder(), DecoderDifferences.optionalToastedValuePlaceholder());
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "text"),
new SchemaAndValueField("col", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA,
SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), colValue)),
consumer.remove(),
Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-1029")
public void shouldReceiveChangesForTableWithoutPrimaryKey() throws Exception {

View File

@ -200,3 +200,4 @@ caicancai,蔡灿材
eizners,Sergey Eizner
tthorn,Thomas Thornton
hiscat,My Lang Pangzi
bpaquet,Bertrand Paquet