DBZ-1698 Handle toasted unknown column values

This commit is contained in:
Jiri Pechanec 2020-02-03 13:01:33 +01:00 committed by Gunnar Morling
parent ee62533f64
commit 2a6ea1c41f
2 changed files with 61 additions and 0 deletions

View File

@ -910,4 +910,12 @@ protected Object convertString(Column column, Field fieldDefn, Object data) {
} }
return super.convertString(column, fieldDefn, data); return super.convertString(column, fieldDefn, data);
} }
@Override
protected Object handleUnknownData(Column column, Field fieldDefn, Object data) {
if (data == UnchangedToastedReplicationMessageColumn.UNCHANGED_TOAST_VALUE) {
return toastPlaceholderString;
}
return super.handleUnknownData(column, fieldDefn, data);
}
} }

View File

@ -236,6 +236,59 @@ public void shouldReceiveChangesAfterConnectionRestart() throws Exception {
assertRecordInserted("public.t0", PK_FIELD, 2); assertRecordInserted("public.t0", PK_FIELD, 2);
} }
@Test
@FixFor("DBZ-1698")
public void shouldReceiveUpdateSchemaAfterConnectionRestart() throws Exception {
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropPublication();
startConnector(config -> config
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.with(PostgresConnectorConfig.SCHEMA_BLACKLIST, "postgis")
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST));
TestHelper.execute("CREATE TABLE t0 (pk SERIAL, d INTEGER, PRIMARY KEY(pk));");
consumer = testConsumer(1);
waitForStreamingToStart();
// Insert new row and verify inserted
executeAndWait("INSERT INTO t0 (pk,d) VALUES(1,1);");
assertRecordInserted("public.t0", PK_FIELD, 1);
// simulate the connector is stopped
stopConnector();
Thread.sleep(3000);
// Add record offline
TestHelper.execute("INSERT INTO t0 (pk,d) VALUES(2,2);");
// Alter schema offline
TestHelper.execute("ALTER TABLE t0 ADD COLUMN d2 NUMERIC(10,6) DEFAULT 0 NOT NULL;");
TestHelper.execute("ALTER TABLE t0 ALTER COLUMN d SET NOT NULL;");
// Start the producer and wait; the wait is to guarantee the stream thread is polling
// This appears to be a potential race condition problem
startConnector(config -> config
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.with(PostgresConnectorConfig.SCHEMA_BLACKLIST, "postgis")
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST),
false);
consumer = testConsumer(2);
waitForStreamingToStart();
// Insert new row and verify inserted
executeAndWait("INSERT INTO t0 (pk,d,d2) VALUES (3,1,3);");
assertRecordInserted("public.t0", PK_FIELD, 2);
assertRecordInserted("public.t0", PK_FIELD, 3);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropPublication();
}
@Test @Test
public void shouldReceiveChangesForInsertsCustomTypes() throws Exception { public void shouldReceiveChangesForInsertsCustomTypes() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl"); TestHelper.executeDDL("postgres_create_tables.ddl");