DBZ-8011 CloseWindow signal with INSERT_DELETE strategy will be processed through the event dispatcher

This commit is contained in:
mfvitale 2024-07-15 16:15:49 +02:00 committed by Chris Cranford
parent bf2d325ddb
commit 1039ccfcf2
6 changed files with 24 additions and 17 deletions

View File

@ -37,7 +37,6 @@
import io.debezium.connector.mongodb.connection.DefaultMongoDbAuthProvider;
import io.debezium.connector.mongodb.connection.MongoDbAuthProvider;
import io.debezium.connector.mongodb.shared.SharedMongoDbConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Strings;
@ -1428,8 +1427,8 @@ private static int resolveSnapshotMaxThreads(Configuration config) {
}
@Override
public Optional<String[]> parseSignallingMessage(Struct value) {
final String after = value.getString(Envelope.FieldName.AFTER);
public Optional<String[]> parseSignallingMessage(Struct value, String fieldName) {
final String after = value.getString(fieldName);
if (after == null) {
LOGGER.warn("After part of signal '{}' is missing", value);
return Optional.empty();

View File

@ -22,6 +22,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
public class MongoDbConnectorConfigTest {
@ -34,7 +35,7 @@ public void parseSignallingMessage() {
"\"data\":{\"data-collections\":[\"database.collection\"],\"type\":\"incremental\"}}");
MongoDbConnectorConfig mongoDbConnectorConfig = new MongoDbConnectorConfig(TestHelper.getConfiguration());
Optional<String[]> resultOpt = mongoDbConnectorConfig.parseSignallingMessage(struct);
Optional<String[]> resultOpt = mongoDbConnectorConfig.parseSignallingMessage(struct, Envelope.FieldName.AFTER);
Assert.assertTrue(resultOpt.isPresent());
String[] result = resultOpt.get();

View File

@ -1620,8 +1620,8 @@ public List<String> getEnabledChannels() {
return signalEnabledChannels;
}
public Optional<String[]> parseSignallingMessage(Struct value) {
final Struct after = value.getStruct(Envelope.FieldName.AFTER);
public Optional<String[]> parseSignallingMessage(Struct value, String fieldName) {
final Struct after = value.getStruct(fieldName);
if (after == null) {
LOGGER.warn("After part of signal '{}' is missing", value);
return Optional.empty();

View File

@ -5,6 +5,8 @@
*/
package io.debezium.pipeline;
import static io.debezium.config.CommonConnectorConfig.WatermarkStrategy.INSERT_DELETE;
import java.time.Instant;
import java.util.Collection;
import java.util.EnumSet;
@ -304,7 +306,8 @@ public void changeRecord(P partition,
}
private boolean isASignalEventToProcess(T dataCollectionId, Operation operation) {
return operation == Operation.CREATE &&
return (operation == Operation.CREATE ||
(operation == Operation.DELETE && connectorConfig.getIncrementalSnapshotWatermarkingStrategy() == INSERT_DELETE)) &&
connectorConfig.isSignalDataCollection(dataCollectionId);
}
});

View File

@ -12,6 +12,8 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.pipeline.signal.actions.snapshotting.CloseIncrementalSnapshotWindow;
/**
* The class represent the signal sent on a channel:
@ -38,7 +40,18 @@ public SignalRecord(String id, String type, String data, Map<String, Object> add
public static Optional<SignalRecord> buildSignalRecordFromChangeEventSource(Struct value, CommonConnectorConfig config) {
final Optional<String[]> parseSignal = config.parseSignallingMessage(value);
if (Envelope.Operation.DELETE.code().equals(value.get(Envelope.FieldName.OPERATION))) {
// here we are sure the INSERT_DELETE strategy is used
final Optional<String[]> parseSignal = config.parseSignallingMessage(value, Envelope.FieldName.BEFORE);
return parseSignal.map(signalMessage -> {
final String signalId = signalMessage[0].replace("open", "close");
return new SignalRecord(signalId, CloseIncrementalSnapshotWindow.NAME, signalMessage[2], Map.of());
});
}
final Optional<String[]> parseSignal = config.parseSignallingMessage(value, Envelope.FieldName.AFTER);
return parseSignal.map(signalMessage -> new SignalRecord(signalMessage[0], signalMessage[1], signalMessage[2], Map.of()));
}

View File

@ -39,15 +39,6 @@ public void closeWindow(Partition partition, OffsetContext offsetContext, String
x.setString(1, chunkId + "-open");
});
// Since the close event is not written into signal data collection we need to explicit close the window.
try {
incrementalSnapshotChangeEventSource.closeWindow((P) partition, chunkId + "-close", offsetContext);
}
catch (InterruptedException e) {
LOGGER.warn("Failed to close window {} successful.", chunkId);
Thread.currentThread().interrupt();
}
jdbcConnection.commit();
}
}