DBZ-8168 Fix issue with reselecting JSONB fields in PostgreSQL

This commit is contained in:
Mohamed El Shaer 2024-08-23 12:02:03 +02:00 committed by Chris Cranford
parent fc23216f29
commit 556fc65c1a
4 changed files with 48 additions and 4 deletions

View File

@ -632,3 +632,4 @@ Pradeep Nain
Gaurav Miglani Gaurav Miglani
张展业 张展业
Ashish Binu Ashish Binu
Mohamed El Shaer

View File

@ -116,6 +116,50 @@ protected void waitForStreamingStarted() throws InterruptedException {
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
} }
@Test
@FixFor("DBZ-8168")
public void testToastColumnReselectedWhenJsonbValueIsUnavailable() throws Exception {
TestHelper.execute("CREATE TABLE s1.dbz8168_toast (id int primary key, data jsonb, data2 int);");
final LogInterceptor logInterceptor = getReselectLogInterceptor();
Configuration config = getConfigurationBuilder()
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz8168_toast")
.build();
start(PostgresConnector.class, config);
waitForStreamingStarted();
final String json = "{\"key\": \""+ RandomStringUtils.randomAlphabetic(10000) +"\"}";
TestHelper.execute("INSERT INTO s1.dbz8168_toast (id,data,data2) values (1,'" + json + "',1);",
"UPDATE s1.dbz8168_toast SET data2 = 2 where id = 1;");
final SourceRecords sourceRecords = consumeRecordsByTopic(2);
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic("test_server.s1.dbz8168_toast");
// Check insert
SourceRecord record = tableRecords.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidInsert(record, "id", 1);
assertThat(after.get("id")).isEqualTo(1);
assertThat(after.get("data")).isEqualTo(json);
assertThat(after.get("data2")).isEqualTo(1);
// Check update
record = tableRecords.get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidUpdate(record, "id", 1);
assertThat(after.get("id")).isEqualTo(1);
assertThat(after.get("data")).isEqualTo(json);
assertThat(after.get("data2")).isEqualTo(2);
assertColumnReselectedForUnavailableValue(logInterceptor, "s1.dbz8168_toast", "data");
}
@Test @Test
@FixFor("DBZ-4321") @FixFor("DBZ-4321")
public void testToastColumnReselectedWhenValueIsUnavailable() throws Exception { public void testToastColumnReselectedWhenValueIsUnavailable() throws Exception {

View File

@ -250,10 +250,8 @@ private boolean isUnavailableValueHolder(Schema schema, Object value) {
case MAP: case MAP:
return unavailableValuePlaceholderMap.equals(value); return unavailableValuePlaceholderMap.equals(value);
case STRING: case STRING:
if (Json.LOGICAL_NAME.equals(schema.name())) { final boolean isJsonAndUnavailable = Json.LOGICAL_NAME.equals(schema.name()) && unavailableValuePlaceholderJson.equals(value);
return unavailableValuePlaceholderJson.equals(value); return unavailableValuePlaceholder.equals(value) || isJsonAndUnavailable;
}
return unavailableValuePlaceholder.equals(value);
} }
return false; return false;
} }

View File

@ -279,3 +279,4 @@ TimoWilhelm,Timo Wilhelm
ashishbinu,Ashish Binu ashishbinu,Ashish Binu
wltmlx,Lukas Langegger wltmlx,Lukas Langegger
GitHubSergei,Sergey Kazakov GitHubSergei,Sergey Kazakov
shaer,Mohamed El Shaer