DBZ-6839 Fix PostgresConnectorIT#shouldAddNewFieldToSourceInfo

PostgresConnectorIT#shouldAddNewFieldToSourceInfo fails only when run
together with other tests and the failure is random. It seem there is
a caching issue in Apicuro, when `test_server.s1.a-value` artifact
references `io.debezium.connector.postgresql.Source` version 1, which
hasnt't `newField` field and this reference is used also in
`shouldAddNewFieldToSourceInfo` where artifact with version 2 should
be used. Using dedicated table and thus creating new artifact in
Apucurio should fix this issue.

Also remove unused variable from `CustomPostgresSourceInfoStructMaker`.
This commit is contained in:
Vojtech Juranek 2023-08-24 15:38:18 +02:00 committed by Jiri Pechanec
parent 43a86b2a46
commit 28b06e6795
2 changed files with 8 additions and 5 deletions

View File

@ -37,7 +37,6 @@ public Schema schema() {
@Override
public Struct struct(SourceInfo sourceInfo) {
Struct result = super.struct(sourceInfo);
Long lsn = result.getInt64(SourceInfo.LSN_KEY);
result.put("newField", "newFieldValue");
return result;
}

View File

@ -3477,15 +3477,19 @@ public void shouldIncludeTableWithBackSlashInName() throws Exception {
@Test
@FixFor("DBZ-6076")
public void shouldAddNewFieldToSourceInfo() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
TestHelper.execute(
"DROP TABLE IF EXISTS s1.DBZ6076;",
"CREATE TABLE s1.DBZ6076 (pk SERIAL, aa integer, PRIMARY KEY(pk));",
"INSERT INTO s1.DBZ6076 (aa) VALUES (1);");
start(PostgresConnector.class, TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.name())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.DBZ6076")
.with(PostgresConnectorConfig.SOURCE_INFO_STRUCT_MAKER, CustomPostgresSourceInfoStructMaker.class.getName())
.build());
assertConnectorIsRunning();
// check records from snapshot
SourceRecords actualRecords = consumeRecordsByTopic(2);
SourceRecords actualRecords = consumeRecordsByTopic(1);
actualRecords.forEach(sourceRecord -> {
assertTrue(sourceRecord.value() instanceof Struct);
Struct source = ((Struct) sourceRecord.value()).getStruct("source");
@ -3493,9 +3497,9 @@ public void shouldAddNewFieldToSourceInfo() throws InterruptedException {
});
// insert 2 new records
TestHelper.execute(INSERT_STMT);
TestHelper.execute("INSERT INTO s1.DBZ6076 (aa) VALUES (2);");
// check records from streaming
actualRecords = consumeRecordsByTopic(2);
actualRecords = consumeRecordsByTopic(1);
actualRecords.forEach(sourceRecord -> {
assertTrue(sourceRecord.value() instanceof Struct);
Struct source = ((Struct) sourceRecord.value()).getStruct("source");