diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 1c41dc28b..723e1bd0f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -202,6 +202,15 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector .withDescription("Complete JDBC URL as an alternative to specifying hostname, port and database provided " + "as a way to support alternative connection scenarios."); + public static final Field LOG_MINING_DML_PARSER = Field.create("log.mining.dml.parser") + .withDisplayName("Log Mining DML parser implementation") + .withEnum(LogMiningDmlParser.class, LogMiningDmlParser.FAST) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("The parser implementation to use when parsing DML operations:" + + "'legacy': the legacy parser implementation based on JSqlParser; " + + "'fast': the robust parser implementation that is streamlined specifically for LogMiner redo format"); + public static final Field LOG_MINING_ARCHIVE_LOG_HOURS = Field.create("log.mining.archive.log.hours") .withDisplayName("Log Mining Archive Log Hours") .withType(Type.LONG) @@ -326,7 +335,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector LOG_MINING_SLEEP_TIME_DEFAULT_MS, LOG_MINING_SLEEP_TIME_MIN_MS, LOG_MINING_SLEEP_TIME_MAX_MS, - LOG_MINING_SLEEP_TIME_INCREMENT_MS); + LOG_MINING_SLEEP_TIME_INCREMENT_MS, + LOG_MINING_DML_PARSER); private final String databaseName; private final String pdbName; @@ -402,7 +412,7 @@ public static ConfigDef configDef() { SNAPSHOT_ENHANCEMENT_TOKEN, LOG_MINING_HISTORY_RECORDER_CLASS, LOG_MINING_HISTORY_RETENTION, RAC_SYSTEM, RAC_NODES, LOG_MINING_ARCHIVE_LOG_HOURS, 
LOG_MINING_BATCH_SIZE_DEFAULT, LOG_MINING_BATCH_SIZE_MIN, LOG_MINING_BATCH_SIZE_MAX, LOG_MINING_SLEEP_TIME_DEFAULT_MS, LOG_MINING_SLEEP_TIME_MIN_MS, LOG_MINING_SLEEP_TIME_MAX_MS, LOG_MINING_SLEEP_TIME_INCREMENT_MS, - LOG_MINING_TRANSACTION_RETENTION); + LOG_MINING_TRANSACTION_RETENTION, LOG_MINING_DML_PARSER); return config; } @@ -671,6 +681,43 @@ public static LogMiningStrategy parse(String value, String defaultValue) { } } + public enum LogMiningDmlParser implements EnumeratedValue { + LEGACY("legacy"), + FAST("fast"); + + private final String value; + + LogMiningDmlParser(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + public static LogMiningDmlParser parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + for (LogMiningDmlParser parser : LogMiningDmlParser.values()) { + if (parser.getValue().equalsIgnoreCase(value)) { + return parser; + } + } + return null; + } + + public static LogMiningDmlParser parse(String value, String defaultValue) { + LogMiningDmlParser mode = parse(value); + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + return mode; + } + } + /** * A {@link TableFilter} that excludes all Oracle system tables. * @@ -853,6 +900,13 @@ public Duration getLogMiningTransactionRetention() { return Duration.ofHours(getConfig().getInteger(LOG_MINING_TRANSACTION_RETENTION)); } + /** + * @return the log mining parser implementation to be used + */ + public LogMiningDmlParser getLogMiningDmlParser() { + return LogMiningDmlParser.parse(getConfig().getString(LOG_MINING_DML_PARSER)); + } + /** * Validate the time.precision.mode configuration. 
* diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java index a8a79d0b2..14bd239e0 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java @@ -10,8 +10,13 @@ import java.math.BigDecimal; import java.sql.SQLException; import java.sql.Types; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -50,6 +55,30 @@ public class OracleValueConverters extends JdbcValueConverters { private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)"); + private static final ZoneId GMT_ZONE_ID = ZoneId.of("GMT"); + + private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .optionalStart() + .appendPattern(".") + .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false) + .optionalEnd() + .toFormatter(); + + private static final DateTimeFormatter TIMESTAMP_AM_PM_SHORT_FORMATTER = new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .appendPattern("dd-MMM-yy hh.mm.ss") + .optionalStart() + .appendPattern(".") + .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false) + .optionalEnd() + .appendPattern(" a") + .toFormatter(); + + private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE); + private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", 
Pattern.CASE_INSENSITIVE); + private final OracleConnection connection; public OracleValueConverters(OracleConnectorConfig config, OracleConnection connection) { @@ -99,6 +128,7 @@ private SchemaBuilder getNumericSchema(Column column) { // a negative scale means rounding, e.g. NUMBER(10, -2) would be rounded to hundreds if (scale <= 0) { + // todo: DBZ-3078 LogMiner checked scale=0 && column.length() = 1 and returned SchemaBuilder.bool(), review impact. int width = column.length() - scale; if (width < 3) { @@ -107,7 +137,7 @@ private SchemaBuilder getNumericSchema(Column column) { else if (width < 5) { return SchemaBuilder.int16(); } - // todo: DBZ-137 this was changed and caused issues with datatype tests, reverted for now. + // todo: DBZ-3078 This was reverted as of DBZ-137 but LogMiner streaming used this, review impact. // else if (width < 10 || (width == 10 && scale == 0)) { else if (width < 10) { return SchemaBuilder.int32(); @@ -121,6 +151,7 @@ else if (width < 19) { return super.schemaBuilder(column); } else { + // todo: DBZ-3078 LogMiner streaming assumed VariableScaleDecimal.builder(), review impact. return variableScaleSchema(column); } } @@ -151,6 +182,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { case Types.NUMERIC: return getNumericConverter(column, fieldDefn); case Types.FLOAT: + // todo: DBZ-3078 LogMiner used getFloatConverter here, review impact. // todo: DBZ-137 is there a reason why floats need to be converted to doubles rather than var-scales? 
// return data -> convertDouble(column, fieldDefn, data); return data -> convertVariableScale(column, fieldDefn, data); @@ -170,6 +202,9 @@ private Object getFloatConverter(Column column, Field fieldDefn, Object data) { if (data instanceof BigDecimal) { return ((BigDecimal) data).floatValue(); } + if (data instanceof String) { + return Float.parseFloat((String) data); + } return convertVariableScale(column, fieldDefn, data); } @@ -178,6 +213,7 @@ private ValueConverter getNumericConverter(Column column, Field fieldDefn) { Integer scale = column.scale().get(); if (scale <= 0) { + // todo DBZ-3078 LogMiner used scale == 0 && column.length() == 1 to return convertBoolean, review impact. int width = column.length() - scale; if (width < 3) { @@ -186,6 +222,7 @@ private ValueConverter getNumericConverter(Column column, Field fieldDefn) { else if (width < 5) { return data -> convertNumericAsSmallInt(column, fieldDefn, data); } + // todo: DBZ-3078 this was a thing for LogMiner streaming, review impact. // todo: DBZ-137 this was changed and caused issues with datatype tests, reverted for now. // else if (width < 10 || (width == 10 && scale == 0)) { else if (width < 10) { @@ -283,12 +320,20 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data) { } } + if (data instanceof String) { + // In the case when the value is of String, convert it to a BigDecimal so that we can then + // aptly apply the scale adjustment below. + data = toBigDecimal(column, fieldDefn, data); + } + // adjust scale to column's scale if the column's scale is larger than the one from // the value (e.g. 4.4444 -> 4.444400) if (data instanceof BigDecimal) { data = withScaleAdjustedIfNeeded(column, (BigDecimal) data); } + // todo: DBZ-3078 can this now be removed? + // Perhaps this was added to support the double converter invocation? 
if (data instanceof Struct) { SpecialValueDecimal value = VariableScaleDecimal.toLogical((Struct) data); return value.getDecimalValue().orElse(null); @@ -365,10 +410,12 @@ protected Object convertNumericAsBigInteger(Column column, Field fieldDefn, Obje */ @Override protected Object convertBoolean(Column column, Field fieldDefn, Object data) { - if (data instanceof BigDecimal) { return ((BigDecimal) data).byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE; } + if (data instanceof String) { + return Byte.parseByte((String) data) == 0 ? Boolean.FALSE : Boolean.TRUE; + } return super.convertBoolean(column, fieldDefn, data); } @@ -442,19 +489,67 @@ protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, O if (data instanceof Long) { return data; } + if (data instanceof String) { + return resolveTimestampString(column, fieldDefn, (String) data); + } return super.convertTimestampToEpochMicros(column, fieldDefn, fromOracleTimeClasses(column, data)); } @Override protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, Object data) { + if (data instanceof String) { + return resolveTimestampString(column, fieldDefn, (String) data); + } return super.convertTimestampToEpochMillis(column, fieldDefn, fromOracleTimeClasses(column, data)); } @Override protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Object data) { + if (data instanceof String) { + return resolveTimestampString(column, fieldDefn, (String) data); + } return super.convertTimestampToEpochNanos(column, fieldDefn, fromOracleTimeClasses(column, data)); } + private Object resolveTimestampString(Column column, Field fieldDefn, String data) { + LocalDateTime dateTime; + + final Matcher toTimestampMatcher = TO_TIMESTAMP.matcher(data); + if (toTimestampMatcher.matches()) { + String dateText = toTimestampMatcher.group(1); + if (dateText.indexOf(" AM") > 0 || dateText.indexOf(" PM") > 0) { + dateTime = 
LocalDateTime.from(TIMESTAMP_AM_PM_SHORT_FORMATTER.parse(dateText.trim())); + } + else { + dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(dateText.trim())); + } + Object value = getDateTimeWithPrecision(column, dateTime); + return value; + } + + final Matcher toDateMatcher = TO_DATE.matcher(data); + if (toDateMatcher.matches()) { + dateTime = LocalDateTime.from(TIMESTAMP_FORMATTER.parse(toDateMatcher.group(1))); + return getDateTimeWithPrecision(column, dateTime); + } + + // Unable to resolve + return null; + } + + private Object getDateTimeWithPrecision(Column column, LocalDateTime dateTime) { + if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) { + if (getTimePrecision(column) <= 3) { + return dateTime.atZone(GMT_ZONE_ID).toInstant().toEpochMilli(); + } + if (getTimePrecision(column) <= 6) { + return dateTime.atZone(GMT_ZONE_ID).toInstant().toEpochMilli() * 1_000 + dateTime.getNano() % 1_000_000 / 1_000; + } + return dateTime.atZone(GMT_ZONE_ID).toInstant().toEpochMilli() * 1_000_000 + dateTime.getNano() % 1_000_000; + } + return dateTime.atZone(GMT_ZONE_ID).toInstant().toEpochMilli(); + } + @Override protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) { return super.convertTimestampWithZone(column, fieldDefn, fromOracleTimeClasses(column, data)); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/FastDmlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerDmlParser.java similarity index 97% rename from debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/FastDmlParser.java rename to debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerDmlParser.java index 04b861a53..0431ed244 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/FastDmlParser.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerDmlParser.java @@ -19,7 +19,7 @@ /** * @author Chris
Cranford */ -public class FastDmlParser { +public class LogMinerDmlParser { private static final String SINGLE_QUOTE = "'"; private static final String NULL = "NULL"; @@ -39,14 +39,15 @@ public class FastDmlParser { * @return the parsed DML entry record or {@code null} if the SQL was not parsed */ public LogMinerDmlEntry parse(String sql) { - if (sql.startsWith(INSERT_INTO)) { - return parseInsert(sql); - } - else if (sql.startsWith(UPDATE)) { - return parseUpdate(sql); - } - else if (sql.startsWith(DELETE)) { - return parseDelete(sql); + if (sql != null && sql.length() > 0) { + switch (sql.charAt(0)) { + case 'i': + return parseInsert(sql); + case 'u': + return parseUpdate(sql); + case 'd': + return parseDelete(sql); + } } return null; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java index 30ec3a50f..5f6f64244 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java @@ -246,15 +246,18 @@ static Duration getTimeDifference(OracleConnection connection) throws SQLExcepti * @param endScn the SCN to mine to * @param strategy this is about dictionary location * @param isContinuousMining works < 19 version only + * @param metrics log miner metrics * @throws SQLException if anything unexpected happens */ static void startLogMining(OracleConnection connection, Long startScn, Long endScn, - OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining) + OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining, LogMinerMetrics metrics) throws SQLException { LOGGER.trace("Starting log mining startScn={}, endScn={}, strategy={}, continuous={}", startScn, endScn, strategy, isContinuousMining); String statement = 
SqlUtils.startLogMinerStatement(startScn, endScn, strategy, isContinuousMining); try { + Instant start = Instant.now(); executeCallableStatement(connection, statement); + metrics.addCurrentMiningSessionStart(Duration.between(start, Instant.now())); } catch (SQLException e) { // Capture database state before throwing exception diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java index f2d93430d..8e82e150d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetrics.java @@ -42,6 +42,10 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean { private final AtomicReference totalBatchProcessingDuration = new AtomicReference<>(); private final AtomicReference lastBatchProcessingDuration = new AtomicReference<>(); private final AtomicReference maxBatchProcessingDuration = new AtomicReference<>(); + private final AtomicReference totalParseTime = new AtomicReference<>(); + private final AtomicReference totalStartLogMiningSession = new AtomicReference<>(); + private final AtomicReference totalProcessingTime = new AtomicReference<>(); + private final AtomicReference totalResultSetNextTime = new AtomicReference<>(); private final AtomicLong maxBatchProcessingThroughput = new AtomicLong(); private final AtomicReference currentLogFileName; private final AtomicReference redoLogStatus; @@ -105,6 +109,10 @@ public void reset() { maxBatchProcessingThroughput.set(0); lastBatchProcessingDuration.set(Duration.ZERO); networkConnectionProblemsCounter.set(0); + totalParseTime.set(Duration.ZERO); + totalStartLogMiningSession.set(Duration.ZERO); + totalProcessingTime.set(Duration.ZERO); + totalResultSetNextTime.set(Duration.ZERO); } // setters @@ -259,6 
+267,37 @@ public long getNetworkConnectionProblemsCounter() { return networkConnectionProblemsCounter.get(); } + @Override + public long getTotalParseTime() { + return totalParseTime.get().toMillis(); + } + + public void addCurrentParseTime(Duration currentParseTime) { + totalParseTime.accumulateAndGet(currentParseTime, Duration::plus); + } + + @Override + public long getTotalMiningSessionStartTime() { + return totalStartLogMiningSession.get().toMillis(); + } + + public void addCurrentMiningSessionStart(Duration currentStartLogMiningSession) { + totalStartLogMiningSession.accumulateAndGet(currentStartLogMiningSession, Duration::plus); + } + + @Override + public long getTotalProcessingTime() { + return totalProcessingTime.get().toMillis(); + } + + public void addCurrentProcessingTime(Duration processingTime) { + totalProcessingTime.accumulateAndGet(processingTime, Duration::plus); + } + + public void addCurrentResultSetNext(Duration currentNextTime) { + totalResultSetNextTime.accumulateAndGet(currentNextTime, Duration::plus); + } + // MBean accessible setters @Override public void setBatchSize(int size) { @@ -340,6 +379,10 @@ public String toString() { ", sleepTimeMin=" + sleepTimeMin + ", sleepTimeMax=" + sleepTimeMax + ", sleepTimeIncrement=" + sleepTimeIncrement + + ", totalParseTime=" + totalParseTime + + ", totalStartLogMiningSession=" + totalStartLogMiningSession + + ", totalProcessTime=" + totalProcessingTime + + ", totalResultSetNextTime=" + totalResultSetNextTime + '}'; } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetricsMXBean.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetricsMXBean.java index 44a3d7e44..0da3b887d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetricsMXBean.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerMetricsMXBean.java @@ -147,6 
+147,21 @@ public interface LogMinerMetricsMXBean { */ long getNetworkConnectionProblemsCounter(); + /** + * @return the total number of milliseconds used to parse DDL/DML statements + */ + long getTotalParseTime(); + + /** + * @return the total number of milliseconds spent starting a log mining session + */ + long getTotalMiningSessionStartTime(); + + /** + * @return the total number of milliseconds spent mining and processing results + */ + long getTotalProcessingTime(); + /** * Resets metrics. */ diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index 3d2b540fc..bb8267bce 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -14,13 +14,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.oracle.OracleConnection; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningDmlParser; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.jsqlparser.SimpleDmlParser; import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry; import io.debezium.data.Envelope; import io.debezium.pipeline.EventDispatcher; -import io.debezium.pipeline.source.spi.ChangeEventSource; +import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.util.Clock; @@ -35,10 +38,11 @@ */ class LogMinerQueryResultProcessor { - private final ChangeEventSource.ChangeEventSourceContext context; + 
private final ChangeEventSourceContext context; private final LogMinerMetrics metrics; private final TransactionalBuffer transactionalBuffer; - private final SimpleDmlParser dmlParser; + private final SimpleDmlParser legacyDmlParser; + private final LogMinerDmlParser dmlParser; private final OracleOffsetContext offsetContext; private final OracleDatabaseSchema schema; private final EventDispatcher dispatcher; @@ -51,15 +55,15 @@ class LogMinerQueryResultProcessor { private long stuckScnCounter = 0; private HistoryRecorder historyRecorder; - LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, LogMinerMetrics metrics, - TransactionalBuffer transactionalBuffer, SimpleDmlParser dmlParser, + LogMinerQueryResultProcessor(ChangeEventSourceContext context, OracleConnection jdbcConnection, + OracleConnectorConfig connectorConfig, LogMinerMetrics metrics, + TransactionalBuffer transactionalBuffer, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, EventDispatcher dispatcher, String catalogName, Clock clock, HistoryRecorder historyRecorder) { this.context = context; this.metrics = metrics; this.transactionalBuffer = transactionalBuffer; - this.dmlParser = dmlParser; this.offsetContext = offsetContext; this.schema = schema; this.dispatcher = dispatcher; @@ -67,6 +71,16 @@ class LogMinerQueryResultProcessor { this.catalogName = catalogName; this.clock = clock; this.historyRecorder = historyRecorder; + + if (connectorConfig.getLogMiningDmlParser().equals(LogMiningDmlParser.LEGACY)) { + OracleChangeRecordValueConverter converter = new OracleChangeRecordValueConverter(connectorConfig, jdbcConnection); + this.legacyDmlParser = new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converter); + this.dmlParser = null; + } + else { + this.legacyDmlParser = null; + this.dmlParser = new LogMinerDmlParser(); + } } /** @@ -78,12 +92,16 @@ int processResult(ResultSet resultSet) { int dmlCounter = 0, insertCounter = 0, updateCounter = 0, 
deleteCounter = 0; int commitCounter = 0; int rollbackCounter = 0; + int rows = 0; Instant startTime = Instant.now(); - while (true) { + while (context.isRunning()) { try { + Instant rsNextStart = Instant.now(); if (!resultSet.next()) { break; } + rows++; + metrics.addCurrentResultSetNext(Duration.between(rsNextStart, Instant.now())); } catch (SQLException e) { LogMinerHelper.logError(transactionalBufferMetrics, "Closed resultSet"); @@ -169,7 +187,10 @@ int processResult(ResultSet resultSet) { deleteCounter++; break; } - LogMinerDmlEntry dmlEntry = dmlParser.parse(redoSql, schema.getTables(), txId); + + Instant parseStart = Instant.now(); + LogMinerDmlEntry dmlEntry = parse(redoSql, txId); + metrics.addCurrentParseTime(Duration.between(parseStart, Instant.now())); if (dmlEntry == null || redoSql == null) { LOGGER.trace("Following statement was not parsed: {}, details: {}", redoSql, logMessage); @@ -230,9 +251,9 @@ int processResult(ResultSet resultSet) { if (offsetContext.getCommitScn() != null) { currentOffsetCommitScn = offsetContext.getCommitScn(); } - LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks, {} Inserts, {} Updates, {} Deletes. Processed in {} millis. " + + LOGGER.debug("{} Rows, {} DMLs, {} Commits, {} Rollbacks, {} Inserts, {} Updates, {} Deletes. Processed in {} millis. " + "Lag:{}. Offset scn:{}. Offset commit scn:{}. Active transactions:{}. 
Sleep time:{}", - dmlCounter, commitCounter, rollbackCounter, insertCounter, updateCounter, deleteCounter, totalTime.toMillis(), + rows, dmlCounter, commitCounter, rollbackCounter, insertCounter, updateCounter, deleteCounter, totalTime.toMillis(), transactionalBufferMetrics.getLagFromSource(), offsetContext.getScn(), offsetContext.getCommitScn(), transactionalBufferMetrics.getNumberOfActiveTransactions(), metrics.getMillisecondToSleepBetweenMiningQuery()); } @@ -263,4 +284,11 @@ private void warnStuckScn() { } } } + + private LogMinerDmlEntry parse(String redoSql, String txId) { + if (this.legacyDmlParser != null) { + return legacyDmlParser.parse(redoSql, schema.getTables(), txId); + } + return dmlParser.parse(redoSql); + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index ece18c3c2..f2216af80 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -25,6 +25,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; +import java.time.Instant; import java.util.HashSet; import java.util.Map; import java.util.Optional; @@ -40,7 +41,6 @@ import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OracleTaskContext; -import io.debezium.connector.oracle.jsqlparser.SimpleDmlParser; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; @@ -58,6 +58,7 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS private static final Logger LOGGER = 
LoggerFactory.getLogger(LogMinerStreamingChangeEventSource.class); + // TODO: change this to use MAX_QUEUE_SIZE as the default private static final int LOG_MINING_VIEW_FETCH_SIZE = 10_000; private final OracleConnection jdbcConnection; @@ -65,7 +66,6 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS private final Clock clock; private final OracleDatabaseSchema schema; private final OracleOffsetContext offsetContext; - private final SimpleDmlParser dmlParser; private final String catalogName; private final boolean isRac; private final Set racHosts = new HashSet<>(); @@ -90,10 +90,8 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, this.clock = clock; this.schema = schema; this.offsetContext = offsetContext; - OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(connectorConfig, jdbcConnection); this.connectorConfig = connectorConfig; this.catalogName = (connectorConfig.getPdbName() != null) ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName(); - this.dmlParser = new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converters); this.strategy = connectorConfig.getLogMiningStrategy(); this.isContinuousMining = connectorConfig.isContinuousMining(); this.errorHandler = errorHandler; @@ -138,12 +136,14 @@ public void execute(ChangeEventSourceContext context) { initializeRedoLogsForMining(jdbcConnection, false, archiveLogRetention); HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder(); + Instant start = Instant.now(); try { // todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours()); - final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics, - transactionalBuffer, dmlParser, offsetContext, schema, dispatcher, catalogName, clock, historyRecorder); + final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, jdbcConnection, + connectorConfig, logMinerMetrics, transactionalBuffer, offsetContext, schema, dispatcher, + catalogName, clock, historyRecorder); try (PreparedStatement miningView = jdbcConnection.connection() .prepareStatement(SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) { @@ -173,7 +173,7 @@ public void execute(ChangeEventSourceContext context) { currentRedoLogFiles = getCurrentRedoLogFiles(jdbcConnection, logMinerMetrics); } - startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining); + startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining, logMinerMetrics); stopwatch.start(); miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE); @@ -196,6 +196,7 @@ public void execute(ChangeEventSourceContext context) { } finally { historyRecorder.close(); + logMinerMetrics.addCurrentProcessingTime(Duration.between(start, Instant.now())); } } catch (Throwable t) { diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/FastDmlParserTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/FastDmlParserTest.java index d6042fa1c..a984f8584 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/FastDmlParserTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/FastDmlParserTest.java @@ -45,7 +45,7 @@ public class FastDmlParserTest { private final List iterations = Arrays.asList(1000, 5000, 10000, 20000, 50000, 100000, 500000, 
1000000); private SimpleDmlParser simpleDmlParser; - private FastDmlParser fastDmlParser; + private LogMinerDmlParser fastDmlParser; @Before public void beforeEach() throws Exception { @@ -55,8 +55,8 @@ public void beforeEach() throws Exception { OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(connectorConfig, jdbcConnection); simpleDmlParser = new SimpleDmlParser(CATALOG_NAME, SCHEMA_NAME, converters); - // Create FastDmlParser - fastDmlParser = new FastDmlParser(); + // Create LogMinerDmlParser + fastDmlParser = new LogMinerDmlParser(); } // Oracle's generated SQL avoids common spacing patterns such as spaces between column values or values