DBZ-5229 Reorganize code for inheritance use

This commit is contained in:
Jiri Pechanec 2022-06-30 16:26:15 +02:00
parent 1e98524a64
commit d8baac7c46
7 changed files with 34 additions and 109 deletions

View File

@@ -14,7 +14,6 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +33,6 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
@@ -920,7 +918,6 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
BUFFER_SIZE_FOR_BINLOG_READER,
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
INCONSISTENT_SCHEMA_HANDLING_MODE)
.history(KafkaDatabaseHistory.ALL_FIELDS.asArray())
.create();
protected static ConfigDef configDef() {
@@ -936,31 +933,6 @@ protected static ConfigDef configDef() {
protected static final Set<String> BUILT_IN_DB_NAMES = Collect.unmodifiableSet("mysql", "performance_schema", "sys", "information_schema");
@Override
public DatabaseHistory getDatabaseHistory() {
Configuration config = getConfig();
DatabaseHistory databaseHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
if (databaseHistory == null) {
throw new ConnectException("Unable to instantiate the database history class " +
config.getString(MySqlConnectorConfig.DATABASE_HISTORY));
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
.withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
.withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, MySqlConnector.class.getName())
.withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
.build();
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
databaseHistory.configure(dbHistoryConfig, historyComparator,
new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
return databaseHistory;
}
@Override
public boolean supportsOperationFiltering() {
return true;

View File

@@ -18,7 +18,6 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,10 +43,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
import io.debezium.util.Strings;
@@ -1464,31 +1460,6 @@ public TransactionSnapshotBoundaryMode getLogMiningTransactionSnapshotBoundaryMo
return logMiningTransactionSnapshotBoundaryMode;
}
@Override
public DatabaseHistory getDatabaseHistory() {
Configuration config = getConfig();
DatabaseHistory databaseHistory = config.getInstance(OracleConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
if (databaseHistory == null) {
throw new ConnectException("Unable to instantiate the database history class " +
config.getString(OracleConnectorConfig.DATABASE_HISTORY));
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
.withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
.withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, OracleConnectorConfig.class.getName())
.withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
.build();
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
databaseHistory.configure(dbHistoryConfig, historyComparator,
new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
return databaseHistory;
}
@Override
public String getConnectorName() {
return Module.name();

View File

@@ -15,7 +15,6 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +30,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
/**
@@ -440,31 +436,6 @@ public boolean getOptionRecompile() {
return optionRecompile;
}
@Override
public DatabaseHistory getDatabaseHistory() {
Configuration config = getConfig();
DatabaseHistory databaseHistory = config.getInstance(SqlServerConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
if (databaseHistory == null) {
throw new ConnectException("Unable to instantiate the database history class " +
config.getString(SqlServerConnectorConfig.DATABASE_HISTORY));
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
.withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
.withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, SqlServerConnector.class.getName())
.withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
.build();
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
databaseHistory.configure(dbHistoryConfig, historyComparator,
new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
return databaseHistory;
}
@Override
public boolean supportsOperationFiltering() {
return true;

View File

@@ -16,6 +16,7 @@
import io.debezium.config.Field;
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
@@ -28,7 +29,8 @@
public abstract class HistorizedRelationalDatabaseConnectorConfig extends RelationalDatabaseConnectorConfig {
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000;
public static final String DEFAULT_DATABASE_HISTORY = "io.debezium.storage.kafka.history.KafkaDatabaseHistory";
private static final String DEFAULT_DATABASE_HISTORY = "io.debezium.storage.kafka.history.KafkaDatabaseHistory";
private boolean useCatalogBeforeSchema;
private final String logicalName;
@@ -53,8 +55,8 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
protected static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.history(
DATABASE_HISTORY,
DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL,
DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS)
DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL)
.create();
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
@@ -95,11 +97,16 @@ public DatabaseHistory getDatabaseHistory() {
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false);
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
.withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
.withDefault(AbstractDatabaseHistory.INTERNAL_CONNECTOR_CLASS, connectorClass.getName())
.withDefault(AbstractDatabaseHistory.INTERNAL_CONNECTOR_ID, logicalName)
.build();
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
databaseHistory.configure(dbHistoryConfig, historyComparator,
new DatabaseHistoryMetrics(this, multiPartitionMode), useCatalogBeforeSchema); // validates
new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
return databaseHistory;
}

View File

@@ -40,6 +40,24 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory {
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Required for unified thread creation
public static final Field INTERNAL_CONNECTOR_CLASS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.class")
.withDisplayName("Debezium connector class")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("The class of the Debezium database connector")
.withNoValidation();
// Required for unified thread creation
public static final Field INTERNAL_CONNECTOR_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.id")
.withDisplayName("Debezium connector identifier")
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("The unique identifier of the Debezium connector")
.withNoValidation();
// Temporary preference for DDL over logical schema due to DBZ-32
public static final Field INTERNAL_PREFER_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "prefer.ddl")
.withDisplayName("Prefer DDL for schema recovery")
@@ -51,6 +69,9 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory {
.withInvisibleRecommender()
.withNoValidation();
public static Field.Set ALL_FIELDS = Field.setOf(DatabaseHistory.NAME, INTERNAL_CONNECTOR_CLASS,
INTERNAL_CONNECTOR_ID);
protected Configuration config;
private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE;
private boolean skipUnparseableDDL;

View File

@@ -144,24 +144,6 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
.withDefault(100)
.withValidation(Field::isInteger);
// Required for unified thread creation
public static final Field INTERNAL_CONNECTOR_CLASS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.class")
.withDisplayName("Debezium connector class")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("The class of the Debezium database connector")
.withNoValidation();
// Required for unified thread creation
public static final Field INTERNAL_CONNECTOR_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.id")
.withDisplayName("Debezium connector identifier")
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("The unique identifier of the Debezium connector")
.withNoValidation();
public static final Field KAFKA_QUERY_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.query.timeout.ms")
.withDisplayName("Kafka admin client query timeout (ms)")
.withType(Type.LONG)

View File

@@ -11,6 +11,7 @@
import io.debezium.config.Field;
public class KafkaStorageConfiguration {
public static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, Field.ValidationOutput problems) {
String serverName = config.getString(field);
String historyTopicName = config.getString(KafkaDatabaseHistory.TOPIC);