DBZ-3078 Connector adjustments for new DML parser

This commit is contained in:
Chris Cranford 2021-02-11 12:57:16 -05:00 committed by Gunnar Morling
parent 92b9558218
commit 9c2edc7e61
9 changed files with 274 additions and 34 deletions

View File

@ -202,6 +202,15 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withDescription("Complete JDBC URL as an alternative to specifying hostname, port and database provided "
+ "as a way to support alternative connection scenarios.");
public static final Field LOG_MINING_DML_PARSER = Field.create("log.mining.dml.parser")
.withDisplayName("Log Mining DML parser implementation")
.withEnum(LogMiningDmlParser.class, LogMiningDmlParser.FAST)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The parser implementation to use when parsing DML operations:" +
"'legacy': the legacy parser implementation based on JSqlParser; " +
"'fast': the robust parser implementation that is streamlined specifically for LogMiner redo format");
public static final Field LOG_MINING_ARCHIVE_LOG_HOURS = Field.create("log.mining.archive.log.hours")
.withDisplayName("Log Mining Archive Log Hours")
.withType(Type.LONG)
@ -326,7 +335,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_SLEEP_TIME_DEFAULT_MS,
LOG_MINING_SLEEP_TIME_MIN_MS,
LOG_MINING_SLEEP_TIME_MAX_MS,
LOG_MINING_SLEEP_TIME_INCREMENT_MS);
LOG_MINING_SLEEP_TIME_INCREMENT_MS,
LOG_MINING_DML_PARSER);
private final String databaseName;
private final String pdbName;
@ -402,7 +412,7 @@ public static ConfigDef configDef() {
SNAPSHOT_ENHANCEMENT_TOKEN, LOG_MINING_HISTORY_RECORDER_CLASS, LOG_MINING_HISTORY_RETENTION, RAC_SYSTEM, RAC_NODES,
LOG_MINING_ARCHIVE_LOG_HOURS, LOG_MINING_BATCH_SIZE_DEFAULT, LOG_MINING_BATCH_SIZE_MIN, LOG_MINING_BATCH_SIZE_MAX,
LOG_MINING_SLEEP_TIME_DEFAULT_MS, LOG_MINING_SLEEP_TIME_MIN_MS, LOG_MINING_SLEEP_TIME_MAX_MS, LOG_MINING_SLEEP_TIME_INCREMENT_MS,
LOG_MINING_TRANSACTION_RETENTION);
LOG_MINING_TRANSACTION_RETENTION, LOG_MINING_DML_PARSER);
return config;
}
@ -671,6 +681,43 @@ public static LogMiningStrategy parse(String value, String defaultValue) {
}
}
public enum LogMiningDmlParser implements EnumeratedValue {
    /** Legacy parser implementation based on JSqlParser. */
    LEGACY("legacy"),
    /** Streamlined parser tailored to the LogMiner redo SQL format. */
    FAST("fast");

    private final String value;

    LogMiningDmlParser(String value) {
        this.value = value;
    }

    @Override
    public String getValue() {
        return value;
    }

    /**
     * Resolves a parser option from its configuration value, case-insensitively.
     *
     * @param value the configuration value, may be {@code null}
     * @return the matching parser option, or {@code null} when nothing matches
     */
    public static LogMiningDmlParser parse(String value) {
        if (value == null) {
            return null;
        }
        final String normalized = value.trim();
        for (LogMiningDmlParser option : values()) {
            if (option.getValue().equalsIgnoreCase(normalized)) {
                return option;
            }
        }
        return null;
    }

    /**
     * Resolves a parser option, falling back to the supplied default when the
     * primary value does not match any option.
     *
     * @param value the configuration value, may be {@code null}
     * @param defaultValue the fallback configuration value, may be {@code null}
     * @return the resolved parser option, or {@code null} when neither value matches
     */
    public static LogMiningDmlParser parse(String value, String defaultValue) {
        final LogMiningDmlParser mode = parse(value);
        if (mode != null || defaultValue == null) {
            return mode;
        }
        return parse(defaultValue);
    }
}
/**
* A {@link TableFilter} that excludes all Oracle system tables.
*
@ -853,6 +900,13 @@ public Duration getLogMiningTransactionRetention() {
return Duration.ofHours(getConfig().getInteger(LOG_MINING_TRANSACTION_RETENTION));
}
/**
 * @return the configured DML parser implementation used during log mining
 */
public LogMiningDmlParser getLogMiningDmlParser() {
    final String parserName = getConfig().getString(LOG_MINING_DML_PARSER);
    return LogMiningDmlParser.parse(parserName);
}
/**
* Validate the time.precision.mode configuration.
*

View File

@ -10,8 +10,13 @@
import java.math.BigDecimal;
import java.sql.SQLException;
import java.sql.Types;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -50,6 +55,30 @@ public class OracleValueConverters extends JdbcValueConverters {
private static final Pattern INTERVAL_DAY_SECOND_PATTERN = Pattern.compile("([+\\-])?(\\d+) (\\d+):(\\d+):(\\d+).(\\d+)");
private static final ZoneId GMT_ZONE_ID = ZoneId.of("GMT");
private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("yyyy-MM-dd HH:mm:ss")
.optionalStart()
.appendPattern(".")
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
.optionalEnd()
.toFormatter();
private static final DateTimeFormatter TIMESTAMP_AM_PM_SHORT_FORMATTER = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("dd-MMM-yy hh.mm.ss")
.optionalStart()
.appendPattern(".")
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, false)
.optionalEnd()
.appendPattern(" a")
.toFormatter();
private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
private final OracleConnection connection;
public OracleValueConverters(OracleConnectorConfig config, OracleConnection connection) {
@ -99,6 +128,7 @@ private SchemaBuilder getNumericSchema(Column column) {
// a negative scale means rounding, e.g. NUMBER(10, -2) would be rounded to hundreds
if (scale <= 0) {
// todo: DBZ-3078 LogMiner checked scale=0 && column.length() = 1 and returned SchemaBuilder.bool(), review impact.
int width = column.length() - scale;
if (width < 3) {
@ -107,7 +137,7 @@ private SchemaBuilder getNumericSchema(Column column) {
else if (width < 5) {
return SchemaBuilder.int16();
}
// todo: DBZ-137 this was changed and caused issues with datatype tests, reverted for now.
// todo: DBZ-3078 This was reverted as of DBZ-137 but LogMiner streaming used this, review impact.
// else if (width < 10 || (width == 10 && scale == 0)) {
else if (width < 10) {
return SchemaBuilder.int32();
@ -121,6 +151,7 @@ else if (width < 19) {
return super.schemaBuilder(column);
}
else {
// todo: DBZ-3078 LogMiner streaming assumed VariableScaleDecimal.builder(), review impact.
return variableScaleSchema(column);
}
}
@ -151,6 +182,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case Types.NUMERIC:
return getNumericConverter(column, fieldDefn);
case Types.FLOAT:
// todo: DBZ-3078 LogMiner used getFloatConverter here, review impact.
// todo: DBZ-137 is there a reason why floats need to be converted to doubles rather than var-scales?
// return data -> convertDouble(column, fieldDefn, data);
return data -> convertVariableScale(column, fieldDefn, data);
@ -170,6 +202,9 @@ private Object getFloatConverter(Column column, Field fieldDefn, Object data) {
if (data instanceof BigDecimal) {
return ((BigDecimal) data).floatValue();
}
if (data instanceof String) {
return Float.parseFloat((String) data);
}
return convertVariableScale(column, fieldDefn, data);
}
@ -178,6 +213,7 @@ private ValueConverter getNumericConverter(Column column, Field fieldDefn) {
Integer scale = column.scale().get();
if (scale <= 0) {
// todo DBZ-3078 LogMiner used scale == 0 && column.length() == 1 to return convertBoolean, review impact.
int width = column.length() - scale;
if (width < 3) {
@ -186,6 +222,7 @@ private ValueConverter getNumericConverter(Column column, Field fieldDefn) {
else if (width < 5) {
return data -> convertNumericAsSmallInt(column, fieldDefn, data);
}
// todo: DBZ-3078 this was a thing for LogMiner streaming, review impact.
// todo: DBZ-137 this was changed and caused issues with datatype tests, reverted for now.
// else if (width < 10 || (width == 10 && scale == 0)) {
else if (width < 10) {
@ -283,12 +320,20 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data) {
}
}
if (data instanceof String) {
// In the case when the value is of String, convert it to a BigDecimal so that we can then
// aptly apply the scale adjustment below.
data = toBigDecimal(column, fieldDefn, data);
}
// adjust scale to column's scale if the column's scale is larger than the one from
// the value (e.g. 4.4444 -> 4.444400)
if (data instanceof BigDecimal) {
data = withScaleAdjustedIfNeeded(column, (BigDecimal) data);
}
// todo: DBZ-3078 can this now be removed?
// Perhaps this was added to support the double converter invocation?
if (data instanceof Struct) {
SpecialValueDecimal value = VariableScaleDecimal.toLogical((Struct) data);
return value.getDecimalValue().orElse(null);
@ -365,10 +410,12 @@ protected Object convertNumericAsBigInteger(Column column, Field fieldDefn, Obje
*/
@Override
protected Object convertBoolean(Column column, Field fieldDefn, Object data) {
    // Any non-zero numeric value maps to TRUE, zero maps to FALSE.
    if (data instanceof BigDecimal) {
        return ((BigDecimal) data).byteValue() != 0;
    }
    // NOTE(review): the String branch presumably covers values surfaced as raw
    // text by the LogMiner DML parser — confirm against callers.
    if (data instanceof String) {
        return Byte.parseByte((String) data) != 0;
    }
    return super.convertBoolean(column, fieldDefn, data);
}
@ -442,19 +489,67 @@ protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, O
if (data instanceof Long) {
return data;
}
if (data instanceof String) {
return resolveTimestampString(column, fieldDefn, (String) data);
}
return super.convertTimestampToEpochMicros(column, fieldDefn, fromOracleTimeClasses(column, data));
}
@Override
protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, Object data) {
if (data instanceof String) {
return resolveTimestampString(column, fieldDefn, (String) data);
}
return super.convertTimestampToEpochMillis(column, fieldDefn, fromOracleTimeClasses(column, data));
}
@Override
protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Object data) {
if (data instanceof String) {
return resolveTimestampString(column, fieldDefn, (String) data);
}
return super.convertTimestampToEpochNanos(column, fieldDefn, fromOracleTimeClasses(column, data));
}
/**
 * Resolves a {@code TO_TIMESTAMP('...')} or {@code TO_DATE('...', '...')} SQL
 * fragment into an epoch-based value scaled to the column's time precision.
 * Returns {@code null} when the text matches neither function form.
 */
private Object resolveTimestampString(Column column, Field fieldDefn, String data) {
    final Matcher toTimestampMatcher = TO_TIMESTAMP.matcher(data);
    if (toTimestampMatcher.matches()) {
        final String dateText = toTimestampMatcher.group(1);
        // Timestamps rendered with a trailing AM/PM marker use the short
        // day-month-year pattern; everything else uses the ISO-like pattern.
        final DateTimeFormatter formatter = (dateText.indexOf(" AM") > 0 || dateText.indexOf(" PM") > 0)
                ? TIMESTAMP_AM_PM_SHORT_FORMATTER
                : TIMESTAMP_FORMATTER;
        return getDateTimeWithPrecision(column, LocalDateTime.from(formatter.parse(dateText.trim())));
    }
    final Matcher toDateMatcher = TO_DATE.matcher(data);
    if (toDateMatcher.matches()) {
        // NOTE(review): the format-mask group (2) is ignored; the literal is
        // assumed to already be in TIMESTAMP_FORMATTER's layout — confirm.
        return getDateTimeWithPrecision(column, LocalDateTime.from(TIMESTAMP_FORMATTER.parse(toDateMatcher.group(1))));
    }
    // Unable to resolve
    return null;
}
/**
 * Converts a parsed {@link LocalDateTime} (interpreted as GMT) into an epoch-based
 * value whose resolution follows the column's time precision.
 *
 * @param column the column whose precision drives the output resolution
 * @param dateTime the parsed date/time value, never {@code null}
 * @return epoch milliseconds for precision &lt;= 3, epoch microseconds for
 *         precision &lt;= 6, otherwise epoch nanoseconds; epoch milliseconds when
 *         no adaptive time precision mode is active
 */
private Object getDateTimeWithPrecision(Column column, LocalDateTime dateTime) {
    // Bug fix: the previous code scaled epoch *milliseconds* up by 1_000 / 1_000_000
    // for the micro/nano cases, silently discarding any sub-millisecond fraction
    // that was parsed from the SQL text. Compute from epoch seconds plus
    // nano-of-second instead so the full parsed precision is retained.
    final long epochSeconds = dateTime.toEpochSecond(ZoneOffset.UTC); // GMT == offset 0
    final int nanoOfSecond = dateTime.getNano();
    if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
        final int precision = getTimePrecision(column);
        if (precision <= 3) {
            return epochSeconds * 1_000L + nanoOfSecond / 1_000_000;
        }
        if (precision <= 6) {
            return epochSeconds * 1_000_000L + nanoOfSecond / 1_000;
        }
        return epochSeconds * 1_000_000_000L + nanoOfSecond;
    }
    return epochSeconds * 1_000L + nanoOfSecond / 1_000_000;
}
@Override
protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
return super.convertTimestampWithZone(column, fieldDefn, fromOracleTimeClasses(column, data));

View File

@ -19,7 +19,7 @@
/**
* @author Chris Cranford
*/
public class FastDmlParser {
public class LogMinerDmlParser {
private static final String SINGLE_QUOTE = "'";
private static final String NULL = "NULL";
@ -39,14 +39,15 @@ public class FastDmlParser {
* @return the parsed DML entry record or {@code null} if the SQL was not parsed
*/
public LogMinerDmlEntry parse(String sql) {
if (sql.startsWith(INSERT_INTO)) {
return parseInsert(sql);
}
else if (sql.startsWith(UPDATE)) {
return parseUpdate(sql);
}
else if (sql.startsWith(DELETE)) {
return parseDelete(sql);
if (sql != null && sql.length() > 0) {
switch (sql.charAt(0)) {
case 'i':
return parseInsert(sql);
case 'u':
return parseUpdate(sql);
case 'd':
return parseDelete(sql);
}
}
return null;
}

View File

@ -246,15 +246,18 @@ static Duration getTimeDifference(OracleConnection connection) throws SQLExcepti
* @param endScn the SCN to mine to
* @param strategy this is about dictionary location
* @param isContinuousMining whether continuous mining is used (supported only on Oracle versions prior to 19)
* @param metrics log miner metrics
* @throws SQLException if anything unexpected happens
*/
static void startLogMining(OracleConnection connection, Long startScn, Long endScn,
OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining)
OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining, LogMinerMetrics metrics)
throws SQLException {
LOGGER.trace("Starting log mining startScn={}, endScn={}, strategy={}, continuous={}", startScn, endScn, strategy, isContinuousMining);
String statement = SqlUtils.startLogMinerStatement(startScn, endScn, strategy, isContinuousMining);
try {
Instant start = Instant.now();
executeCallableStatement(connection, statement);
metrics.addCurrentMiningSessionStart(Duration.between(start, Instant.now()));
}
catch (SQLException e) {
// Capture database state before throwing exception

View File

@ -42,6 +42,10 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
private final AtomicReference<Duration> totalBatchProcessingDuration = new AtomicReference<>();
private final AtomicReference<Duration> lastBatchProcessingDuration = new AtomicReference<>();
private final AtomicReference<Duration> maxBatchProcessingDuration = new AtomicReference<>();
private final AtomicReference<Duration> totalParseTime = new AtomicReference<>();
private final AtomicReference<Duration> totalStartLogMiningSession = new AtomicReference<>();
private final AtomicReference<Duration> totalProcessingTime = new AtomicReference<>();
private final AtomicReference<Duration> totalResultSetNextTime = new AtomicReference<>();
private final AtomicLong maxBatchProcessingThroughput = new AtomicLong();
private final AtomicReference<String[]> currentLogFileName;
private final AtomicReference<String[]> redoLogStatus;
@ -105,6 +109,10 @@ public void reset() {
maxBatchProcessingThroughput.set(0);
lastBatchProcessingDuration.set(Duration.ZERO);
networkConnectionProblemsCounter.set(0);
totalParseTime.set(Duration.ZERO);
totalStartLogMiningSession.set(Duration.ZERO);
totalProcessingTime.set(Duration.ZERO);
totalResultSetNextTime.set(Duration.ZERO);
}
// setters
@ -259,6 +267,37 @@ public long getNetworkConnectionProblemsCounter() {
return networkConnectionProblemsCounter.get();
}
@Override
public long getTotalParseTime() {
    // Expose the accumulated parse duration in milliseconds for JMX consumers.
    final Duration total = totalParseTime.get();
    return total.toMillis();
}
public void addCurrentParseTime(Duration currentParseTime) {
    // Thread-safe accumulation onto the running parse-time total.
    totalParseTime.updateAndGet(total -> total.plus(currentParseTime));
}
@Override
public long getTotalMiningSessionStartTime() {
    // Expose the accumulated session-start duration in milliseconds.
    final Duration total = totalStartLogMiningSession.get();
    return total.toMillis();
}
public void addCurrentMiningSessionStart(Duration currentStartLogMiningSession) {
    // Thread-safe accumulation onto the running session-start total.
    totalStartLogMiningSession.updateAndGet(total -> total.plus(currentStartLogMiningSession));
}
@Override
public long getTotalProcessingTime() {
    // Expose the accumulated processing duration in milliseconds.
    final Duration total = totalProcessingTime.get();
    return total.toMillis();
}
public void addCurrentProcessingTime(Duration processingTime) {
    // Thread-safe accumulation onto the running processing-time total.
    totalProcessingTime.updateAndGet(total -> total.plus(processingTime));
}
public void addCurrentResultSetNext(Duration currentNextTime) {
    // Thread-safe accumulation of time spent in ResultSet#next calls.
    // NOTE(review): no corresponding getter is visible in the MXBean interface
    // hunk; the value is only surfaced via toString — confirm this is intended.
    totalResultSetNextTime.updateAndGet(total -> total.plus(currentNextTime));
}
// MBean accessible setters
@Override
public void setBatchSize(int size) {
@ -340,6 +379,10 @@ public String toString() {
", sleepTimeMin=" + sleepTimeMin +
", sleepTimeMax=" + sleepTimeMax +
", sleepTimeIncrement=" + sleepTimeIncrement +
", totalParseTime=" + totalParseTime +
", totalStartLogMiningSession=" + totalStartLogMiningSession +
", totalProcessTime=" + totalProcessingTime +
", totalResultSetNextTime=" + totalResultSetNextTime +
'}';
}
}

View File

@ -147,6 +147,21 @@ public interface LogMinerMetricsMXBean {
*/
long getNetworkConnectionProblemsCounter();
/**
 * Cumulative wall-clock time the connector has spent parsing mined statement text.
 *
 * @return the total number of milliseconds used to parse DDL/DML statements
 */
long getTotalParseTime();
/**
 * Cumulative wall-clock time spent starting LogMiner mining sessions.
 *
 * @return the total number of milliseconds spent starting a log mining session
 */
long getTotalMiningSessionStartTime();
/**
 * Cumulative wall-clock time spent mining the LogMiner view and processing its results.
 *
 * @return the total number of milliseconds spent mining and processing results
 */
long getTotalProcessingTime();
/**
* Resets metrics.
*/

View File

@ -14,13 +14,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningDmlParser;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.jsqlparser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
@ -35,10 +38,11 @@
*/
class LogMinerQueryResultProcessor {
private final ChangeEventSource.ChangeEventSourceContext context;
private final ChangeEventSourceContext context;
private final LogMinerMetrics metrics;
private final TransactionalBuffer transactionalBuffer;
private final SimpleDmlParser dmlParser;
private final SimpleDmlParser legacyDmlParser;
private final LogMinerDmlParser dmlParser;
private final OracleOffsetContext offsetContext;
private final OracleDatabaseSchema schema;
private final EventDispatcher<TableId> dispatcher;
@ -51,15 +55,15 @@ class LogMinerQueryResultProcessor {
private long stuckScnCounter = 0;
private HistoryRecorder historyRecorder;
LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, LogMinerMetrics metrics,
TransactionalBuffer transactionalBuffer, SimpleDmlParser dmlParser,
LogMinerQueryResultProcessor(ChangeEventSourceContext context, OracleConnection jdbcConnection,
OracleConnectorConfig connectorConfig, LogMinerMetrics metrics,
TransactionalBuffer transactionalBuffer,
OracleOffsetContext offsetContext, OracleDatabaseSchema schema,
EventDispatcher<TableId> dispatcher,
String catalogName, Clock clock, HistoryRecorder historyRecorder) {
this.context = context;
this.metrics = metrics;
this.transactionalBuffer = transactionalBuffer;
this.dmlParser = dmlParser;
this.offsetContext = offsetContext;
this.schema = schema;
this.dispatcher = dispatcher;
@ -67,6 +71,16 @@ class LogMinerQueryResultProcessor {
this.catalogName = catalogName;
this.clock = clock;
this.historyRecorder = historyRecorder;
if (connectorConfig.getLogMiningDmlParser().equals(LogMiningDmlParser.LEGACY)) {
OracleChangeRecordValueConverter converter = new OracleChangeRecordValueConverter(connectorConfig, jdbcConnection);
this.legacyDmlParser = new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converter);
this.dmlParser = null;
}
else {
this.legacyDmlParser = null;
this.dmlParser = new LogMinerDmlParser();
}
}
/**
@ -78,12 +92,16 @@ int processResult(ResultSet resultSet) {
int dmlCounter = 0, insertCounter = 0, updateCounter = 0, deleteCounter = 0;
int commitCounter = 0;
int rollbackCounter = 0;
int rows = 0;
Instant startTime = Instant.now();
while (true) {
while (context.isRunning()) {
try {
Instant rsNextStart = Instant.now();
if (!resultSet.next()) {
break;
}
rows++;
metrics.addCurrentResultSetNext(Duration.between(rsNextStart, Instant.now()));
}
catch (SQLException e) {
LogMinerHelper.logError(transactionalBufferMetrics, "Closed resultSet");
@ -169,7 +187,10 @@ int processResult(ResultSet resultSet) {
deleteCounter++;
break;
}
LogMinerDmlEntry dmlEntry = dmlParser.parse(redoSql, schema.getTables(), txId);
Instant parseStart = Instant.now();
LogMinerDmlEntry dmlEntry = parse(redoSql, txId);
metrics.addCurrentParseTime(Duration.between(parseStart, Instant.now()));
if (dmlEntry == null || redoSql == null) {
LOGGER.trace("Following statement was not parsed: {}, details: {}", redoSql, logMessage);
@ -230,9 +251,9 @@ int processResult(ResultSet resultSet) {
if (offsetContext.getCommitScn() != null) {
currentOffsetCommitScn = offsetContext.getCommitScn();
}
LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks, {} Inserts, {} Updates, {} Deletes. Processed in {} millis. " +
LOGGER.debug("{} Rows, {} DMLs, {} Commits, {} Rollbacks, {} Inserts, {} Updates, {} Deletes. Processed in {} millis. " +
"Lag:{}. Offset scn:{}. Offset commit scn:{}. Active transactions:{}. Sleep time:{}",
dmlCounter, commitCounter, rollbackCounter, insertCounter, updateCounter, deleteCounter, totalTime.toMillis(),
rows, dmlCounter, commitCounter, rollbackCounter, insertCounter, updateCounter, deleteCounter, totalTime.toMillis(),
transactionalBufferMetrics.getLagFromSource(), offsetContext.getScn(), offsetContext.getCommitScn(),
transactionalBufferMetrics.getNumberOfActiveTransactions(), metrics.getMillisecondToSleepBetweenMiningQuery());
}
@ -263,4 +284,11 @@ private void warnStuckScn() {
}
}
}
/**
 * Routes the redo SQL to whichever DML parser implementation was selected from
 * the connector configuration at construction time.
 */
private LogMinerDmlEntry parse(String redoSql, String txId) {
    if (legacyDmlParser == null) {
        return dmlParser.parse(redoSql);
    }
    return legacyDmlParser.parse(redoSql, schema.getTables(), txId);
}
}

View File

@ -25,6 +25,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
@ -40,7 +41,6 @@
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.jsqlparser.SimpleDmlParser;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
@ -58,6 +58,7 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerStreamingChangeEventSource.class);
// TODO: change this to use MAX_QUEUE_SIZE as the default
private static final int LOG_MINING_VIEW_FETCH_SIZE = 10_000;
private final OracleConnection jdbcConnection;
@ -65,7 +66,6 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private final Clock clock;
private final OracleDatabaseSchema schema;
private final OracleOffsetContext offsetContext;
private final SimpleDmlParser dmlParser;
private final String catalogName;
private final boolean isRac;
private final Set<String> racHosts = new HashSet<>();
@ -90,10 +90,8 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
this.clock = clock;
this.schema = schema;
this.offsetContext = offsetContext;
OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(connectorConfig, jdbcConnection);
this.connectorConfig = connectorConfig;
this.catalogName = (connectorConfig.getPdbName() != null) ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName();
this.dmlParser = new SimpleDmlParser(catalogName, connectorConfig.getSchemaName(), converters);
this.strategy = connectorConfig.getLogMiningStrategy();
this.isContinuousMining = connectorConfig.isContinuousMining();
this.errorHandler = errorHandler;
@ -138,12 +136,14 @@ public void execute(ChangeEventSourceContext context) {
initializeRedoLogsForMining(jdbcConnection, false, archiveLogRetention);
HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
Instant start = Instant.now();
try {
// todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());
final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics,
transactionalBuffer, dmlParser, offsetContext, schema, dispatcher, catalogName, clock, historyRecorder);
final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, jdbcConnection,
connectorConfig, logMinerMetrics, transactionalBuffer, offsetContext, schema, dispatcher,
catalogName, clock, historyRecorder);
try (PreparedStatement miningView = jdbcConnection.connection()
.prepareStatement(SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) {
@ -173,7 +173,7 @@ public void execute(ChangeEventSourceContext context) {
currentRedoLogFiles = getCurrentRedoLogFiles(jdbcConnection, logMinerMetrics);
}
startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining);
startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining, logMinerMetrics);
stopwatch.start();
miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE);
@ -196,6 +196,7 @@ public void execute(ChangeEventSourceContext context) {
}
finally {
historyRecorder.close();
logMinerMetrics.addCurrentProcessingTime(Duration.between(start, Instant.now()));
}
}
catch (Throwable t) {

View File

@ -45,7 +45,7 @@ public class FastDmlParserTest {
private final List<Integer> iterations = Arrays.asList(1000, 5000, 10000, 20000, 50000, 100000, 500000, 1000000);
private SimpleDmlParser simpleDmlParser;
private FastDmlParser fastDmlParser;
private LogMinerDmlParser fastDmlParser;
@Before
public void beforeEach() throws Exception {
@ -55,8 +55,8 @@ public void beforeEach() throws Exception {
OracleChangeRecordValueConverter converters = new OracleChangeRecordValueConverter(connectorConfig, jdbcConnection);
simpleDmlParser = new SimpleDmlParser(CATALOG_NAME, SCHEMA_NAME, converters);
// Create FastDmlParser
fastDmlParser = new FastDmlParser();
// Create LogMinerDmlParser
fastDmlParser = new LogMinerDmlParser();
}
// Oracle's generated SQL avoids common spacing patterns such as spaces between column values or values