DBZ-3621 Added debezium-storage module to pom.xml

This commit is contained in:
Kanthi Subramanian 2023-04-05 14:53:09 -04:00 committed by Jiri Pechanec
parent 3860a4faca
commit 8be3806525
3 changed files with 35 additions and 1 deletions

View File

@ -250,6 +250,7 @@ Jun Du
Jure Kajzer
Justin Hiza
Kanha Gupta
Kanthi Subramanian
Katerina Galieva
Kaushik Iyer
Kazuki MATSUDA / 松田一樹

View File

@ -566,6 +566,11 @@
<artifactId>debezium-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-kafka</artifactId>

View File

@ -243,6 +243,28 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
.withDescription(String.format("How long we wait before forcefully stopping the connector thread when shutting down. " +
"Must be bigger than the time it takes two polling loops to finish ({} ms)", ChangeEventSourceCoordinator.SHUTDOWN_WAIT_TIMEOUT.toMillis() * 2));
// JDBC OFFSET STORAGE FIELDS.
public static final Field OFFSET_STORAGE_JDBC_URI = Field.create("offset.storage.jdbc.uri")
.withDescription("URI of the database which will be used to record the database history")
.withDefault("jdbc:")
.withValidation(Field::isRequired);
public static final Field OFFSET_STORAGE_JDBC_USER = Field.create("offset.storage.jdbc.user")
.withDescription("Username of the database which will be used to record the database history")
.withDefault("offsetuser")
.withValidation(Field::isRequired);
public static final Field OFFSET_STORAGE_JDBC_PASSWORD = Field.create("offset.storage.jdbc.password")
.withDescription("Password of the database which will be used to record the database history")
.withDefault("offsetpassword")
.withValidation(Field::isRequired);
public static final String DEFAULT_OFFSET_STORAGE_TABLE_NAME = "debezium_offset_storage";
public static final Field OFFSET_STORAGE_JDBC_TABLE_NAME = Field.create("offset.storage.jdbc.offset_table_name")
.withDescription("Name of the table to store offsets")
.withDefault(DEFAULT_OFFSET_STORAGE_TABLE_NAME);
/**
* The array of fields that are required by each connectors.
*/
@ -253,7 +275,9 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
*/
protected static final Field.Set ALL_FIELDS = CONNECTOR_FIELDS.with(OFFSET_STORAGE, OFFSET_STORAGE_FILE_FILENAME,
OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS,
ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS);
ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS,
// JDBC OFFSET STORAGE FIELDS
OFFSET_STORAGE_JDBC_URI, OFFSET_STORAGE_JDBC_USER, OFFSET_STORAGE_JDBC_PASSWORD, OFFSET_STORAGE_JDBC_TABLE_NAME);
public static final class BuilderImpl implements Builder {
private Configuration config;
@ -1206,6 +1230,10 @@ protected static class EmbeddedConfig extends WorkerConfig {
Field.group(config, "kafka", OFFSET_STORAGE_KAFKA_TOPIC);
Field.group(config, "kafka", OFFSET_STORAGE_KAFKA_PARTITIONS);
Field.group(config, "kafka", OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR);
Field.group(config, "jdbc", OFFSET_STORAGE_JDBC_URI);
Field.group(config, "jdbc", OFFSET_STORAGE_JDBC_USER);
Field.group(config, "jdbc", OFFSET_STORAGE_JDBC_PASSWORD);
Field.group(config, "jdbc", OFFSET_STORAGE_JDBC_TABLE_NAME);
CONFIG = config;
}