DBZ-6720: handle unchanged toasted uuid array

This commit is contained in:
rajdangwal 2023-08-13 23:14:19 +05:30 committed by Jiri Pechanec
parent 64eaf72665
commit 153834c650
4 changed files with 64 additions and 0 deletions

View File

@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
/**
* Helper that returns placeholder values for unchanged toasted columns.
@ -25,10 +26,12 @@ public class UnchangedToastedPlaceholder {
private final byte[] toastPlaceholderBinary;
private final String toastPlaceholderString;
private final Map<String, String> toastPlaceholderHstore = new HashMap<>();
private final String toastPlaceholderUuid;
public UnchangedToastedPlaceholder(PostgresConnectorConfig connectorConfig) {
toastPlaceholderBinary = connectorConfig.getUnavailableValuePlaceholder();
toastPlaceholderString = new String(toastPlaceholderBinary);
toastPlaceholderUuid = UUID.nameUUIDFromBytes(toastPlaceholderBinary).toString();
placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_TOAST_VALUE, toastPlaceholderString);
placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_TEXT_ARRAY_TOAST_VALUE,
Arrays.asList(toastPlaceholderString));
@ -44,6 +47,7 @@ public UnchangedToastedPlaceholder(PostgresConnectorConfig connectorConfig) {
placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_BIGINT_ARRAY_TOAST_VALUE, toastedLongArrayPlaceholder);
toastPlaceholderHstore.put(toastPlaceholderString, toastPlaceholderString);
placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_HSTORE_TOAST_VALUE, toastPlaceholderHstore);
placeholderValues.put(UnchangedToastedReplicationMessageColumn.UNCHANGED_UUID_TOAST_VALUE, Arrays.asList(toastPlaceholderUuid));
}
public Optional<Object> getValue(Object obj) {

View File

@ -29,6 +29,7 @@ public class UnchangedToastedReplicationMessageColumn extends AbstractReplicatio
public static final Object UNCHANGED_INT_ARRAY_TOAST_VALUE = new Object();
public static final Object UNCHANGED_BIGINT_ARRAY_TOAST_VALUE = new Object();
public static final Object UNCHANGED_HSTORE_TOAST_VALUE = new Object();
public static final Object UNCHANGED_UUID_TOAST_VALUE = new Object();
private Object unchangedToastValue;
@ -77,6 +78,10 @@ private void setUnchangedToastValue(String typeWithModifiers) {
case "hstore":
unchangedToastValue = UNCHANGED_HSTORE_TOAST_VALUE;
break;
case "uuid[]":
case "_uuid":
unchangedToastValue = UNCHANGED_UUID_TOAST_VALUE;
break;
default:
unchangedToastValue = UNCHANGED_TOAST_VALUE;
}

View File

@ -19,6 +19,7 @@
public class DecoderDifferences {
static final String TOASTED_VALUE_PLACEHOLDER = "__debezium_unavailable_value";
static final String TOASTED_VALUE_NUMBER_STRING = "95, 95, 100, 101, 98, 101, 122, 105, 117, 109, 95, 117, 110, 97, 118, 97, 105, 108, 97, 98, 108, 101, 95, 118, 97, 108, 117, 101";
static final String TOASTED_VALUE_UUID_STRING = "b68a35a7-17ad-35b3-af2a-ae46edb4545a"; // UUID encoding of string `__debezium_unavailable_value`
private static boolean pgoutput() {
return TestHelper.decoderPlugin() == PostgresConnectorConfig.LogicalDecoder.PGOUTPUT;
@ -32,6 +33,10 @@ public static String mandatoryToastedValuePlaceholder() {
return TOASTED_VALUE_PLACEHOLDER;
}
public static String mandatoryToastedValueUuidPlaceholder() {
return TOASTED_VALUE_UUID_STRING;
}
public static byte[] mandatoryToastedValueBinaryPlaceholder() {
return PostgresConnectorConfig.UNAVAILABLE_VALUE_PLACEHOLDER.defaultValueAsString().getBytes();
}

View File

@ -32,6 +32,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -40,8 +41,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import io.debezium.data.SchemaAndValueField;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
@ -1955,6 +1958,53 @@ public void shouldHandleToastedHstoreInHstoreMapMode() throws Exception {
Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-6720")
public void shouldHandleToastedUuidArrayColumn() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY, text TEXT);");
startConnector(Function.identity(), false);
final List<String> toastedValueList = Stream.generate(UUID::randomUUID).map(String::valueOf).limit(10000).collect(Collectors.toList());
final String[] toastedValueArray = toastedValueList.toArray(new String[toastedValueList.size()]);
final String toastedValueQuotedString = toastedValueList.stream().map(uuid_str -> ("'" + uuid_str + "'")).collect(Collectors.joining(","));
String statement = "ALTER TABLE test_toast_table ADD COLUMN not_toast integer;"
+ "ALTER TABLE test_toast_table ADD COLUMN uuid_array uuid[];"
+ "ALTER TABLE test_toast_table ALTER COLUMN uuid_array SET STORAGE EXTENDED;"
+ "INSERT INTO test_toast_table (not_toast, text, uuid_array) "
+ "VALUES (10, 'text', ARRAY [" + toastedValueQuotedString + "]::uuid[]);";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "text"),
new SchemaAndValueField("uuid_array", SchemaBuilder.array(
io.debezium.data.Uuid.builder().optional().build()).optional().build(),
Arrays.asList(toastedValueArray))),
consumer.remove(),
Envelope.FieldName.AFTER);
statement = "UPDATE test_toast_table SET not_toast = 2;";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "text", "not_toast", "uuid_array"), tbl.retrieveColumnNames());
});
});
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "text"),
new SchemaAndValueField("uuid_array", SchemaBuilder.array(
io.debezium.data.Uuid.builder().optional().build()).optional().build(),
Arrays.asList(DecoderDifferences.mandatoryToastedValueUuidPlaceholder()))),
consumer.remove(),
Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-1029")
public void shouldReceiveChangesForTableWithoutPrimaryKey() throws Exception {