DBZ-1146 Test full column diff

This commit is contained in:
Jiri Pechanec 2019-02-22 13:34:56 +01:00
parent 5043d41b7b
commit 78bfda09a6
2 changed files with 44 additions and 10 deletions

View File

@ -62,4 +62,14 @@ private static boolean wal2Json() {
public static boolean areSpecialFPValuesUnsupported() {
return wal2Json();
}
/**
* wal2json plugin does not include toasted column in the update
*
* @author Jiri Pechanec
*
*/
public static boolean areToastedValuesPresentInSchema() {
return !wal2Json();
}
}

View File

@ -1251,6 +1251,18 @@ public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTable
testReceiveChangesForReplicaIdentityFullTableWithToastedValue(SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST, false);
}
@Test
@FixFor("DBZ-1146")
public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTableFromSnapshotFullDiff() throws Exception {
testReceiveChangesForReplicaIdentityFullTableWithToastedValue(SchemaRefreshMode.COLUMNS_DIFF, true);
}
@Test
@FixFor("DBZ-1146")
public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTableFromStreamingFullDiff() throws Exception {
testReceiveChangesForReplicaIdentityFullTableWithToastedValue(SchemaRefreshMode.COLUMNS_DIFF, false);
}
private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart) throws Exception{
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, mode)
@ -1294,16 +1306,28 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg
executeAndWait("UPDATE test_table set not_toast = 20");
SourceRecord updatedRecord = consumer.remove();
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValue)
), updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null)
), updatedRecord, Envelope.FieldName.AFTER);
if (DecoderDifferences.areToastedValuesPresentInSchema()) {
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValue)
), updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null)
), updatedRecord, Envelope.FieldName.AFTER);
}
else {
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10)
), updatedRecord, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20)
), updatedRecord, Envelope.FieldName.AFTER);
}
recordsProducer.stop();
}