DBZ-7193 Added testcases for tables toasted array columns with replica identity FULL.

This commit is contained in:
Prabhu19 2023-11-30 10:35:24 +01:00 committed by Jiri Pechanec
parent 6866c57ad1
commit 8b0487fb5c
2 changed files with 379 additions and 3 deletions

View File

@ -5,9 +5,10 @@
*/ */
package io.debezium.connector.postgresql; package io.debezium.connector.postgresql;
import java.util.HashSet;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
import io.debezium.connector.postgresql.connection.AbstractReplicationMessageColumn; import io.debezium.connector.postgresql.connection.AbstractReplicationMessageColumn;
/** /**
@ -41,8 +42,7 @@ public class UnchangedToastedReplicationMessageColumn extends AbstractReplicatio
UnchangedToastedReplicationMessageColumn.UNCHANGED_INT_ARRAY_TOAST_VALUE, UnchangedToastedReplicationMessageColumn.UNCHANGED_INT_ARRAY_TOAST_VALUE,
UnchangedToastedReplicationMessageColumn.UNCHANGED_BIGINT_ARRAY_TOAST_VALUE, UnchangedToastedReplicationMessageColumn.UNCHANGED_BIGINT_ARRAY_TOAST_VALUE,
UnchangedToastedReplicationMessageColumn.UNCHANGED_HSTORE_TOAST_VALUE, UnchangedToastedReplicationMessageColumn.UNCHANGED_HSTORE_TOAST_VALUE,
UnchangedToastedReplicationMessageColumn.UNCHANGED_UUID_TOAST_VALUE UnchangedToastedReplicationMessageColumn.UNCHANGED_UUID_TOAST_VALUE));
));
private Object unchangedToastValue; private Object unchangedToastValue;
public UnchangedToastedReplicationMessageColumn(String columnName, PostgresType type, String typeWithModifiers, boolean optional) { public UnchangedToastedReplicationMessageColumn(String columnName, PostgresType type, String typeWithModifiers, boolean optional) {

View File

@ -2017,6 +2017,382 @@ public void shouldHandleToastedUuidArrayColumn() throws Exception {
Envelope.FieldName.AFTER); Envelope.FieldName.AFTER);
} }
@Test
@FixFor("DBZ-7193")
public void shouldHandleToastedArrayColumnForReplicaIdentityFullTable() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY);");
startConnector(Function.identity(), false);
assertConnectorIsRunning();
final String toastedValue = RandomStringUtils.randomAlphanumeric(10000);
// INSERT
String statement = "ALTER TABLE test_toast_table ADD COLUMN not_toast integer;"
+ "ALTER TABLE test_toast_table ADD COLUMN mandatory_text_array TEXT[] NOT NULL;"
+ "ALTER TABLE test_toast_table ALTER COLUMN mandatory_text_array SET STORAGE EXTENDED;"
+ "ALTER TABLE test_toast_table REPLICA IDENTITY FULL;"
+ "INSERT INTO test_toast_table (not_toast, mandatory_text_array) values (10, ARRAY ['" + toastedValue + "']);";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("mandatory_text_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build(), Arrays.asList(toastedValue))),
consumer.remove(),
Envelope.FieldName.AFTER);
// UPDATE
statement = "UPDATE test_toast_table SET not_toast = 20;";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "not_toast", "mandatory_text_array"), tbl.retrieveColumnNames());
});
});
SourceRecord updatedRecord = consumer.remove();
// before and after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("mandatory_text_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build(), Arrays.asList(toastedValue))),
updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("mandatory_text_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build(), Arrays.asList(toastedValue))),
updatedRecord, Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-7193")
public void shouldHandleToastedArrayColumnCharacterVaryingForReplicaIdentityFullTable() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY);");
startConnector(Function.identity(), false);
assertConnectorIsRunning();
final String toastedValue = RandomStringUtils.randomAlphanumeric(10000);
// INSERT
String statement = "ALTER TABLE test_toast_table ADD COLUMN not_toast integer;"
+ "ALTER TABLE test_toast_table ADD COLUMN mandatory_text_array character varying(20000)[] NOT NULL;"
+ "ALTER TABLE test_toast_table ALTER COLUMN mandatory_text_array SET STORAGE EXTENDED;"
+ "ALTER TABLE test_toast_table REPLICA IDENTITY FULL;"
+ "INSERT INTO test_toast_table (not_toast, mandatory_text_array) values (10, ARRAY ['" + toastedValue + "']);";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("mandatory_text_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build(), Arrays.asList(toastedValue))),
consumer.remove(),
Envelope.FieldName.AFTER);
// UPDATE
statement = "UPDATE test_toast_table SET not_toast = 20;";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "not_toast", "mandatory_text_array"), tbl.retrieveColumnNames());
});
});
SourceRecord updatedRecord = consumer.remove();
// before and after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("mandatory_text_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build(), Arrays.asList(toastedValue))),
updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("mandatory_text_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build(), Arrays.asList(toastedValue))),
updatedRecord, Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-7193")
public void shouldHandleToastedDateArrayColumnForReplicaIdentityFullTable() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY);");
startConnector(Function.identity(), false);
assertConnectorIsRunning();
List<Integer> intList = IntStream.range(1, 100000).boxed().map((x) -> 19338).collect(Collectors.toList());
final String toastedValue = intList.stream().map((x) -> "'2022-12-12'::date").collect(Collectors.joining(","));
// INSERT
String statement = "ALTER TABLE test_toast_table ADD COLUMN not_toast integer;"
+ "ALTER TABLE test_toast_table ADD COLUMN date_array date[];"
+ "ALTER TABLE test_toast_table ALTER COLUMN date_array SET STORAGE EXTENDED;"
+ "ALTER TABLE test_toast_table REPLICA IDENTITY FULL;"
+ "INSERT INTO test_toast_table (not_toast, date_array) values (10, ARRAY [" + toastedValue + "]);";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("date_array",
SchemaBuilder.array(SchemaBuilder.int32().name("io.debezium.time.Date").optional().version(1).build()).optional().build(),
intList)),
consumer.remove(),
Envelope.FieldName.AFTER);
// UPDATE
statement = "UPDATE test_toast_table SET not_toast = 20;";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "not_toast", "date_array"), tbl.retrieveColumnNames());
});
});
SourceRecord updatedRecord = consumer.remove();
// before and after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("date_array",
SchemaBuilder.array(SchemaBuilder.int32().name("io.debezium.time.Date").optional().version(1).build()).optional().build(),
intList)),
updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("date_array",
SchemaBuilder.array(SchemaBuilder.int32().name("io.debezium.time.Date").optional().version(1).build()).optional().build(),
intList)),
updatedRecord, Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-7193")
public void shouldHandleToastedByteArrayColumnForReplicaIdentityFullTable() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY);");
startConnector(Function.identity(), false);
assertConnectorIsRunning();
List<Integer> intList = IntStream.range(1, 100000).boxed().map((x) -> 19338).collect(Collectors.toList());
final String toastedValue = RandomStringUtils.randomNumeric(10000);
// INSERT
String statement = "ALTER TABLE test_toast_table ADD COLUMN not_toast integer;"
+ "ALTER TABLE test_toast_table ADD COLUMN bytea_array bytea[];"
+ "ALTER TABLE test_toast_table ALTER COLUMN bytea_array SET STORAGE EXTENDED;"
+ "ALTER TABLE test_toast_table REPLICA IDENTITY FULL;"
+ "INSERT INTO test_toast_table (not_toast, bytea_array) values (10, ARRAY ['" + toastedValue + "'::bytea]);";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("bytea_array",
SchemaBuilder.array(Schema.OPTIONAL_BYTES_SCHEMA).optional().build(), Arrays.asList(ByteBuffer.wrap(toastedValue.getBytes())))),
consumer.remove(),
Envelope.FieldName.AFTER);
// UPDATE
statement = "UPDATE test_toast_table SET not_toast = 20;";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "not_toast", "bytea_array"), tbl.retrieveColumnNames());
});
});
SourceRecord updatedRecord = consumer.remove();
// before and after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("bytea_array",
SchemaBuilder.array(Schema.OPTIONAL_BYTES_SCHEMA).optional().build(),
Arrays.asList(ByteBuffer.wrap(toastedValue.getBytes())))),
updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("bytea_array",
SchemaBuilder.array(Schema.OPTIONAL_BYTES_SCHEMA).optional().build(),
Arrays.asList(ByteBuffer.wrap(toastedValue.getBytes())))),
updatedRecord, Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-7193")
public void shouldHandleToastedIntegerArrayColumnForReplicaIdentityFullTable() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY);");
startConnector(Function.identity(), false);
assertConnectorIsRunning();
List<Integer> intList = IntStream.range(1, 10000).boxed().collect(Collectors.toList());
final String toastedValue = intList.stream().map(String::valueOf)
.collect(Collectors.joining(","));
// INSERT
String statement = "ALTER TABLE test_toast_table ADD COLUMN not_toast integer;"
+ "ALTER TABLE test_toast_table ADD COLUMN int_array int[];"
+ "ALTER TABLE test_toast_table ALTER COLUMN int_array SET STORAGE EXTENDED;"
+ "ALTER TABLE test_toast_table REPLICA IDENTITY FULL;"
+ "INSERT INTO test_toast_table (not_toast, int_array) values (10, ARRAY [" + toastedValue + "]);";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("int_array", SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build(), intList)),
consumer.remove(),
Envelope.FieldName.AFTER);
// UPDATE
statement = "UPDATE test_toast_table SET not_toast = 20;";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "not_toast", "int_array"), tbl.retrieveColumnNames());
});
});
SourceRecord updatedRecord = consumer.remove();
// before and after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("int_array", SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build(), intList)),
updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("int_array", SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build(), intList)),
updatedRecord, Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-7193")
public void shouldHandleToastedBigIntArrayColumnForReplicaIdentityFullTable() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY);");
startConnector(Function.identity(), false);
assertConnectorIsRunning();
List<Long> bigintList = LongStream.range(1, 10000).boxed().collect(Collectors.toList());
final String toastedValue = bigintList.stream().map(String::valueOf)
.collect(Collectors.joining(","));
// INSERT
String statement = "ALTER TABLE test_toast_table ADD COLUMN not_toast integer;"
+ "ALTER TABLE test_toast_table ADD COLUMN bigint_array bigint[];"
+ "ALTER TABLE test_toast_table ALTER COLUMN bigint_array SET STORAGE EXTENDED;"
+ "ALTER TABLE test_toast_table REPLICA IDENTITY FULL;"
+ "INSERT INTO test_toast_table (not_toast, bigint_array) values (10, ARRAY [" + toastedValue + "]);";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("bigint_array", SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build(), bigintList)),
consumer.remove(),
Envelope.FieldName.AFTER);
// UPDATE
statement = "UPDATE test_toast_table SET not_toast = 20;";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "not_toast", "bigint_array"), tbl.retrieveColumnNames());
});
});
SourceRecord updatedRecord = consumer.remove();
// before and after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("bigint_array", SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build(), bigintList)),
updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("bigint_array", SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build(), bigintList)),
updatedRecord, Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-7193")
public void shouldHandleToastedUuidArrayColumnForReplicaIdentityFullTable() throws Exception {
TestHelper.execute(
"DROP TABLE IF EXISTS test_toast_table;",
"CREATE TABLE test_toast_table (id SERIAL PRIMARY KEY);");
startConnector(Function.identity(), false);
assertConnectorIsRunning();
final List<String> toastedValueList = Stream.generate(UUID::randomUUID).map(String::valueOf).limit(10000).collect(Collectors.toList());
final String[] toastedValueArray = toastedValueList.toArray(new String[toastedValueList.size()]);
final String toastedValueQuotedString = toastedValueList.stream().map(uuid_str -> ("'" + uuid_str + "'")).collect(Collectors.joining(","));
// INSERT
String statement = "ALTER TABLE test_toast_table ADD COLUMN not_toast integer;"
+ "ALTER TABLE test_toast_table ADD COLUMN uuid_array uuid[];"
+ "ALTER TABLE test_toast_table ALTER COLUMN uuid_array SET STORAGE EXTENDED;"
+ "ALTER TABLE test_toast_table REPLICA IDENTITY FULL;"
+ "INSERT INTO test_toast_table (not_toast, uuid_array) "
+ "VALUES (10, ARRAY [" + toastedValueQuotedString + "]::uuid[]);";
consumer = testConsumer(1);
executeAndWait(statement);
// after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("uuid_array",
SchemaBuilder.array(io.debezium.data.Uuid.builder().optional().build()).optional().build(),
Arrays.asList(toastedValueArray))),
consumer.remove(),
Envelope.FieldName.AFTER);
// UPDATE
statement = "UPDATE test_toast_table SET not_toast = 20;";
consumer.expects(1);
executeAndWait(statement);
consumer.process(record -> {
assertWithTask(task -> {
Table tbl = ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_toast_table", false));
assertEquals(Arrays.asList("id", "not_toast", "uuid_array"), tbl.retrieveColumnNames());
});
});
SourceRecord updatedRecord = consumer.remove();
// before and after record should contain the toasted value
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("uuid_array",
SchemaBuilder.array(io.debezium.data.Uuid.builder().optional().build()).optional().build(),
Arrays.asList(toastedValueArray))),
updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("uuid_array",
SchemaBuilder.array(io.debezium.data.Uuid.builder().optional().build()).optional().build(),
Arrays.asList(toastedValueArray))),
updatedRecord, Envelope.FieldName.AFTER);
}
@Test @Test
@FixFor("DBZ-1029") @FixFor("DBZ-1029")
public void shouldReceiveChangesForTableWithoutPrimaryKey() throws Exception { public void shouldReceiveChangesForTableWithoutPrimaryKey() throws Exception {