DBZ-4419 Map truncate handling mode internally to skipped ops
This commit is contained in:
parent
bd604a23cd
commit
383f619ce1
@ -8,6 +8,7 @@
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
@ -39,6 +40,7 @@
|
||||
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
|
||||
import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
|
||||
import io.debezium.connector.postgresql.spi.Snapshotter;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.jdbc.JdbcConfiguration;
|
||||
import io.debezium.relational.ColumnFilterMode;
|
||||
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
||||
@ -1147,6 +1149,16 @@ public String publicationName() {
|
||||
return getConfig().getString(PUBLICATION_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EnumSet<Envelope.Operation> getSkippedOperations() {
|
||||
EnumSet<Envelope.Operation> skippedOperations = super.getSkippedOperations();
|
||||
// If user specified TruncateHandlingMode.SKIP we merge that with the existing skipped operations
|
||||
if (TruncateHandlingMode.SKIP.equals(truncateHandlingMode)) {
|
||||
skippedOperations.add(Envelope.Operation.TRUNCATE);
|
||||
}
|
||||
return skippedOperations;
|
||||
}
|
||||
|
||||
protected AutoCreateMode publicationAutocreateMode() {
|
||||
return AutoCreateMode.parse(getConfig().getString(PUBLICATION_AUTOCREATE_MODE));
|
||||
}
|
||||
@ -1167,10 +1179,6 @@ protected Duration statusUpdateInterval() {
|
||||
return Duration.ofMillis(getConfig().getLong(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS));
|
||||
}
|
||||
|
||||
public TruncateHandlingMode truncateHandlingMode() {
|
||||
return truncateHandlingMode;
|
||||
}
|
||||
|
||||
public LogicalDecodingMessageFilter getMessageFilter() {
|
||||
return logicalDecodingMessageFilter;
|
||||
}
|
||||
|
@ -30,7 +30,6 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.postgresql.PostgresConnectorConfig;
|
||||
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
|
||||
import io.debezium.connector.postgresql.PostgresType;
|
||||
import io.debezium.connector.postgresql.TypeRegistry;
|
||||
@ -230,9 +229,6 @@ public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBu
|
||||
}
|
||||
|
||||
private boolean isTruncateEventsIncluded() {
|
||||
if (decoderContext.getConfig().truncateHandlingMode() == PostgresConnectorConfig.TruncateHandlingMode.INCLUDE) {
|
||||
return true;
|
||||
}
|
||||
return !decoderContext.getConfig().getSkippedOperations().contains(Envelope.Operation.TRUNCATE);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user