DBZ-7497 Add a configuration based snapshot modes configurable via connector properties

This commit is contained in:
mfvitale 2024-03-26 11:41:05 +01:00 committed by Jiri Pechanec
parent 9bc20c9ea3
commit ee5f25ccc9
11 changed files with 271 additions and 8 deletions

View File

@ -98,6 +98,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
WHEN_NEEDED("when_needed"),
/**
* Allows over snapshots by setting connectors properties prefixed with 'snapshot.mode.configuration.based'.
*/
CONFIGURATION_BASED("configuration_based"),
/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/

View File

@ -194,6 +194,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
INITIAL_ONLY("initial_only"),
/**
* Allows over snapshots by setting connectors properties prefixed with 'snapshot.mode.configuration.based'.
*/
CONFIGURATION_BASED("configuration_based"),
/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/

View File

@ -1039,6 +1039,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
WHEN_NEEDED("when_needed"),
/**
* Allows over snapshots by setting connectors properties prefixed with 'snapshot.mode.configuration.based'.
*/
CONFIGURATION_BASED("configuration_based"),
/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/

View File

@ -207,6 +207,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
WHEN_NEEDED("when_needed"),
/**
* Allows over snapshots by setting connectors properties prefixed with 'snapshot.mode.configuration.based'.
*/
CONFIGURATION_BASED("configuration_based"),
/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/

View File

@ -27,6 +27,8 @@
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
public class CustomSnapshotterIT extends AbstractConnectorTest {
@ -39,7 +41,6 @@ public class CustomSnapshotterIT extends AbstractConnectorTest {
"CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
"CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));";
private static final String SETUP_TABLES_STMT = CREATE_TABLES_STMT + INSERT_STMT;
private PostgresConnector connector;
@Rule
public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule();
@ -64,6 +65,7 @@ public void after() {
@Test
@FixFor("DBZ-1082")
public void shouldAllowForCustomSnapshot() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue())
@ -118,4 +120,65 @@ record = s2recs.get(0);
VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s2recs.get(1), PK_FIELD, 2);
}
@Test
public void shouldAllowStreamOnlyByConfigurationBasedSnapshot() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CONFIGURATION_BASED)
.with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA, false)
.with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, false)
.with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_STREAM, true)
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
assertNoRecordsToConsume();
TestHelper.execute(INSERT_STMT);
SourceRecords actualRecords = consumeRecordsByTopic(2);
List<SourceRecord> s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
List<SourceRecord> s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs.size()).isEqualTo(1);
SourceRecord record = s1recs.get(0);
VerifyRecord.isValidInsert(record, PK_FIELD, 2);
record = s2recs.get(0);
VerifyRecord.isValidInsert(record, PK_FIELD, 2);
stopConnector();
}
@Test
public void shouldNotAllowStreamByConfigurationBasedSnapshot() {
LogInterceptor logInterceptor = new LogInterceptor(ChangeEventSourceCoordinator.class);
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CONFIGURATION_BASED)
.with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA, false)
.with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, false)
.with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_STREAM, false)
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
assertNoRecordsToConsume();
TestHelper.execute(INSERT_STMT);
assertNoRecordsToConsume();
waitForConnectorShutdown("postgres", TestHelper.TEST_SERVER);
assertThat(logInterceptor.containsMessage("Streaming is disabled for snapshot mode configuration_based")).isTrue();
stopConnector();
}
}

View File

@ -96,6 +96,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
WHEN_NEEDED("when_needed"),
/**
* Allows over snapshots by setting connectors properties prefixed with 'snapshot.mode.configuration.based'.
*/
CONFIGURATION_BASED("configuration_based"),
/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/

View File

@ -74,6 +74,11 @@ public abstract class CommonConnectorConfig {
protected SnapshotQueryMode snapshotQueryMode;
protected String snapshotQueryModeCustomName;
protected String snapshotLockingModeCustomName;
protected final boolean snapshotModeConfigurationBasedSnapshotData;
protected final boolean snapshotModeConfigurationBasedSnapshotSchema;
protected final boolean snapshotModeConfigurationBasedStream;
protected final boolean snapshotModeConfigurationBasedSnapshotOnSchemaError;
protected final boolean snapshotModeConfigurationBasedSnapshotOnDataError;
/**
* The set of predefined versions e.g. for source struct maker version
@ -983,6 +988,88 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
"When 'snapshot.query.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. "
+ "The implementations must implement the 'SnapshotterQuery' interface and is called to determine how to build queries during snapshot.");
public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA = Field.create("snapshot.mode.configuration.based.snapshot.data")
.withDisplayName("Snapshot mode property based snapshot data")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 17))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if ("configuration_based".equalsIgnoreCase(config.getString(SNAPSHOT_MODE_PROPERTY_NAME)) && config.getString(field, "").isEmpty()) {
output.accept(field, "", "snapshot.mode.configuration.based.snapshot.data cannot be empty when snapshot.mode 'configuration_based' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.mode' is set as configuration_based, this setting must be set to specify whenever the data should be snapshotted or not.");
public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA = Field.create("snapshot.mode.configuration.based.snapshot.schema")
.withDisplayName("Snapshot mode property based snapshot schema")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 18))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if ("configuration_based".equalsIgnoreCase(config.getString(SNAPSHOT_MODE_PROPERTY_NAME)) && config.getString(field, "").isEmpty()) {
output.accept(field, "", "snapshot.mode.configuration.based.snapshot.schema cannot be empty when snapshot.mode 'configuration_based' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.mode' is set as configuration_based, this setting must be set to specify whenever the schema should be snapshotted or not.");
public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_STREAM = Field.create("snapshot.mode.configuration.based.start.stream")
.withDisplayName("Snapshot mode property based start stream")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 19))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if ("configuration_based".equalsIgnoreCase(config.getString(SNAPSHOT_MODE_PROPERTY_NAME)) && config.getString(field, "").isEmpty()) {
output.accept(field, "", "snapshot.mode.configuration.based.start.stream cannot be empty when snapshot.mode 'configuration_based' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.mode' is set as configuration_based, this setting must be set to specify whenever the stream should start or not after snapshot.");
public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR = Field.create("snapshot.mode.configuration.based.snapshot.on.schema.error")
.withDisplayName("Snapshot mode property based snapshot on schema error")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 20))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if ("configuration_based".equalsIgnoreCase(config.getString(SNAPSHOT_MODE_PROPERTY_NAME)) && config.getString(field, "").isEmpty()) {
output.accept(field, "",
"snapshot.mode.configuration.based.snapshot.on.schema.error cannot be empty when snapshot.mode 'configuration_based' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.mode' is set as configuration_based, this setting must be set to specify whenever the schema should be snapshotted or not in case of error.");
public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR = Field.create("snapshot.mode.configuration.based.snapshot.on.data.error")
.withDisplayName("Snapshot mode property based snapshot on data error")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 21))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if ("configuration_based".equalsIgnoreCase(config.getString(SNAPSHOT_MODE_PROPERTY_NAME)) && config.getString(field, "").isEmpty()) {
output.accept(field, "",
"snapshot.mode.configuration.based.snapshot.on.data.error cannot be empty when snapshot.mode 'configuration_based' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.mode' is set as configuration_based, this setting must be set to specify whenever the data should be snapshotted or not in case of error.");
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(
EVENT_PROCESSING_FAILURE_HANDLING_MODE,
@ -1103,6 +1190,11 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.snapshotLockingModeCustomName = config.getString(SNAPSHOT_LOCKING_MODE_CUSTOM_NAME, "");
this.snapshotQueryMode = SnapshotQueryMode.parse(config.getString(SNAPSHOT_QUERY_MODE), SNAPSHOT_QUERY_MODE.defaultValueAsString());
this.snapshotQueryModeCustomName = config.getString(SNAPSHOT_QUERY_MODE_CUSTOM_NAME, "");
this.snapshotModeConfigurationBasedSnapshotData = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA, false);
this.snapshotModeConfigurationBasedSnapshotSchema = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, false);
this.snapshotModeConfigurationBasedStream = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_STREAM, false);
this.snapshotModeConfigurationBasedSnapshotOnSchemaError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR, false);
this.snapshotModeConfigurationBasedSnapshotOnDataError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR, false);
this.signalingDataCollectionId = !Strings.isNullOrBlank(this.signalingDataCollection)
? TableId.parse(this.signalingDataCollection)
@ -1438,6 +1530,26 @@ public String snapshotLockingModeCustomName() {
return this.snapshotLockingModeCustomName;
}
public boolean snapshotModeConfigurationBasedSnapshotData() {
return this.snapshotModeConfigurationBasedSnapshotData;
}
public boolean snapshotModeConfigurationBasedSnapshotSchema() {
return this.snapshotModeConfigurationBasedSnapshotSchema;
}
public boolean snapshotModeConfigurationBasedStream() {
return this.snapshotModeConfigurationBasedStream;
}
public boolean snapshotModeConfigurationBasedSnapshotOnSchemaError() {
return this.snapshotModeConfigurationBasedSnapshotOnSchemaError;
}
public boolean snapshotModeConfigurationBasedSnapshotOnDataError() {
return this.snapshotModeConfigurationBasedSnapshotOnDataError;
}
public SnapshotQueryMode snapshotQueryMode() {
return this.snapshotQueryMode;
}

View File

@ -65,13 +65,14 @@ public SnapshotterService createService(Configuration configuration, ServiceRegi
return getSnapshotterService(configuration, snapshotter, beanRegistry, snapshotQueryService, snapshotLockService);
}
private static SnapshotterService getSnapshotterService(Configuration configuration, Snapshotter s, BeanRegistry beanRegistry, SnapshotQuery snapshotQueryService,
private static SnapshotterService getSnapshotterService(Configuration configuration, Snapshotter snapshotter, BeanRegistry beanRegistry,
SnapshotQuery snapshotQueryService,
SnapshotLock snapshotLockService) {
s.configure(configuration.asMap());
if (s instanceof BeanRegistryAware) {
((BeanRegistryAware) s).injectBeanRegistry(beanRegistry);
if (snapshotter instanceof BeanRegistryAware) {
((BeanRegistryAware) snapshotter).injectBeanRegistry(beanRegistry);
}
return new SnapshotterService(s, snapshotQueryService, snapshotLockService);
snapshotter.configure(configuration.asMap());
return new SnapshotterService(snapshotter, snapshotQueryService, snapshotLockService);
}
@Override

View File

@ -28,7 +28,7 @@ public void configure(Map<String, ?> properties) {
@Override
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
// for ALWAYS snapshot mode don't use exiting offset to have up-to-date SCN
LOGGER.info("Snapshot mode is set to ALWAYS, not checking exiting offset.");
return true;
}

View File

@ -0,0 +1,61 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.snapshot.mode;
import java.util.Map;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.spi.snapshot.Snapshotter;
public class ConfigurationBasedSnapshotter extends BeanAwareSnapshotter implements Snapshotter {
private boolean snapshotData;
private boolean snapshotSchema;
private boolean stream;
private boolean snapshotOnSchemaError;
private boolean snapshotOnDataError;
@Override
public String name() {
return "configuration_based";
}
@Override
public void configure(Map<String, ?> properties) {
CommonConnectorConfig commonConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
this.snapshotData = commonConnectorConfig.snapshotModeConfigurationBasedSnapshotData();
this.snapshotSchema = commonConnectorConfig.snapshotModeConfigurationBasedSnapshotSchema();
this.stream = commonConnectorConfig.snapshotModeConfigurationBasedStream();
this.snapshotOnSchemaError = commonConnectorConfig.snapshotModeConfigurationBasedSnapshotOnSchemaError();
this.snapshotOnDataError = commonConnectorConfig.snapshotModeConfigurationBasedSnapshotOnDataError();
}
@Override
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
return this.snapshotData;
}
@Override
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return this.snapshotSchema;
}
@Override
public boolean shouldStream() {
return this.stream;
}
@Override
public boolean shouldSnapshotOnSchemaError() {
return this.snapshotOnSchemaError;
}
@Override
public boolean shouldSnapshotOnDataError() {
return this.snapshotOnDataError;
}
}

View File

@ -5,4 +5,5 @@ io.debezium.snapshot.mode.NoDataSnapshotter
io.debezium.snapshot.mode.RecoverySnapshotter
io.debezium.snapshot.mode.WhenNeededSnapshotter
io.debezium.snapshot.mode.NeverSnapshotter
io.debezium.snapshot.mode.SchemaOnlySnapshotter
io.debezium.snapshot.mode.SchemaOnlySnapshotter
io.debezium.snapshot.mode.ConfigurationBasedSnapshotter