DBZ-7461 Rename shouldSnapshot to shouldSnapshotData
This commit is contained in:
parent
562c999da4
commit
211675a355
@ -38,13 +38,15 @@ public interface Snapshotter extends Configurable {
|
||||
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
|
||||
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
|
||||
*
|
||||
* @return {@code true} if the snapshotter should take a snapshot
|
||||
* @return {@code true} if the snapshotter should take a data snapshot
|
||||
*/
|
||||
boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress);
|
||||
boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);
|
||||
|
||||
/**
|
||||
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
|
||||
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
|
||||
*
|
||||
* @return {@code true} if the snapshotter should take a schema snapshot
|
||||
*/
|
||||
boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);
|
||||
|
||||
|
@ -256,7 +256,7 @@ protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
|
||||
private boolean validateSnapshotFeasibility(Snapshotter snapshotter, OffsetContext offset) {
|
||||
|
||||
if (offset == null) {
|
||||
if (!snapshotter.shouldSnapshot(false, false)) {
|
||||
if (!snapshotter.shouldSnapshotData(false, false)) {
|
||||
// Look to see what the first available binlog file is called, and whether it looks like binlog files have
|
||||
// been purged. If so, then output a warning ...
|
||||
String earliestBinlogFilename = connection.earliestBinlogFilename();
|
||||
|
@ -118,7 +118,7 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse
|
||||
}
|
||||
|
||||
boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress);
|
||||
boolean shouldSnapshotData = snapshotter.shouldSnapshot(offsetExists, snapshotInProgress);
|
||||
boolean shouldSnapshotData = snapshotter.shouldSnapshotData(offsetExists, snapshotInProgress);
|
||||
|
||||
if (shouldSnapshotSchema && shouldSnapshotData) {
|
||||
LOGGER.info("According to the connector configuration both schema and data will be snapshotted");
|
||||
|
@ -39,7 +39,7 @@ public void injectBeanRegistry(BeanRegistry beanRegistry) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,7 @@ public SnapshottingTask getSnapshottingTask(OraclePartition partition, OracleOff
|
||||
}
|
||||
|
||||
boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress);
|
||||
boolean shouldSnapshotData = snapshotter.shouldSnapshot(offsetExists, snapshotInProgress);
|
||||
boolean shouldSnapshotData = snapshotter.shouldSnapshotData(offsetExists, snapshotInProgress);
|
||||
|
||||
if (shouldSnapshotData && shouldSnapshotSchema) {
|
||||
LOGGER.info("According to the connector configuration both schema and data will be snapshot.");
|
||||
|
@ -39,7 +39,7 @@ public void injectBeanRegistry(BeanRegistry beanRegistry) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -81,7 +81,7 @@ public SnapshottingTask getSnapshottingTask(PostgresPartition partition, Postgre
|
||||
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
|
||||
}
|
||||
|
||||
boolean snapshotData = snapshotterService.getSnapshotter().shouldSnapshot(offsetExists, snapshotInProgress);
|
||||
boolean snapshotData = snapshotterService.getSnapshotter().shouldSnapshotData(offsetExists, snapshotInProgress);
|
||||
if (snapshotData) {
|
||||
LOGGER.info("According to the connector configuration data will be snapshotted");
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ public boolean shouldStream() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -101,7 +101,6 @@
|
||||
import io.debezium.relational.RelationalSnapshotChangeEventSource;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.schema.DatabaseSchema;
|
||||
import io.debezium.snapshot.mode.InitialOnlySnapshotter;
|
||||
import io.debezium.util.Strings;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
@ -1863,7 +1862,7 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTxPgoutput() throws Exc
|
||||
@FixFor("DBZ-1437")
|
||||
public void shouldPerformSnapshotOnceForInitialOnlySnapshotMode() throws Exception {
|
||||
// This captures all logged messages, allowing us to verify log message was written.
|
||||
final LogInterceptor logInterceptor = new LogInterceptor(InitialOnlySnapshotter.class);
|
||||
final LogInterceptor logInterceptor = new LogInterceptor(PostgresSnapshotChangeEventSource.class);
|
||||
|
||||
TestHelper.dropDefaultReplicationSlot();
|
||||
|
||||
@ -1909,7 +1908,8 @@ public void shouldPerformSnapshotOnceForInitialOnlySnapshotMode() throws Excepti
|
||||
waitForConnectorShutdown("postgres", TestHelper.TEST_SERVER);
|
||||
|
||||
// Stop the connector, verify that no snapshot was performed
|
||||
assertThat(logInterceptor.containsMessage("Previous initial snapshot completed, no snapshot will be performed")).isTrue();
|
||||
assertThat(logInterceptor.containsMessage("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted."))
|
||||
.isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -41,7 +41,7 @@ public void injectBeanRegistry(BeanRegistry beanRegistry) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -87,7 +87,7 @@ protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, JdbcCo
|
||||
|
||||
if (offset.isSnapshotRunning()) {
|
||||
// The last offset was an incomplete snapshot and now the snapshot was disabled
|
||||
if (!snapshotter.shouldSnapshot(true, true)) {
|
||||
if (!snapshotter.shouldSnapshotData(true, true)) {
|
||||
// No snapshots are allowed
|
||||
throw new DebeziumException("The connector previously stopped while taking a snapshot, but now the connector is configured "
|
||||
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
|
||||
|
@ -10,9 +10,6 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.bean.StandardBeanNames;
|
||||
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
|
||||
import io.debezium.schema.DatabaseSchema;
|
||||
import io.debezium.spi.snapshot.Snapshotter;
|
||||
|
||||
public class AlwaysSnapshotter extends BeanAwareSnapshotter implements Snapshotter {
|
||||
@ -30,7 +27,7 @@ public void configure(Map<String, ?> properties) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
// for ALWAYS snapshot mode don't use exiting offset to have up-to-date SCN
|
||||
LOGGER.info("Snapshot mode is set to ALWAYS, not checking exiting offset.");
|
||||
return true;
|
||||
@ -39,17 +36,6 @@ public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress)
|
||||
@Override
|
||||
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
|
||||
|
||||
final DatabaseSchema databaseSchema = beanRegistry.lookupByName(StandardBeanNames.DATABASE_SCHEMA, DatabaseSchema.class);
|
||||
|
||||
if (!databaseSchema.isHistorized()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final HistorizedRelationalDatabaseSchema historizedRelationalDatabaseSchema = (HistorizedRelationalDatabaseSchema) databaseSchema;
|
||||
if (offsetExists && !snapshotInProgress) {
|
||||
return historizedRelationalDatabaseSchema.isStorageInitializationExecuted();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -30,7 +30,7 @@ public boolean shouldStream() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
|
||||
return !offsetExists || snapshotInProgress;
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ public void configure(Map<String, ?> properties) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ public void configure(Map<String, ?> properties) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
|
||||
return false;
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ public void configure(Map<String, ?> properties) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
|
||||
public boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress) {
|
||||
|
||||
return !offsetExists || snapshotInProgress;
|
||||
}
|
||||
|
@ -395,7 +395,7 @@ private void startSourceTasks(final List<EngineSourceTask> tasks) throws Excepti
|
||||
}
|
||||
|
||||
final long taskStartupTimeout = config.getLong(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS);
|
||||
LOGGER.debug("Waiting max. for {} ms for individual source tasks to start.", taskStartupTimeout);
|
||||
LOGGER.info("Waiting max. for {} ms for individual source tasks to start.", taskStartupTimeout);
|
||||
final int nTasks = tasks.size();
|
||||
Exception error = null;
|
||||
int failedTasks = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user