diff --git a/.github/workflows/debezium-workflow.yml b/.github/workflows/debezium-workflow.yml
index c81510220..6e13a079f 100644
--- a/.github/workflows/debezium-workflow.yml
+++ b/.github/workflows/debezium-workflow.yml
@@ -56,6 +56,7 @@ jobs:
debezium-ide-configs/**
debezium-parent/pom.xml
debezium-bom/pom.xml
+ debezium-storage/**
pom.xml
.github/workflows/debezium-workflow.yml
diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml
index 62a5e06c4..149dba8f8 100644
--- a/debezium-bom/pom.xml
+++ b/debezium-bom/pom.xml
@@ -525,6 +525,16 @@
debezium-core
${project.version}
+
+ io.debezium
+ debezium-storage-kafka
+ ${project.version}
+
+
+ io.debezium
+ debezium-storage-file
+ ${project.version}
+
io.debezium
debezium-scripting
diff --git a/debezium-connector-mysql/pom.xml b/debezium-connector-mysql/pom.xml
index 890c4baf9..d6ad4af56 100644
--- a/debezium-connector-mysql/pom.xml
+++ b/debezium-connector-mysql/pom.xml
@@ -16,6 +16,14 @@
io.debezium
debezium-core
+
+ io.debezium
+ debezium-storage-kafka
+
+
+ io.debezium
+ debezium-storage-file
+
io.debezium
debezium-ddl-parser
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
index 85ad7be6e..638bd2fc3 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
@@ -14,10 +14,10 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
@@ -34,8 +34,10 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
-import io.debezium.relational.history.KafkaDatabaseHistory;
+import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
+import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
import io.debezium.util.Collect;
/**
@@ -538,7 +540,7 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
+ "but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter.");
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
- .withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
+ .withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field SERVER_ID = Field.create("database.server.id")
.withDisplayName("Cluster ID")
@@ -918,6 +920,7 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
BUFFER_SIZE_FOR_BINLOG_READER,
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
INCONSISTENT_SCHEMA_HANDLING_MODE)
+ .history(KafkaDatabaseHistory.ALL_FIELDS.asArray())
.create();
protected static ConfigDef configDef() {
@@ -933,6 +936,31 @@ protected static ConfigDef configDef() {
protected static final Set BUILT_IN_DB_NAMES = Collect.unmodifiableSet("mysql", "performance_schema", "sys", "information_schema");
+ @Override
+ public DatabaseHistory getDatabaseHistory() {
+ Configuration config = getConfig();
+
+ DatabaseHistory databaseHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
+ if (databaseHistory == null) {
+ throw new ConnectException("Unable to instantiate the database history class " +
+ config.getString(MySqlConnectorConfig.DATABASE_HISTORY));
+ }
+
+ // Do not remove the prefix from the subset of config properties ...
+ Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
+ .edit()
+ .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
+ .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, MySqlConnector.class.getName())
+ .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
+ .build();
+
+ HistoryRecordComparator historyComparator = getHistoryRecordComparator();
+ databaseHistory.configure(dbHistoryConfig, historyComparator,
+ new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
+
+ return databaseHistory;
+ }
+
@Override
public boolean supportsOperationFiltering() {
return true;
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderBufferIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderBufferIT.java
index e4c08b32e..9c832772a 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderBufferIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderBufferIT.java
@@ -25,7 +25,7 @@
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipWhenDatabaseVersion;
-import io.debezium.relational.history.FileDatabaseHistory;
+import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
index d6a4a5bb0..3950d20bc 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
@@ -54,9 +54,9 @@
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.FileDatabaseHistory;
-import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.schema.DatabaseSchema;
+import io.debezium.storage.file.history.FileDatabaseHistory;
+import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.util.Testing;
/**
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java
index 303b67a20..01f523ab3 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java
@@ -28,7 +28,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.FileDatabaseHistory;
+import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/UniqueDatabase.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/UniqueDatabase.java
index 68e9ed731..f3784de29 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/UniqueDatabase.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/UniqueDatabase.java
@@ -24,7 +24,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
-import io.debezium.relational.history.FileDatabaseHistory;
+import io.debezium.storage.file.history.FileDatabaseHistory;
/**
* Create and populate a unique instance of a MySQL database for each run of JUnit test. A user of class
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/FileDatabaseHistoryTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/FileDatabaseHistoryTest.java
index 1d9709a75..b6de65eda 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/FileDatabaseHistoryTest.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/FileDatabaseHistoryTest.java
@@ -10,6 +10,7 @@
import org.junit.Before;
import io.debezium.config.Configuration;
+import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java
index 9a48aa97f..e3d17833b 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java
@@ -42,6 +42,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
+import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
diff --git a/debezium-connector-oracle/pom.xml b/debezium-connector-oracle/pom.xml
index 14dc137f7..687066cbe 100644
--- a/debezium-connector-oracle/pom.xml
+++ b/debezium-connector-oracle/pom.xml
@@ -16,6 +16,14 @@
io.debezium
debezium-core
+
+ io.debezium
+ debezium-storage-kafka
+
+
+ io.debezium
+ debezium-storage-file
+
io.debezium
debezium-ddl-parser
diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java
index 299e6517e..e8b071890 100644
--- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java
+++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java
@@ -18,11 +18,11 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
-import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
@@ -44,7 +44,11 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
+import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
import io.debezium.util.Strings;
/**
@@ -133,7 +137,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
+ "snapshot is taken.");
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
- .withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
+ .withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field CONNECTOR_ADAPTER = Field.create(DATABASE_CONFIG_PREFIX + "connection.adapter")
.withDisplayName("Connector adapter")
@@ -1460,6 +1464,31 @@ public TransactionSnapshotBoundaryMode getLogMiningTransactionSnapshotBoundaryMo
return logMiningTransactionSnapshotBoundaryMode;
}
+ @Override
+ public DatabaseHistory getDatabaseHistory() {
+ Configuration config = getConfig();
+
+ DatabaseHistory databaseHistory = config.getInstance(OracleConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
+ if (databaseHistory == null) {
+ throw new ConnectException("Unable to instantiate the database history class " +
+ config.getString(OracleConnectorConfig.DATABASE_HISTORY));
+ }
+
+ // Do not remove the prefix from the subset of config properties ...
+ Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
+ .edit()
+ .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
+ .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, OracleConnectorConfig.class.getName())
+ .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
+ .build();
+
+ HistoryRecordComparator historyComparator = getHistoryRecordComparator();
+ databaseHistory.configure(dbHistoryConfig, historyComparator,
+ new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
+
+ return databaseHistory;
+ }
+
@Override
public String getConnectorName() {
return Module.name();
diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorConfigTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorConfigTest.java
index b96c44082..d9e243381 100644
--- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorConfigTest.java
+++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorConfigTest.java
@@ -20,7 +20,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.doc.FixFor;
-import io.debezium.relational.history.KafkaDatabaseHistory;
+import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
public class OracleConnectorConfigTest {
diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java
index a9c304d7e..1f0e45648 100644
--- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java
+++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java
@@ -86,8 +86,8 @@
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
-import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.MemoryDatabaseHistory;
+import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java
index 9d8520c26..17b21991e 100644
--- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java
+++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java
@@ -24,7 +24,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider;
import io.debezium.jdbc.JdbcConfiguration;
-import io.debezium.relational.history.FileDatabaseHistory;
+import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
diff --git a/debezium-connector-sqlserver/pom.xml b/debezium-connector-sqlserver/pom.xml
index bece0a525..b5f7c63a7 100644
--- a/debezium-connector-sqlserver/pom.xml
+++ b/debezium-connector-sqlserver/pom.xml
@@ -30,6 +30,14 @@
io.debezium
debezium-core
+
+ io.debezium
+ debezium-storage-kafka
+
+
+ io.debezium
+ debezium-storage-file
+
com.microsoft.sqlserver
mssql-jdbc
diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java
index 1b2b03312..c243f0de0 100644
--- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java
+++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java
@@ -15,10 +15,10 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
@@ -31,7 +31,11 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
+import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
/**
* The list of configuration options for SQL Server connector
@@ -223,7 +227,7 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
.withDefault(DEFAULT_PORT);
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
- .withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
+ .withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field INSTANCE = Field.create(DATABASE_CONFIG_PREFIX + SqlServerConnection.INSTANCE_NAME)
.withDisplayName("Instance name")
@@ -436,6 +440,31 @@ public boolean getOptionRecompile() {
return optionRecompile;
}
+ @Override
+ public DatabaseHistory getDatabaseHistory() {
+ Configuration config = getConfig();
+
+ DatabaseHistory databaseHistory = config.getInstance(SqlServerConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
+ if (databaseHistory == null) {
+ throw new ConnectException("Unable to instantiate the database history class " +
+ config.getString(SqlServerConnectorConfig.DATABASE_HISTORY));
+ }
+
+ // Do not remove the prefix from the subset of config properties ...
+ Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
+ .edit()
+ .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
+ .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, SqlServerConnector.class.getName())
+ .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
+ .build();
+
+ HistoryRecordComparator historyComparator = getHistoryRecordComparator();
+ databaseHistory.configure(dbHistoryConfig, historyComparator,
+ new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
+
+ return databaseHistory;
+ }
+
@Override
public boolean supportsOperationFiltering() {
return true;
diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorConfigTest.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorConfigTest.java
index 74d29340c..280c4b2e2 100644
--- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorConfigTest.java
+++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorConfigTest.java
@@ -13,7 +13,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
-import io.debezium.relational.history.KafkaDatabaseHistory;
+import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
public class SqlServerConnectorConfigTest {
diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java
index aaeba3fad..bbca4f2e6 100644
--- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java
+++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java
@@ -64,10 +64,10 @@
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
-import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DatabaseSchema;
+import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java
index cf1ee26ec..3f0c6deb1 100644
--- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java
+++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java
@@ -42,7 +42,7 @@
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
-import io.debezium.relational.history.FileDatabaseHistory;
+import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
import io.debezium.util.Strings;
diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java
index d28083f14..28e4faa94 100644
--- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java
+++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java
@@ -11,7 +11,6 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
@@ -38,7 +37,6 @@
import io.debezium.heartbeat.HeartbeatErrorHandler;
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.relational.CustomConverterRegistry;
-import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.converter.ConvertedField;
@@ -881,18 +879,6 @@ private static boolean isUsingAvroConverter(Configuration config) {
|| APICURIO_AVRO_CONVERTER.equals(keyConverter) || APICURIO_AVRO_CONVERTER.equals(valueConverter);
}
- public static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, ValidationOutput problems) {
- String serverName = config.getString(field);
- String historyTopicName = config.getString(KafkaDatabaseHistory.TOPIC);
-
- if (Objects.equals(serverName, historyTopicName)) {
- problems.accept(field, serverName, "Must not have the same value as " + KafkaDatabaseHistory.TOPIC.name());
- return 1;
- }
-
- return 0;
- }
-
/**
* Returns the connector-specific {@link SourceInfoStructMaker} based on the given configuration.
*/
diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java
index 1df75de10..e8713ebef 100644
--- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java
+++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java
@@ -19,7 +19,6 @@
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
-import io.debezium.relational.history.KafkaDatabaseHistory;
/**
* Configuration options shared across the relational CDC connectors which use a persistent database schema history.
@@ -29,6 +28,7 @@
public abstract class HistorizedRelationalDatabaseConnectorConfig extends RelationalDatabaseConnectorConfig {
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000;
+ public static final String DEFAULT_DATABASE_HISTORY = "io.debezium.storage.kafka.history.KafkaDatabaseHistory";
private boolean useCatalogBeforeSchema;
private final String logicalName;
@@ -48,18 +48,13 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
.withDescription("The name of the DatabaseHistory class that should be used to store and recover database schema changes. "
+ "The configuration properties for the history are prefixed with the '"
+ DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
- .withDefault(KafkaDatabaseHistory.class.getName());
+ .withDefault(DEFAULT_DATABASE_HISTORY);
protected static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.history(
DATABASE_HISTORY,
- DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL,
- KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
- KafkaDatabaseHistory.TOPIC,
- KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
- KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS,
- KafkaDatabaseHistory.KAFKA_QUERY_TIMEOUT_MS)
+ DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS)
.create();
protected HistorizedRelationalDatabaseConnectorConfig(Class extends SourceConnector> connectorClass,
@@ -100,12 +95,7 @@ public DatabaseHistory getDatabaseHistory() {
}
// Do not remove the prefix from the subset of config properties ...
- Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
- .edit()
- .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
- .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, connectorClass.getName())
- .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, logicalName)
- .build();
+ Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false);
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
databaseHistory.configure(dbHistoryConfig, historyComparator,
@@ -118,6 +108,10 @@ public boolean useCatalogBeforeSchema() {
return useCatalogBeforeSchema;
}
+ public boolean multiPartitionMode() {
+ return multiPartitionMode;
+ }
+
/**
* Returns a comparator to be used when recovering records from the schema history, making sure no history entries
* newer than the offset we resume from are recovered (which could happen when restarting a connector after history
diff --git a/debezium-embedded/README.md b/debezium-embedded/README.md
index ee5ee2f45..1b7b36cc5 100644
--- a/debezium-embedded/README.md
+++ b/debezium-embedded/README.md
@@ -47,7 +47,7 @@ Here's an example of code that configures and runs an embedded MySQL connector:
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "my-app-connector")
- .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
+ .with("database.history", "io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.build())
@@ -88,7 +88,7 @@ The next few lines define the fields that are specific to the connector, which i
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "products")
- .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
+ .with("database.history", "io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.build())
diff --git a/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java b/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java
index bd5d19c0a..ee22a642b 100644
--- a/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java
+++ b/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java
@@ -52,7 +52,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.jdbc.JdbcConfiguration;
-import io.debezium.relational.history.FileDatabaseHistory;
+import io.debezium.storage.file.history.FileDatabaseHistory
import io.debezium.util.IoUtil;
/**
diff --git a/debezium-storage/debezium-storage-file/pom.xml b/debezium-storage/debezium-storage-file/pom.xml
new file mode 100644
index 000000000..d41e69d06
--- /dev/null
+++ b/debezium-storage/debezium-storage-file/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+ io.debezium
+ debezium-storage
+ 2.0.0-SNAPSHOT
+ ../pom.xml
+
+ 4.0.0
+ debezium-storage-file
+ Debezium Storage File Module
+ jar
+
+
+ io.debezium
+ debezium-api
+
+
+ io.debezium
+ debezium-core
+
+
+ org.slf4j
+ slf4j-api
+ provided
+
+
+ org.apache.kafka
+ connect-api
+ provided
+
+
+
+
diff --git a/debezium-core/src/main/java/io/debezium/relational/history/FileDatabaseHistory.java b/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileDatabaseHistory.java
similarity index 93%
rename from debezium-core/src/main/java/io/debezium/relational/history/FileDatabaseHistory.java
rename to debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileDatabaseHistory.java
index 2cbaeaf82..c8f946e51 100644
--- a/debezium-core/src/main/java/io/debezium/relational/history/FileDatabaseHistory.java
+++ b/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileDatabaseHistory.java
@@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.relational.history;
+package io.debezium.storage.file.history;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -26,6 +26,12 @@
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Collect;
import io.debezium.util.FunctionalReadWriteLock;
@@ -37,7 +43,7 @@
@ThreadSafe
public final class FileDatabaseHistory extends AbstractDatabaseHistory {
- public static final Field FILE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "file.filename")
+ public static final Field FILE_PATH = Field.create(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "file.filename")
.withDescription("The path to the file that will be used to record the database history")
.required();
diff --git a/debezium-storage/debezium-storage-kafka/pom.xml b/debezium-storage/debezium-storage-kafka/pom.xml
new file mode 100644
index 000000000..5701d1df7
--- /dev/null
+++ b/debezium-storage/debezium-storage-kafka/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+ io.debezium
+ debezium-storage
+ 2.0.0-SNAPSHOT
+ ../pom.xml
+
+ 4.0.0
+ debezium-storage-kafka
+ Debezium Storage Kafka Module
+ jar
+
+
+ io.debezium
+ debezium-api
+
+
+ io.debezium
+ debezium-core
+
+
+ org.slf4j
+ slf4j-api
+ provided
+
+
+ org.apache.kafka
+ connect-api
+ provided
+
+
+
+
diff --git a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java b/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaDatabaseHistory.java
similarity index 98%
rename from debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
rename to debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaDatabaseHistory.java
index 0dcc89bb1..ef09d5a01 100644
--- a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
+++ b/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaDatabaseHistory.java
@@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.relational.history;
+package io.debezium.storage.kafka.history;
import java.io.IOException;
import java.time.Duration;
@@ -58,6 +58,12 @@
import io.debezium.config.Field.Validator;
import io.debezium.document.DocumentReader;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Collect;
import io.debezium.util.Threads;
@@ -508,7 +514,7 @@ public String toString() {
return "Kafka topic";
}
- protected static String consumerConfigPropertyName(String kafkaConsumerPropertyName) {
+ public static String consumerConfigPropertyName(String kafkaConsumerPropertyName) {
return CONSUMER_PREFIX + kafkaConsumerPropertyName;
}
diff --git a/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaStorageConfiguration.java b/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaStorageConfiguration.java
new file mode 100644
index 000000000..08648de26
--- /dev/null
+++ b/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaStorageConfiguration.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.storage.kafka.history;
+
+import java.util.Objects;
+
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+
+public class KafkaStorageConfiguration {
+ public static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, Field.ValidationOutput problems) {
+ String serverName = config.getString(field);
+ String historyTopicName = config.getString(KafkaDatabaseHistory.TOPIC);
+
+ if (Objects.equals(serverName, historyTopicName)) {
+ problems.accept(field, serverName, "Must not have the same value as " + KafkaDatabaseHistory.TOPIC.name());
+ return 1;
+ }
+
+ return 0;
+ }
+}
diff --git a/debezium-storage/pom.xml b/debezium-storage/pom.xml
new file mode 100644
index 000000000..da20a9ad2
--- /dev/null
+++ b/debezium-storage/pom.xml
@@ -0,0 +1,19 @@
+
+
+
+ io.debezium
+ debezium-parent
+ 2.0.0-SNAPSHOT
+ ../debezium-parent/pom.xml
+
+ 4.0.0
+ debezium-storage
+ Debezium Storage Module
+ pom
+
+
+ debezium-storage-kafka
+ debezium-storage-file
+
+
+
diff --git a/documentation/modules/ROOT/pages/development/engine.adoc b/documentation/modules/ROOT/pages/development/engine.adoc
index 5563bfb5e..a8eeb1537 100644
--- a/documentation/modules/ROOT/pages/development/engine.adoc
+++ b/documentation/modules/ROOT/pages/development/engine.adoc
@@ -98,7 +98,7 @@ Here's an example of code that configures and runs an embedded xref:{link-mysql-
props.setProperty("database.server.id", "85744");
props.setProperty("database.server.name", "my-app-connector");
props.setProperty("database.history",
- "io.debezium.relational.history.FileDatabaseHistory");
+ "io.debezium.storage.file.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename",
"/path/to/storage/dbhistory.dat");
@@ -153,7 +153,7 @@ The next few lines define the fields that are specific to the connector (documen
props.setProperty("database.server.id", "85744")
props.setProperty("database.server.name", "my-app-connector")
props.setProperty("database.history",
- "io.debezium.relational.history.FileDatabaseHistory")
+ "io.debezium.storage.file.history.FileDatabaseHistory")
props.setProperty("database.history.file.filename",
"/path/to/storage/dbhistory.dat")
----
diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc
index 934cddc95..8457d9a26 100644
--- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc
+++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc
@@ -220,12 +220,12 @@ To use Redis to store offsets, use `io.debezium.server.redis.RedisOffsetBackingS
|[[debezium-source-database-history-class]]<>
-|`io.debezium.relational.history.KafkaDatabaseHistory`
+|`io.debezium.storage.kafka.history.KafkaDatabaseHistory`
|Some of the connectors (e.g MySQL, SQL Server, Db2, Oracle) track the database schema evolution over time and stores this data in a database schema history.
This is by default based on Kafka.
There are also other options available
-* `io.debezium.relational.history.FileDatabaseHistory` for non-Kafka deployments
+* `io.debezium.storage.file.history.FileDatabaseHistory` for non-Kafka deployments
* `io.debezium.relational.history.MemoryDatabaseHistory` volatile store for test environments
* `io.debezium.server.redis.RedisDatabaseHistory` volatile store for test environments
diff --git a/documentation/modules/ROOT/pages/operations/embedded.adoc b/documentation/modules/ROOT/pages/operations/embedded.adoc
index ec29e29b7..e8c9409b1 100644
--- a/documentation/modules/ROOT/pages/operations/embedded.adoc
+++ b/documentation/modules/ROOT/pages/operations/embedded.adoc
@@ -86,7 +86,7 @@ Configuration config = Configuration.create()
.with("database.server.id", 85744)
.with("database.server.name", "my-app-connector")
.with("database.history",
- "io.debezium.relational.history.FileDatabaseHistory")
+ "io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename",
"/path/to/storage/dbhistory.dat")
.build();
@@ -138,7 +138,7 @@ The next few lines define the fields that are specific to the connector, which i
.with("database.server.id", 85744)
.with("database.server.name", "products")
.with("database.history",
- "io.debezium.relational.history.FileDatabaseHistory")
+ "io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename",
"/path/to/storage/dbhistory.dat")
.build();
diff --git a/documentation/modules/ROOT/pages/operations/logging.adoc b/documentation/modules/ROOT/pages/operations/logging.adoc
index 12587a3d4..af743facb 100644
--- a/documentation/modules/ROOT/pages/operations/logging.adoc
+++ b/documentation/modules/ROOT/pages/operations/logging.adoc
@@ -144,7 +144,7 @@ log4j.logger.io.debezium.connector.mysql=DEBUG, stdout // <1>
log4j.logger.io.debezium.relational.history=DEBUG, stdout // <2>
log4j.additivity.io.debezium.connector.mysql=false // <3>
-log4j.additivity.io.debezium.relational.history=false // <3>
+log4j.additivity.io.debezium.storage.kafka.history=false // <3>
...
----
<1> Configures the logger named `io.debezium.connector.mysql` to send `DEBUG`, `INFO`, `WARN`, and `ERROR` messages to the `stdout` appender.
@@ -189,7 +189,7 @@ log4j.logger.io.debezium.connector.mysql.BinlogReader=DEBUG, stdout
log4j.logger.io.debezium.relational.history=INFO, stdout
log4j.additivity.io.debezium.connector.mysql=false
-log4j.additivity.io.debezium.relational.history=false
+log4j.additivity.io.debezium.storage.kafka.history=false
log4j.additivity.io.debezium.connector.mysql.BinlogReader=false
...
----
diff --git a/documentation/modules/ROOT/partials/modules/tutorial/ref-watching-connector-start-up.adoc b/documentation/modules/ROOT/partials/modules/tutorial/ref-watching-connector-start-up.adoc
index b05de09d1..cb71b4a65 100644
--- a/documentation/modules/ROOT/partials/modules/tutorial/ref-watching-connector-start-up.adoc
+++ b/documentation/modules/ROOT/partials/modules/tutorial/ref-watching-connector-start-up.adoc
@@ -35,7 +35,7 @@ Further down, you should see output like the following from the connector:
2021-11-30 01:38:44,406 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,406 INFO || Kafka commitId: 8cb0a5e9d3441962 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,407 INFO || Kafka startTimeMs: 1638236324406 [org.apache.kafka.common.utils.AppInfoParser]
-2021-11-30 01:38:44,437 INFO || Database history topic '(name=dbhistory.inventory, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.relational.history.KafkaDatabaseHistory]
+2021-11-30 01:38:44,437 INFO || Database history topic '(name=dbhistory.inventory, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.storage.kafka.history.KafkaDatabaseHistory]
2021-11-30 01:38:44,497 INFO || App info kafka.admin.client for dbserver1-dbhistory unregistered [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,499 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
diff --git a/pom.xml b/pom.xml
index 14a410a50..6ff6a1f20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,6 +161,7 @@
debezium-testing
debezium-connect-rest-extension
debezium-schema-generator
+ debezium-storage