DBZ-7601 Fix NPE when converting incremental snapshot events to cloud events

This commit is contained in:
mfvitale 2024-08-12 12:49:42 +02:00 committed by Jiri Pechanec
parent 9d60003139
commit 96b043575f
2 changed files with 27 additions and 3 deletions

View File

@ -40,9 +40,9 @@ public PostgresCloudEventsMaker(RecordAndMetadata recordAndMetadata, SerializerT
@Override @Override
public String ceId() { public String ceId() {
return "name:" + sourceField(AbstractSourceInfo.SERVER_NAME_KEY) return "name:" + sourceField(AbstractSourceInfo.SERVER_NAME_KEY)
+ ";lsn:" + sourceField(LSN_KEY).toString() + ";lsn:" + sourceField(LSN_KEY)
+ ";txId:" + sourceField(TXID_KEY).toString() + ";txId:" + sourceField(TXID_KEY)
+ ";sequence:" + sourceField(SEQUENCE_KEY).toString(); + ";sequence:" + sourceField(SEQUENCE_KEY);
} }
@Override @Override

View File

@ -18,6 +18,7 @@
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -28,6 +29,7 @@
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.VariableScaleDecimal; import io.debezium.data.VariableScaleDecimal;
import io.debezium.doc.FixFor; import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcConnection;
@ -444,6 +446,28 @@ record -> ((Struct) record.value()).getStruct("source"),
} }
} }
@Test
public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
// Testing.Print.enable();
try (JdbcConnection connection = databaseConnection()) {
populateTable(connection, "s1.anumeric");
}
startConnector(x -> x.with("value.converter", "io.debezium.converters.CloudEventsConverter")
.with("value.converter.serializer.type", "json")
.with("value.converter.data.serializer.type", "json"));
sendAdHocSnapshotSignal("s1.anumeric");
final SourceRecords snapshotRecords = consumeRecordsByTopic(ROW_COUNT);
final List<SourceRecord> snapshotTable1 = snapshotRecords.recordsForTopic("test_server.s1.anumeric");
// test snapshot
for (SourceRecord sourceRecord : snapshotTable1) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
}
}
protected void populate4PkTable() throws SQLException { protected void populate4PkTable() throws SQLException {
try (JdbcConnection connection = databaseConnection()) { try (JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "s1.a4"); populate4PkTable(connection, "s1.a4");