DBZ-5229 define centralized and modular approach for debezium storage

This commit is contained in:
Hossein Torabi 2022-06-14 21:49:56 +02:00 committed by Jiri Pechanec
parent 7af6037078
commit 1ea02be9c2
36 changed files with 290 additions and 62 deletions

View File

@ -56,6 +56,7 @@ jobs:
debezium-ide-configs/**
debezium-parent/pom.xml
debezium-bom/pom.xml
debezium-storage/**
pom.xml
.github/workflows/debezium-workflow.yml

View File

@ -525,6 +525,16 @@
<artifactId>debezium-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-file</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-scripting</artifactId>

View File

@ -16,6 +16,14 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-file</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-ddl-parser</artifactId>

View File

@ -14,10 +14,10 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
@ -34,8 +34,10 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
import io.debezium.util.Collect;
/**
@ -538,7 +540,7 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
+ "but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter.");
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
.withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
.withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field SERVER_ID = Field.create("database.server.id")
.withDisplayName("Cluster ID")
@ -918,6 +920,7 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
BUFFER_SIZE_FOR_BINLOG_READER,
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
INCONSISTENT_SCHEMA_HANDLING_MODE)
.history(KafkaDatabaseHistory.ALL_FIELDS.asArray())
.create();
protected static ConfigDef configDef() {
@ -933,6 +936,31 @@ protected static ConfigDef configDef() {
protected static final Set<String> BUILT_IN_DB_NAMES = Collect.unmodifiableSet("mysql", "performance_schema", "sys", "information_schema");
/**
 * Creates and configures the {@link DatabaseHistory} implementation named by the
 * {@code database.history} option (Kafka-backed by default), applying the
 * connector-specific defaults for name, connector class and connector id.
 */
@Override
public DatabaseHistory getDatabaseHistory() {
    Configuration config = getConfig();

    DatabaseHistory history = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
    if (history == null) {
        throw new ConnectException("Unable to instantiate the database history class " +
                config.getString(MySqlConnectorConfig.DATABASE_HISTORY));
    }

    // Do not remove the prefix from the subset of config properties ...
    Configuration historyConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
            .edit()
            .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
            .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, MySqlConnector.class.getName())
            .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
            .build();

    // configure(...) also validates the resulting history configuration
    history.configure(historyConfig, getHistoryRecordComparator(),
            new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema());
    return history;
}
@Override
public boolean supportsOperationFiltering() {
return true;

View File

@ -25,7 +25,7 @@
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**

View File

@ -54,9 +54,9 @@
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.schema.DatabaseSchema;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.util.Testing;
/**

View File

@ -28,7 +28,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**

View File

@ -24,7 +24,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.storage.file.history.FileDatabaseHistory;
/**
* Create and populate a unique instance of a MySQL database for each run of JUnit test. A user of class

View File

@ -10,6 +10,7 @@
import org.junit.Before;
import io.debezium.config.Configuration;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**

View File

@ -42,6 +42,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
import io.debezium.util.Testing;

View File

@ -16,6 +16,14 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-file</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-ddl-parser</artifactId>

View File

@ -18,11 +18,11 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
@ -44,7 +44,11 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
import io.debezium.util.Strings;
/**
@ -133,7 +137,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
+ "snapshot is taken.");
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
.withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
.withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field CONNECTOR_ADAPTER = Field.create(DATABASE_CONFIG_PREFIX + "connection.adapter")
.withDisplayName("Connector adapter")
@ -1460,6 +1464,31 @@ public TransactionSnapshotBoundaryMode getLogMiningTransactionSnapshotBoundaryMo
return logMiningTransactionSnapshotBoundaryMode;
}
/**
 * Creates and configures the {@link DatabaseHistory} implementation named by the
 * {@code database.history} option (Kafka-backed by default), applying the
 * connector-specific defaults for name, connector class and connector id.
 */
@Override
public DatabaseHistory getDatabaseHistory() {
    Configuration config = getConfig();

    DatabaseHistory databaseHistory = config.getInstance(OracleConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
    if (databaseHistory == null) {
        throw new ConnectException("Unable to instantiate the database history class " +
                config.getString(OracleConnectorConfig.DATABASE_HISTORY));
    }

    // Do not remove the prefix from the subset of config properties ...
    Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
            .edit()
            .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
            // Fix: advertise the connector class (OracleConnector), not this configuration class,
            // to be consistent with the MySQL/SQL Server variants of this method.
            .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, OracleConnector.class.getName())
            .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
            .build();

    HistoryRecordComparator historyComparator = getHistoryRecordComparator();
    databaseHistory.configure(dbHistoryConfig, historyComparator,
            new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
    return databaseHistory;
}
@Override
public String getConnectorName() {
return Module.name();

View File

@ -20,7 +20,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.doc.FixFor;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
public class OracleConnectorConfigTest {

View File

@ -86,8 +86,8 @@
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.MemoryDatabaseHistory;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**

View File

@ -24,7 +24,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Strings;
import io.debezium.util.Testing;

View File

@ -30,6 +30,14 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-file</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>

View File

@ -15,10 +15,10 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
@ -31,7 +31,11 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
/**
* The list of configuration options for SQL Server connector
@ -223,7 +227,7 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
.withDefault(DEFAULT_PORT);
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
.withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
.withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field INSTANCE = Field.create(DATABASE_CONFIG_PREFIX + SqlServerConnection.INSTANCE_NAME)
.withDisplayName("Instance name")
@ -436,6 +440,31 @@ public boolean getOptionRecompile() {
return optionRecompile;
}
/**
 * Creates and configures the {@link DatabaseHistory} implementation named by the
 * {@code database.history} option (Kafka-backed by default), applying the
 * connector-specific defaults for name, connector class and connector id.
 */
@Override
public DatabaseHistory getDatabaseHistory() {
    Configuration config = getConfig();

    DatabaseHistory history = config.getInstance(SqlServerConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
    if (history == null) {
        throw new ConnectException("Unable to instantiate the database history class " +
                config.getString(SqlServerConnectorConfig.DATABASE_HISTORY));
    }

    // Do not remove the prefix from the subset of config properties ...
    Configuration historyConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
            .edit()
            .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
            .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, SqlServerConnector.class.getName())
            .withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, getLogicalName())
            .build();

    // configure(...) also validates the resulting history configuration
    history.configure(historyConfig, getHistoryRecordComparator(),
            new DatabaseHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema());
    return history;
}
@Override
public boolean supportsOperationFiltering() {
return true;

View File

@ -13,7 +13,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
public class SqlServerConnectorConfigTest {

View File

@ -64,10 +64,10 @@
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DatabaseSchema;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**

View File

@ -42,7 +42,7 @@
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
import io.debezium.util.Strings;

View File

@ -11,7 +11,6 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
@ -38,7 +37,6 @@
import io.debezium.heartbeat.HeartbeatErrorHandler;
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.converter.ConvertedField;
@ -881,18 +879,6 @@ private static boolean isUsingAvroConverter(Configuration config) {
|| APICURIO_AVRO_CONVERTER.equals(keyConverter) || APICURIO_AVRO_CONVERTER.equals(valueConverter);
}
/**
 * Validates that the logical server name differs from the configured database history
 * topic name, since both are used as Kafka topic names and must not collide.
 * NOTE(review): this commit moves this validation to
 * {@code io.debezium.storage.kafka.history.KafkaStorageConfiguration}.
 *
 * @param config the connector configuration to inspect
 * @param field the field holding the server name
 * @param problems collector that receives any validation problem found
 * @return the number of problems reported: 0 when valid, 1 when the names collide
 */
public static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, ValidationOutput problems) {
    String serverName = config.getString(field);
    String historyTopicName = config.getString(KafkaDatabaseHistory.TOPIC);
    // Objects.equals handles the case where either value is unset (null)
    if (Objects.equals(serverName, historyTopicName)) {
        problems.accept(field, serverName, "Must not have the same value as " + KafkaDatabaseHistory.TOPIC.name());
        return 1;
    }
    return 0;
}
/**
* Returns the connector-specific {@link SourceInfoStructMaker} based on the given configuration.
*/

View File

@ -19,7 +19,6 @@
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.KafkaDatabaseHistory;
/**
* Configuration options shared across the relational CDC connectors which use a persistent database schema history.
@ -29,6 +28,7 @@
public abstract class HistorizedRelationalDatabaseConnectorConfig extends RelationalDatabaseConnectorConfig {
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000;
public static final String DEFAULT_DATABASE_HISTORY = "io.debezium.storage.kafka.history.KafkaDatabaseHistory";
private boolean useCatalogBeforeSchema;
private final String logicalName;
@ -48,18 +48,13 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
.withDescription("The name of the DatabaseHistory class that should be used to store and recover database schema changes. "
+ "The configuration properties for the history are prefixed with the '"
+ DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(KafkaDatabaseHistory.class.getName());
.withDefault(DEFAULT_DATABASE_HISTORY);
protected static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.history(
DATABASE_HISTORY,
DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL,
KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC,
KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS,
KafkaDatabaseHistory.KAFKA_QUERY_TIMEOUT_MS)
DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS)
.create();
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
@ -100,12 +95,7 @@ public DatabaseHistory getDatabaseHistory() {
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
.withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
.withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, connectorClass.getName())
.withDefault(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, logicalName)
.build();
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false);
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
databaseHistory.configure(dbHistoryConfig, historyComparator,
@ -118,6 +108,10 @@ public boolean useCatalogBeforeSchema() {
return useCatalogBeforeSchema;
}
public boolean multiPartitionMode() {
return multiPartitionMode;
}
/**
* Returns a comparator to be used when recovering records from the schema history, making sure no history entries
* newer than the offset we resume from are recovered (which could happen when restarting a connector after history

View File

@ -47,7 +47,7 @@ Here's an example of code that configures and runs an embedded MySQL connector:
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "my-app-connector")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history", "io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.build())
@ -88,7 +88,7 @@ The next few lines define the fields that are specific to the connector, which i
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "products")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history", "io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.build())

View File

@ -52,7 +52,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.IoUtil;
/**

View File

@ -0,0 +1,34 @@
<?xml version="1.0"?>
<!-- Maven build for the Debezium file-based storage module (home of FileDatabaseHistory). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-storage</artifactId>
        <version>2.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>debezium-storage-file</artifactId>
    <name>Debezium Storage File Module</name>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-core</artifactId>
        </dependency>
        <!-- Provided scope: the Kafka Connect runtime supplies these at deployment time -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;
package io.debezium.storage.file.history;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@ -26,6 +26,12 @@
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Collect;
import io.debezium.util.FunctionalReadWriteLock;
@ -37,7 +43,7 @@
@ThreadSafe
public final class FileDatabaseHistory extends AbstractDatabaseHistory {
public static final Field FILE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "file.filename")
public static final Field FILE_PATH = Field.create(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "file.filename")
.withDescription("The path to the file that will be used to record the database history")
.required();

View File

@ -0,0 +1,34 @@
<?xml version="1.0"?>
<!-- Maven build for the Debezium Kafka-based storage module (home of KafkaDatabaseHistory). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-storage</artifactId>
        <version>2.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>debezium-storage-kafka</artifactId>
    <name>Debezium Storage Kafka Module</name>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-core</artifactId>
        </dependency>
        <!-- Provided scope: the Kafka Connect runtime supplies these at deployment time -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;
package io.debezium.storage.kafka.history;
import java.io.IOException;
import java.time.Duration;
@ -58,6 +58,12 @@
import io.debezium.config.Field.Validator;
import io.debezium.document.DocumentReader;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Collect;
import io.debezium.util.Threads;
@ -508,7 +514,7 @@ public String toString() {
return "Kafka topic";
}
protected static String consumerConfigPropertyName(String kafkaConsumerPropertyName) {
public static String consumerConfigPropertyName(String kafkaConsumerPropertyName) {
return CONSUMER_PREFIX + kafkaConsumerPropertyName;
}

View File

@ -0,0 +1,25 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.kafka.history;
import java.util.Objects;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
/**
 * Static validation helpers for Kafka-backed Debezium storage configuration.
 */
public class KafkaStorageConfiguration {

    private KafkaStorageConfiguration() {
        // utility class; prevent instantiation
    }

    /**
     * Validates that the logical server name differs from the configured database history
     * topic name, since both are used as Kafka topic names and must not collide.
     *
     * @param config the connector configuration to inspect
     * @param field the field holding the server name
     * @param problems collector that receives any validation problem found
     * @return the number of problems reported: 0 when valid, 1 when the names collide
     */
    public static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, Field.ValidationOutput problems) {
        String serverName = config.getString(field);
        String historyTopicName = config.getString(KafkaDatabaseHistory.TOPIC);
        // Objects.equals handles the case where either value is unset (null)
        if (Objects.equals(serverName, historyTopicName)) {
            problems.accept(field, serverName, "Must not have the same value as " + KafkaDatabaseHistory.TOPIC.name());
            return 1;
        }
        return 0;
    }
}

19
debezium-storage/pom.xml Normal file
View File

@ -0,0 +1,19 @@
<?xml version="1.0"?>
<!-- Aggregator POM for the centralized Debezium storage modules (DBZ-5229). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-parent</artifactId>
        <version>2.0.0-SNAPSHOT</version>
        <relativePath>../debezium-parent/pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>debezium-storage</artifactId>
    <name>Debezium Storage Module</name>
    <packaging>pom</packaging>
    <!-- One sub-module per storage backend -->
    <modules>
        <module>debezium-storage-kafka</module>
        <module>debezium-storage-file</module>
    </modules>
</project>

View File

@ -98,7 +98,7 @@ Here's an example of code that configures and runs an embedded xref:{link-mysql-
props.setProperty("database.server.id", "85744");
props.setProperty("database.server.name", "my-app-connector");
props.setProperty("database.history",
"io.debezium.relational.history.FileDatabaseHistory");
"io.debezium.storage.file.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename",
"/path/to/storage/dbhistory.dat");
@ -153,7 +153,7 @@ The next few lines define the fields that are specific to the connector (documen
props.setProperty("database.server.id", "85744")
props.setProperty("database.server.name", "my-app-connector")
props.setProperty("database.history",
"io.debezium.relational.history.FileDatabaseHistory")
"io.debezium.storage.file.history.FileDatabaseHistory")
props.setProperty("database.history.file.filename",
"/path/to/storage/dbhistory.dat")
----

View File

@ -220,12 +220,12 @@ To use Redis to store offsets, use `io.debezium.server.redis.RedisOffsetBackingS
|[[debezium-source-database-history-class]]<<debezium-source-database-history-class, `debezium.source.database.history`>>
|`io.debezium.relational.history.KafkaDatabaseHistory`
|`io.debezium.storage.kafka.history.KafkaDatabaseHistory`
|Some of the connectors (e.g MySQL, SQL Server, Db2, Oracle) track the database schema evolution over time and stores this data in a database schema history.
This is by default based on Kafka.
There are also other options available
* `io.debezium.relational.history.FileDatabaseHistory` for non-Kafka deployments
* `io.debezium.storage.file.history.FileDatabaseHistory` for non-Kafka deployments
* `io.debezium.relational.history.MemoryDatabaseHistory` volatile store for test environments
* `io.debezium.server.redis.RedisDatabaseHistory` store based on Redis

View File

@ -86,7 +86,7 @@ Configuration config = Configuration.create()
.with("database.server.id", 85744)
.with("database.server.name", "my-app-connector")
.with("database.history",
"io.debezium.relational.history.FileDatabaseHistory")
"io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename",
"/path/to/storage/dbhistory.dat")
.build();
@ -138,7 +138,7 @@ The next few lines define the fields that are specific to the connector, which i
.with("database.server.id", 85744)
.with("database.server.name", "products")
.with("database.history",
"io.debezium.relational.history.FileDatabaseHistory")
"io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename",
"/path/to/storage/dbhistory.dat")
.build();

View File

@ -144,7 +144,7 @@ log4j.logger.io.debezium.connector.mysql=DEBUG, stdout // <1>
log4j.logger.io.debezium.relational.history=DEBUG, stdout // <2>
log4j.additivity.io.debezium.connector.mysql=false // <3>
log4j.additivity.io.debezium.relational.history=false // <3>
log4j.additivity.io.debezium.storage.kafka.history=false // <3>
...
----
<1> Configures the logger named `io.debezium.connector.mysql` to send `DEBUG`, `INFO`, `WARN`, and `ERROR` messages to the `stdout` appender.
@ -189,7 +189,7 @@ log4j.logger.io.debezium.connector.mysql.BinlogReader=DEBUG, stdout
log4j.logger.io.debezium.relational.history=INFO, stdout
log4j.additivity.io.debezium.connector.mysql=false
log4j.additivity.io.debezium.relational.history=false
log4j.additivity.io.debezium.storage.kafka.history=false
log4j.additivity.io.debezium.connector.mysql.BinlogReader=false
...
----

View File

@ -35,7 +35,7 @@ Further down, you should see output like the following from the connector:
2021-11-30 01:38:44,406 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,406 INFO || Kafka commitId: 8cb0a5e9d3441962 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,407 INFO || Kafka startTimeMs: 1638236324406 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,437 INFO || Database history topic '(name=dbhistory.inventory, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.relational.history.KafkaDatabaseHistory]
2021-11-30 01:38:44,437 INFO || Database history topic '(name=dbhistory.inventory, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.storage.kafka.history.KafkaDatabaseHistory]
2021-11-30 01:38:44,497 INFO || App info kafka.admin.client for dbserver1-dbhistory unregistered [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,499 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]

View File

@ -161,6 +161,7 @@
<module>debezium-testing</module>
<module>debezium-connect-rest-extension</module>
<module>debezium-schema-generator</module>
<module>debezium-storage</module>
</modules>
<distributionManagement>