DBZ-3986 [oracle] Add the SCHEMA_ONLY_RECOVERY snapshot mode

This commit is contained in:
Zongwen Li 2021-09-21 00:05:20 +08:00 committed by Gunnar Morling
parent e3294146b5
commit 636c09e040
7 changed files with 248 additions and 9 deletions

View File

@ -336,8 +336,8 @@ Yoann Rodière
Yossi Shirizli
Yuan Zhang
Zheng Wang
Zongwen Li
Zoran Regvart
志飞 张
李宗文
민규 김
Ünal Sürmeli

View File

@ -498,4 +498,20 @@ public static String connectionString(Configuration config) {
// Builds the connection factory from the URL pattern that connectionString(config) yields.
private static ConnectionFactory resolveConnectionFactory(Configuration config) {
    final String urlPattern = connectionString(config);
    return JdbcConnection.patternBasedFactory(urlPattern);
}
/**
 * Checks whether the Oracle server runs with archive logging enabled.
 *
 * @return {@code true} if the {@code LOG_MODE} read from {@code V$DATABASE} equals {@code ARCHIVELOG}
 *         (ignoring case); {@code false} otherwise, including when the query yields no row
 * @throws DebeziumException if the query fails with a {@link SQLException}
 */
protected boolean isArchiveLogMode() {
    final String logMode;
    try {
        logMode = queryAndMap("SELECT LOG_MODE FROM V$DATABASE", rs -> {
            if (rs.next()) {
                return rs.getString(1);
            }
            // No row returned: fall back to an empty mode, which never matches ARCHIVELOG
            return "";
        });
    }
    catch (SQLException e) {
        throw new DebeziumException("Unexpected error while connecting to Oracle and looking at LOG_MODE mode: ", e);
    }
    LOGGER.debug("LOG_MODE={}", logMode);
    return "ARCHIVELOG".equalsIgnoreCase(logMode);
}
}

View File

@ -500,19 +500,29 @@ public enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot of data and schema upon initial startup of a connector.
*/
INITIAL("initial", true),
INITIAL("initial", true, false),
/**
* Perform a snapshot of the schema but no data upon initial startup of a connector.
*/
SCHEMA_ONLY("schema_only", false);
SCHEMA_ONLY("schema_only", false, false),
/**
* Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position.
* This can be used for recovery only if the connector has existing offsets and the database.history.kafka.topic does not exist (deleted).
* This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped,
* otherwise some events during the gap may be processed with an incorrect schema and corrupted.
*/
SCHEMA_ONLY_RECOVERY("schema_only_recovery", false, true);
private final String value;
private final boolean includeData;
private final boolean shouldSnapshotOnSchemaError;
private SnapshotMode(String value, boolean includeData) {
private SnapshotMode(String value, boolean includeData, boolean shouldSnapshotOnSchemaError) {
this.value = value;
this.includeData = includeData;
this.shouldSnapshotOnSchemaError = shouldSnapshotOnSchemaError;
}
@Override
@ -528,6 +538,13 @@ public boolean includeData() {
return includeData;
}
/**
 * Whether this mode allows the schema to be snapshotted again when the database history cannot be recovered.
 *
 * @return the {@code shouldSnapshotOnSchemaError} flag this mode was constructed with
 */
public boolean shouldSnapshotOnSchemaError() {
return shouldSnapshotOnSchemaError;
}
/**
* Determine if the supplied value is one of the predefined options.
*

View File

@ -13,6 +13,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
@ -53,18 +54,19 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
Configuration jdbcConfig = connectorConfig.getJdbcConfig();
jdbcConnection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());
validateRedoLogConfiguration();
OracleValueConverters valueConverters = new OracleValueConverters(connectorConfig, jdbcConnection);
TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection);
this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, schemaNameAdjuster, topicSelector, tableNameCaseSensitivity);
this.schema.initializeStorage();
Offsets<OraclePartition, OracleOffsetContext> previousOffsets = getPreviousOffsets(new OraclePartition.Provider(connectorConfig),
connectorConfig.getAdapter().getOffsetContextLoader());
OraclePartition partition = previousOffsets.getTheOnlyPartition();
OracleOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
if (previousOffset != null) {
schema.recover(partition, previousOffset);
}
validateAndLoadDatabaseHistory(connectorConfig, partition, previousOffset, schema);
taskContext = new OracleTaskContext(connectorConfig, schema);
@ -139,4 +141,42 @@ public void doStop() {
// Exposes the connector's field set as declared in OracleConnectorConfig.ALL_FIELDS.
protected Iterable<Field> getAllConfigurationFields() {
return OracleConnectorConfig.ALL_FIELDS;
}
/**
 * Fails fast when the Oracle server is not running in archive log mode.
 *
 * @throws DebeziumException if {@code jdbcConnection.isArchiveLogMode()} reports {@code false}
 */
private void validateRedoLogConfiguration() {
    if (jdbcConnection.isArchiveLogMode()) {
        // Archive logging is on; nothing to complain about.
        return;
    }
    throw new DebeziumException("The Oracle server is not configured to use a archive log LOG_MODE, which is "
            + "required for this connector to work properly. Change the Oracle configuration to use a "
            + "LOG_MODE=ARCHIVELOG and restart the connector.");
}
/**
 * Decides at startup whether the database history is recovered, freshly initialized, or reported as unusable.
 * <ul>
 * <li>Offset present and history present: the history is recovered.</li>
 * <li>No offset: storage is initialized, unless the snapshot mode is a recovery mode, which is rejected.</li>
 * <li>Offset present but history missing: storage is initialized only in a recovery mode; otherwise startup fails.</li>
 * </ul>
 *
 * @param config the connector configuration supplying the snapshot mode
 * @param partition the partition handed to {@code schema.recover}
 * @param offset the previously stored offset, or {@code null} if there is none
 * @param schema the schema whose history is checked, recovered or initialized
 * @throws DebeziumException if a recovery mode is configured without an offset, or if the history is
 *             missing and the configured mode is not a recovery mode
 */
private void validateAndLoadDatabaseHistory(OracleConnectorConfig config, OraclePartition partition, OracleOffsetContext offset, OracleDatabaseSchema schema) {
    // Regular restart: both the offset and the history are available.
    if (offset != null && schema.historyExists()) {
        schema.recover(partition, offset);
        return;
    }

    if (offset == null) {
        // A recovery mode needs a stored offset to resume from; without one it cannot proceed.
        if (config.getSnapshotMode().shouldSnapshotOnSchemaError()) {
            throw new DebeziumException("Could not find existing redo log information while attempting schema only recovery snapshot");
        }
        LOGGER.info("Connector started for the first time, database history recovery will not be executed");
    }
    else {
        // An offset exists, so a history was expected as well.
        LOGGER.warn("Database history was not found but was expected");
        if (!config.getSnapshotMode().shouldSnapshotOnSchemaError()) {
            throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to "
                    + OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
        }
        LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. " +
                "Attempting to snapshot the current schema and then begin reading the redo log from the last recorded offset.",
                OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
    }

    // Both surviving paths start from freshly initialized history storage.
    schema.initializeStorage();
}
}

View File

@ -30,6 +30,7 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private final OracleDdlParser ddlParser;
private final OracleValueConverters valueConverters;
private boolean storageInitializationExecuted = false;
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters,
SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector,
@ -85,4 +86,21 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
record(schemaChange, schemaChange.getTableChanges());
}
}
/**
 * Initializes the storage through the superclass and then records that initialization was executed.
 * The flag is set only after {@code super.initializeStorage()} returns, so it stays {@code false} if that call throws.
 */
@Override
public void initializeStorage() {
super.initializeStorage();
storageInitializationExecuted = true;
}
/**
 * Reports whether {@code initializeStorage()} has completed on this instance.
 *
 * @return the current value of the {@code storageInitializationExecuted} flag
 */
public boolean isStorageInitializationExecuted() {
return storageInitializationExecuted;
}
/**
 * Reports whether the database history entity exists.
 *
 * @return the result of {@code databaseHistory.exists()}
 */
public boolean historyExists() {
return databaseHistory.exists();
}
}

View File

@ -40,6 +40,7 @@ public class OracleSnapshotChangeEventSource extends RelationalSnapshotChangeEve
private final OracleConnectorConfig connectorConfig;
private final OracleConnection jdbcConnection;
private final OracleDatabaseSchema databaseSchema;
public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection,
OracleDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock,
@ -48,6 +49,7 @@ public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, Or
this.connectorConfig = connectorConfig;
this.jdbcConnection = jdbcConnection;
this.databaseSchema = schema;
}
@Override
@ -57,13 +59,22 @@ protected SnapshottingTask getSnapshottingTask(OracleOffsetContext previousOffse
// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
snapshotSchema = false;
LOGGER.info("The previous offset has been found.");
snapshotSchema = databaseSchema.isStorageInitializationExecuted();
snapshotData = false;
}
else {
LOGGER.info("No previous offset has been found.");
snapshotData = connectorConfig.getSnapshotMode().includeData();
}
if (snapshotData && snapshotSchema) {
LOGGER.info("According to the connector configuration both schema and data will be snapshot.");
}
else if (snapshotSchema) {
LOGGER.info("According to the connector configuration only schema will be snapshot.");
}
return new SnapshottingTask(snapshotSchema, snapshotData);
}
@ -117,6 +128,14 @@ protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext<OraclePartit
protected void determineSnapshotOffset(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx,
OracleOffsetContext previousOffset)
throws Exception {
// When a previous offset exists (e.g., in schema_only_recovery snapshot mode),
// reuse it instead of determining a new snapshot offset.
if (previousOffset != null) {
ctx.offset = previousOffset;
tryStartingSnapshot(ctx);
return;
}
Optional<Scn> latestTableDdlScn = getLatestTableDdlScn(ctx);
Scn currentScn;

View File

@ -17,6 +17,7 @@
import java.lang.management.ManagementFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
@ -38,6 +39,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.awaitility.core.ConditionTimeoutException;
@ -48,6 +51,7 @@
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningStrategy;
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
@ -65,9 +69,12 @@
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.MemoryDatabaseHistory;
import io.debezium.util.Testing;
/**
@ -2318,4 +2325,126 @@ private void verifyHeartbeatRecord(SourceRecord heartbeat) {
private long toMicroSecondsSinceEpoch(LocalDateTime localDateTime) {
return localDateTime.toEpochSecond(ZoneOffset.UTC) * MICROS_PER_SECOND;
}
/**
 * Starts the connector in {@code schema_only_recovery} mode (per the test name, without any stored offset)
 * and expects the start-up to fail. The error handed to the completion callback is rethrown so that
 * JUnit can match it against the expected {@link DebeziumException}.
 */
@Test(expected = DebeziumException.class)
@FixFor("DBZ-3986")
public void shouldCreateSnapshotSchemaOnlyRecoveryExceptionWithoutOffset() {
final Path path = Testing.Files.createTestingPath("missing-history.txt").toAbsolutePath();
Configuration config = defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY)
.with(FileDatabaseHistory.FILE_PATH, path)
.build();
// Start the connector and capture whatever error the completion callback receives
AtomicReference<Throwable> exception = new AtomicReference<>();
start(OracleConnector.class, config, (success, message, error) -> exception.set(error));
// Remove the history file used by this test
Testing.Files.delete(path);
// NOTE(review): if the callback never received an error, the reference is null and this
// statement throws a NullPointerException rather than the expected DebeziumException
throw (RuntimeException) exception.get();
}
/**
 * Verifies that a connector restarted in {@code schema_only_recovery} mode emits the change that was
 * made while it was stopped. The first run uses {@code schema_only} with a file-backed offset store
 * and an in-memory database history.
 */
@Test
@FixFor("DBZ-3986")
public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
try {
Configuration.Builder builder = defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
.with(OracleConnectorConfig.DATABASE_HISTORY, MemoryDatabaseHistory.class.getName())
.with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName());
Configuration config = builder.build();
// First run: creates the table, streams two rows and stops the connector
consumeRecords(config);
// Insert a row while the connector is stopped
connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (3, 'asuka')");
// Restart with the same settings, switching only the snapshot mode to schema_only_recovery
builder.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY_RECOVERY);
config = builder.build();
start(OracleConnector.class, config);
int recordCount = 1;
SourceRecords sourceRecords = consumeRecordsByTopic(recordCount);
// The single expected record is the row inserted while the connector was stopped
assertThat(sourceRecords.allRecordsInOrder()).hasSize(recordCount);
Struct struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(0).value()).get(AFTER);
assertEquals(3, struct.get("ID"));
assertEquals("asuka", struct.get("DATA"));
}
finally {
TestHelper.dropTable(connection, "DBZ3986");
}
}
/**
 * Restarts the connector with an unchanged {@code schema_only} configuration (file-backed offset store,
 * in-memory database history) and expects the second start to fail with a {@link DebeziumException}.
 * Presumably the in-memory history does not survive the restart while the offsets do — TODO confirm.
 */
@Test(expected = DebeziumException.class)
@FixFor("DBZ-3986")
public void shouldCreateSnapshotSchemaOnlyExceptionWithoutHistory() throws Exception {
try {
Configuration.Builder builder = defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
.with(OracleConnectorConfig.DATABASE_HISTORY, MemoryDatabaseHistory.class.getName())
.with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName());
Configuration config = builder.build();
// First run: creates the table, streams two rows and stops the connector
consumeRecords(config);
// Second start with the same configuration; capture the error given to the completion callback
AtomicReference<Throwable> exception = new AtomicReference<>();
start(OracleConnector.class, config, (success, message, error) -> exception.set(error));
// NOTE(review): a null reference here results in a NullPointerException instead of the expected exception
throw (RuntimeException) exception.get();
}
finally {
TestHelper.dropTable(connection, "DBZ3986");
}
}
/**
 * Verifies that, with in-memory offset and history stores and {@code schema_only} mode, a restarted
 * connector does not emit the row inserted while it was stopped and only emits the row inserted
 * after streaming is running again.
 */
@Test
@FixFor("DBZ-3986")
public void shouldSkipDataOnSnapshotSchemaOnly() throws Exception {
try {
Configuration.Builder builder = defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
.with(OracleConnectorConfig.DATABASE_HISTORY, MemoryDatabaseHistory.class.getName())
.with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class.getName());
Configuration config = builder.build();
// First run: creates the table, streams two rows and stops the connector
consumeRecords(config);
// Insert a row while the connector is stopped; it is expected to be skipped
connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (3, 'asuka')");
start(OracleConnector.class, config);
// Only insert the next row once streaming is running, so that it is captured
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (4, 'debezium')");
int recordCount = 1;
SourceRecords sourceRecords = consumeRecordsByTopic(recordCount);
// The single expected record is row 4; row 3 must not appear
assertThat(sourceRecords.allRecordsInOrder()).hasSize(recordCount);
Struct struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(0).value()).get(AFTER);
assertEquals(4, struct.get("ID"));
assertEquals("debezium", struct.get("DATA"));
}
finally {
TestHelper.dropTable(connection, "DBZ3986");
}
}
/**
 * Shared first-run helper for the DBZ-3986 tests: recreates table {@code DBZ3986}, starts the connector
 * with the given configuration, inserts two rows once streaming is running, checks that exactly two
 * records arrive, and stops the connector.
 *
 * @param config the connector configuration to start with
 * @throws SQLException declared for the JDBC statements executed on the test connection
 * @throws InterruptedException declared for the waiting/consuming helper calls
 */
@FixFor("DBZ-3986")
private void consumeRecords(Configuration config) throws SQLException, InterruptedException {
// Recreate the test table and pass it to TestHelper.streamTable
TestHelper.dropTable(connection, "DBZ3986");
connection.execute("CREATE TABLE DBZ3986 (ID number(9,0), DATA varchar2(50))");
TestHelper.streamTable(connection, "DBZ3986");
// Start the connector ...
start(OracleConnector.class, config);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (1, 'Test')");
connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (2, 'ashlin')");
// Poll for the two records produced by the inserts above
int recordCount = 2;
SourceRecords sourceRecords = consumeRecordsByTopic(recordCount);
assertThat(sourceRecords.allRecordsInOrder()).hasSize(recordCount);
// Stop so that the calling test can restart the connector
stopConnector();
}
}