DBZ-1029 insertion test update, code cleanup after review

This commit is contained in:
Ilia Bogdanov 2019-01-14 12:00:24 +03:00
parent cee9bd9a5d
commit 88521371cc
2 changed files with 15 additions and 18 deletions

View File

@ -388,7 +388,7 @@ protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Blocki
Object key = tableSchema.keyFromColumnData(oldRowData);
Struct value = tableSchema.valueFromColumnData(oldRowData);
if (value == null) {
logger.warn("ignoring message from delete transaction for table '{}' because it does not have a primary key defined and replica identity for the table is not FULL", tableId);
logger.warn("ignoring delete message for table '{}' because it does not have a primary key defined and replica identity for the table is not FULL", tableId);
return;
}
Schema keySchema = tableSchema.keySchema();

View File

@ -219,34 +219,31 @@ public void shouldReceiveChangesForInsertsWithArrayTypes() throws Exception {
}
@Test
public void shouldReceiveChangesForInsertsDependingOnReplicaIdentity() throws Exception {
@FixFor("DBZ-1029")
public void shouldReceiveChangesForInsertsIndependentOfReplicaIdentity() {
// insert statement should not be affected by replica identity settings in any way
consumer = testConsumer(1);
recordsProducer.start(consumer, blackHole);
String statement = "ALTER TABLE test_table REPLICA IDENTITY DEFAULT;" +
"INSERT INTO test_table (text) VALUES ('pk_and_default');";
executeAndWait(statement);
assertRecordInserted("public.test_table", PK_FIELD, 2);
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;");
String statement = "INSERT INTO test_table (text) VALUES ('pk_and_default');";
assertInsert(statement, 2, Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "pk_and_default")));
consumer.expects(1);
statement = "ALTER TABLE test_table REPLICA IDENTITY FULL;" +
"INSERT INTO test_table (text) VALUES ('pk_and_full');";
executeAndWait(statement);
assertRecordInserted("public.test_table", PK_FIELD, 3);
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL;");
statement = "INSERT INTO test_table (text) VALUES ('pk_and_full');";
assertInsert(statement, 3, Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "pk_and_full")));
consumer.expects(1);
statement = "ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;" +
"INSERT INTO test_table (pk, text) VALUES (4, 'no_pk_and_full');";
executeAndWait(statement);
assertRecordInserted("public.test_table", PK_FIELD, 4);
TestHelper.execute("ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;");
statement = "INSERT INTO test_table (pk, text) VALUES (4, 'no_pk_and_full');";
assertInsert(statement, 4, Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "no_pk_and_full")));
consumer.expects(1);
statement = "ALTER TABLE test_table REPLICA IDENTITY DEFAULT;" +
"INSERT INTO test_table (pk, text) VALUES (5, 'no_pk_and_default');";
executeAndWait(statement);
assertRecordInserted("public.test_table", PK_FIELD, 5);
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;");
statement = "INSERT INTO test_table (pk, text) VALUES (5, 'no_pk_and_default');";
assertInsert(statement, 5, Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "no_pk_and_default")));
}
@Test