DBZ-7218 Log warnings if log destination misconfigured

This commit is contained in:
Chris Cranford 2023-12-07 05:52:27 -05:00 committed by Chris Cranford
parent b9bb04c250
commit d27154dc60
2 changed files with 40 additions and 1 deletions

View File

@ -505,6 +505,29 @@ public Scn getScnAdjustedByTime(Scn scn, Duration adjustment) throws SQLExceptio
}
}
public boolean isArchiveLogDestinationValid(String archiveDestinationName) throws SQLException {
return prepareQueryAndMap("SELECT STATUS, TYPE FROM V$ARCHIVE_DEST_STATUS WHERE DEST_NAME=?",
st -> st.setString(1, archiveDestinationName),
rs -> {
if (!rs.next()) {
throw new DebeziumException(
String.format("Archive log destination name '%s' is unknown to Oracle",
archiveDestinationName));
}
return "VALID".equals(rs.getString("STATUS")) && "LOCAL".equals(rs.getString("TYPE"));
});
}
public boolean isOnlyOneArchiveLogDestinationValid() throws SQLException {
return queryAndMap("SELECT COUNT(1) FROM V$ARCHIVE_DEST_STATUS WHERE STATUS='VALID' AND TYPE='LOCAL'",
rs -> {
if (!rs.next()) {
throw new DebeziumException("Unable to resolve number of archive log destinations");
}
return rs.getLong(1) == 1L;
});
}
@Override
protected ColumnEditor overrideColumn(ColumnEditor column) {
// This allows the column state to be overridden before default-value resolution so that the

View File

@ -53,6 +53,7 @@
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Stopwatch;
import io.debezium.util.Strings;
/**
* A {@link StreamingChangeEventSource} based on Oracle's LogMiner utility.
@ -167,7 +168,7 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
}
checkDatabaseAndTableState(jdbcConnection, connectorConfig.getPdbName(), schema);
checkArchiveLogDestination(jdbcConnection, connectorConfig.getLogMiningArchiveDestinationName());
logOnlineRedoLogSizes(connectorConfig);
try (LogMinerEventProcessor processor = createProcessor(context, partition, offsetContext)) {
@ -921,6 +922,21 @@ else if (!isTableAllColumnsSupplementalLoggingEnabled(connection, tableId)) {
LOGGER.trace("Database and table state check finished after {} ms", Duration.between(start, Instant.now()).toMillis());
}
private void checkArchiveLogDestination(OracleConnection connection, String destinationName) throws SQLException {
if (!Strings.isNullOrBlank(destinationName)) {
if (!connection.isArchiveLogDestinationValid(destinationName)) {
LOGGER.warn("Archive log destination '{}' may not be valid, please check the database.", destinationName);
}
}
else {
if (!connection.isOnlyOneArchiveLogDestinationValid()) {
LOGGER.warn("There are multiple valid archive log destinations. " +
"Please add '{}' to the connector configuration to avoid log availability problems.",
OracleConnectorConfig.LOG_MINING_ARCHIVE_DESTINATION_NAME.name());
}
}
}
/**
* Examines the table and column names and logs a warning if any name exceeds {@link #MAXIMUM_NAME_LENGTH}.
*