DBZ-5045 Remove deprecated Postgres EXPORTED snapshot option
This commit is contained in:
parent
83dfd9f638
commit
1eacbdc590
@ -8,10 +8,8 @@
|
|||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||||
@ -39,7 +37,6 @@
|
|||||||
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
|
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
|
||||||
import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
|
import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
|
||||||
import io.debezium.connector.postgresql.spi.Snapshotter;
|
import io.debezium.connector.postgresql.spi.Snapshotter;
|
||||||
import io.debezium.data.Envelope;
|
|
||||||
import io.debezium.jdbc.JdbcConfiguration;
|
import io.debezium.jdbc.JdbcConfiguration;
|
||||||
import io.debezium.relational.ColumnFilterMode;
|
import io.debezium.relational.ColumnFilterMode;
|
||||||
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
||||||
@ -203,12 +200,6 @@ public enum SnapshotMode implements EnumeratedValue {
|
|||||||
*/
|
*/
|
||||||
INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()),
|
INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()),
|
||||||
|
|
||||||
/**
|
|
||||||
* Perform an exported snapshot
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
EXPORTED("exported", (c) -> new InitialSnapshotter()),
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inject a custom snapshotter, which allows for more control over snapshots.
|
* Inject a custom snapshotter, which allows for more control over snapshots.
|
||||||
*/
|
*/
|
||||||
@ -704,13 +695,6 @@ public static AutoCreateMode parse(String value, String defaultValue) {
|
|||||||
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
|
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
|
||||||
.withWidth(Width.SHORT)
|
.withWidth(Width.SHORT)
|
||||||
.withImportance(Importance.MEDIUM)
|
.withImportance(Importance.MEDIUM)
|
||||||
.withValidation((config, field, output) -> {
|
|
||||||
if (config.getString(field).toLowerCase().equals(SnapshotMode.EXPORTED.getValue())) {
|
|
||||||
LOGGER.warn("Value '{}' of 'snapshot.mode' option is deprecated, use '{}' instead",
|
|
||||||
SnapshotMode.EXPORTED.getValue(), SnapshotMode.INITIAL.getValue());
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
})
|
|
||||||
.withDescription("The criteria for running a snapshot upon startup of the connector. "
|
.withDescription("The criteria for running a snapshot upon startup of the connector. "
|
||||||
+ "Options include: "
|
+ "Options include: "
|
||||||
+ "'always' to specify that the connector run a snapshot each time it starts up; "
|
+ "'always' to specify that the connector run a snapshot each time it starts up; "
|
||||||
|
@ -1518,7 +1518,7 @@ public void shouldAllowForExportedSnapshot() throws Exception {
|
|||||||
// the replication slot was created.
|
// the replication slot was created.
|
||||||
TestHelper.execute(SETUP_TABLES_STMT);
|
TestHelper.execute(SETUP_TABLES_STMT);
|
||||||
Configuration config = TestHelper.defaultConfig()
|
Configuration config = TestHelper.defaultConfig()
|
||||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
|
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
|
||||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
|
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
|
||||||
.build();
|
.build();
|
||||||
start(PostgresConnector.class, config);
|
start(PostgresConnector.class, config);
|
||||||
@ -1552,7 +1552,7 @@ public void shouldAllowForExportedSnapshot() throws Exception {
|
|||||||
stopConnector();
|
stopConnector();
|
||||||
|
|
||||||
config = TestHelper.defaultConfig()
|
config = TestHelper.defaultConfig()
|
||||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
|
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
|
||||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
|
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
|
||||||
.build();
|
.build();
|
||||||
start(PostgresConnector.class, config);
|
start(PostgresConnector.class, config);
|
||||||
@ -1582,7 +1582,7 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception {
|
|||||||
TestHelper.execute(INSERT_STMT);
|
TestHelper.execute(INSERT_STMT);
|
||||||
|
|
||||||
Configuration config = TestHelper.defaultConfig()
|
Configuration config = TestHelper.defaultConfig()
|
||||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
|
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
|
||||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
|
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
|
||||||
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2)
|
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2)
|
||||||
.with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1)
|
.with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1)
|
||||||
@ -1631,7 +1631,7 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTxPgoutput() throws Exc
|
|||||||
TestHelper.execute(INSERT_STMT);
|
TestHelper.execute(INSERT_STMT);
|
||||||
|
|
||||||
Configuration config = TestHelper.defaultConfig()
|
Configuration config = TestHelper.defaultConfig()
|
||||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
|
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
|
||||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
|
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
|
||||||
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2)
|
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2)
|
||||||
.with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1)
|
.with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1)
|
||||||
|
Loading…
Reference in New Issue
Block a user