From a84230447ed52b28acf2f30b58c58dd8af75deb5 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Tue, 30 Apr 2019 09:52:37 +0200 Subject: [PATCH] DBZ-1247 Making defaultSnapshotFetchSize() abstract --- .../postgresql/PostgresConnectorConfig.java | 30 +++++++++++-------- .../config/CommonConnectorConfig.java | 8 ++--- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index ef815de2f..1b69729c3 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -10,34 +10,33 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import io.debezium.config.CommonConnectorConfig; -import io.debezium.config.Configuration; -import io.debezium.config.EnumeratedValue; -import io.debezium.config.Field; - -import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter; -import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter; -import io.debezium.connector.postgresql.snapshot.InitialSnapshotter; -import io.debezium.connector.postgresql.snapshot.NeverSnapshotter; -import io.debezium.connector.postgresql.spi.Snapshotter; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.config.EnumeratedValue; +import io.debezium.config.Field; import io.debezium.connector.postgresql.connection.MessageDecoder; import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder; import io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder; import io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder; +import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter; +import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter; +import io.debezium.connector.postgresql.snapshot.InitialSnapshotter; +import io.debezium.connector.postgresql.snapshot.NeverSnapshotter; +import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.heartbeat.Heartbeat; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The configuration properties for the {@link PostgresConnector} @@ -890,7 +889,7 @@ protected String slotName() { protected boolean dropSlotOnStop() { return getConfig().getBoolean(DROP_SLOT_ON_STOP); } - + protected String streamParams() { return getConfig().getString(STREAM_PARAMS); } @@ -962,6 +961,11 @@ public String snapshotSelectOverrideForTable(String table) { return getConfig().getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table); } + /** + * DBZ-1234 Obsolete once rows.fetch.size option has been removed; + * value can be passed to super constructor instead + */ + @Deprecated @Override protected int defaultSnapshotFetchSize(Configuration config) { // we use getString() method because it supports null as the return value diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index eed47906d..05d591a2b 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -13,8 +13,8 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; -import io.debezium.heartbeat.Heartbeat; import io.debezium.config.Field.ValidationOutput; +import io.debezium.heartbeat.Heartbeat; import io.debezium.relational.history.KafkaDatabaseHistory; /** @@ -22,7 +22,7 @@ * * @author Gunnar Morling */ -public class CommonConnectorConfig { +public abstract class CommonConnectorConfig { public static final int DEFAULT_MAX_QUEUE_SIZE = 8192; public static final int DEFAULT_MAX_BATCH_SIZE = 2048; @@ -114,9 +114,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName) { * @param config configuration * @return the default fetch size */ - protected int defaultSnapshotFetchSize(Configuration config) { - throw new UnsupportedOperationException("not implemented"); - } + protected abstract int defaultSnapshotFetchSize(Configuration config); /** * Provides access to the "raw" config instance. In most cases, access via typed getters for individual properties