DBZ-6276 changed Oracle archive log check to fail if conditions require it
This commit is contained in:
parent
2181251a9f
commit
4df3895d65
@ -61,7 +61,7 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
|
||||
() -> new OracleConnection(jdbcConfig));
|
||||
jdbcConnection = connectionFactory.mainConnection();
|
||||
|
||||
validateRedoLogConfiguration();
|
||||
validateRedoLogConfiguration(connectorConfig);
|
||||
|
||||
OracleValueConverters valueConverters = new OracleValueConverters(connectorConfig, jdbcConnection);
|
||||
OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(valueConverters, jdbcConnection);
|
||||
@ -171,16 +171,27 @@ protected Iterable<Field> getAllConfigurationFields() {
|
||||
return OracleConnectorConfig.ALL_FIELDS;
|
||||
}
|
||||
|
||||
private void validateRedoLogConfiguration() {
|
||||
private void validateRedoLogConfiguration(OracleConnectorConfig config) {
|
||||
// Check whether the archive log is enabled.
|
||||
final boolean archivelogMode = jdbcConnection.isArchiveLogMode();
|
||||
if (!archivelogMode) {
|
||||
throw new DebeziumException("The Oracle server is not configured to use a archive log LOG_MODE, which is "
|
||||
+ "required for this connector to work properly. Change the Oracle configuration to use a "
|
||||
+ "LOG_MODE=ARCHIVELOG and restart the connector.");
|
||||
if (redoLogRequired(config)) {
|
||||
throw new DebeziumException("The Oracle server is not configured to use a archive log LOG_MODE, which is "
|
||||
+ "required for this connector to work properly. Change the Oracle configuration to use a "
|
||||
+ "LOG_MODE=ARCHIVELOG and restart the connector.");
|
||||
}
|
||||
else {
|
||||
LOGGER.warn("Failed the archive log check but continuing as redo log isn't strictly required");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean redoLogRequired(OracleConnectorConfig config) {
|
||||
// Check whether our connector configuration relies on the redo log and should fail fast if it isn't configured
|
||||
return config.getSnapshotMode().shouldStream() ||
|
||||
config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.ALL;
|
||||
}
|
||||
|
||||
private void validateAndLoadSchemaHistory(OracleConnectorConfig config, OraclePartition partition, OracleOffsetContext offset, OracleDatabaseSchema schema) {
|
||||
if (offset == null) {
|
||||
if (config.getSnapshotMode().shouldSnapshotOnSchemaError() && config.getSnapshotMode() != OracleConnectorConfig.SnapshotMode.ALWAYS) {
|
||||
|
@ -308,6 +308,23 @@ public void shouldTakeSnapshot() throws Exception {
|
||||
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("last");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldSkipCheckingArchiveLogIfNoCdc() throws Exception {
|
||||
Configuration config = TestHelper.defaultConfig()
|
||||
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY)
|
||||
.with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE, TransactionSnapshotBoundaryMode.SKIP)
|
||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
|
||||
.build();
|
||||
|
||||
LogInterceptor logInterceptor = new LogInterceptor(OracleConnectorTask.class);
|
||||
|
||||
start(OracleConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
stopConnector();
|
||||
|
||||
assertThat(logInterceptor.containsWarnMessage("Failed the archive log check but continuing as redo log isn't strictly required")).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldContinueWithStreamingAfterSnapshot() throws Exception {
|
||||
Configuration config = TestHelper.defaultConfig()
|
||||
|
Loading…
Reference in New Issue
Block a user