DBZ-137 More suggested fixes

This commit is contained in:
Chris Cranford 2020-09-14 17:01:20 -04:00 committed by Gunnar Morling
parent 31a6e47e90
commit 72751a40c7
12 changed files with 59 additions and 165 deletions

View File

@ -106,7 +106,6 @@
<properties> <properties>
<adapter.name>xstream</adapter.name> <adapter.name>xstream</adapter.name>
<version.oracle.driver>12.2.0.1</version.oracle.driver>
</properties> </properties>
<build> <build>

View File

@ -55,8 +55,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
this.schema.initializeStorage(); this.schema.initializeStorage();
String adapterString = config.getString("connection.adapter"); String adapterString = config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER);
adapterString = adapterString == null ? config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER) : adapterString;
OracleConnectorConfig.ConnectorAdapter adapter = OracleConnectorConfig.ConnectorAdapter.parse(adapterString); OracleConnectorConfig.ConnectorAdapter adapter = OracleConnectorConfig.ConnectorAdapter.parse(adapterString);
OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig, adapter)); OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig, adapter));

View File

@ -10,9 +10,6 @@
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Savepoint; import java.sql.Savepoint;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -231,8 +228,6 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Relati
@Override @Override
protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotContext, String overriddenSelect, TableId tableId) { protected String enhanceOverriddenSelect(RelationalSnapshotContext snapshotContext, String overriddenSelect, TableId tableId) {
String columnString = buildSelectColumns(connectorConfig.getConfig().getString(connectorConfig.COLUMN_BLACKLIST), snapshotContext.tables.forTable(tableId));
overriddenSelect = overriddenSelect.replaceFirst("\\*", columnString);
long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn"); long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn");
String token = connectorConfig.getTokenToReplaceInSnapshotPredicate(); String token = connectorConfig.getTokenToReplaceInSnapshotPredicate();
if (token != null) { if (token != null) {
@ -268,43 +263,10 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapsh
@Override @Override
protected Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) { protected Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) {
String columnString = buildSelectColumns(connectorConfig.getConfig().getString(connectorConfig.COLUMN_BLACKLIST), snapshotContext.tables.forTable(tableId));
long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn"); long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn");
return Optional.of("SELECT * FROM " + quote(tableId) + " AS OF SCN " + snapshotOffset); return Optional.of("SELECT * FROM " + quote(tableId) + " AS OF SCN " + snapshotOffset);
} }
/**
* This is to build "whitelisted" column list
* @param blackListColumnStr comma separated columns blacklist
* @param table the table
* @return column list for select
*/
public static String buildSelectColumns(String blackListColumnStr, Table table) {
String columnsToSelect = "*";
if (blackListColumnStr != null && blackListColumnStr.trim().length() > 0
&& blackListColumnStr.toUpperCase().contains(table.id().table())) {
String allTableColumns = table.retrieveColumnNames().stream()
.map(columnName -> {
StringBuilder sb = new StringBuilder();
if (!columnName.contains(table.id().table())) {
sb.append(table.id().table()).append(".").append(columnName);
}
else {
sb.append(columnName);
}
return sb.toString();
}).collect(Collectors.joining(","));
// todo this is an unnecessary code, fix unit test, then remove it
String catalog = table.id().catalog();
List<String> blackList = new ArrayList<>(Arrays.asList(blackListColumnStr.trim().toUpperCase().replaceAll(catalog + ".", "").split(",")));
List<String> allColumns = new ArrayList<>(Arrays.asList(allTableColumns.toUpperCase().split(",")));
allColumns.removeAll(blackList);
columnsToSelect = String.join(",", allColumns);
}
return columnsToSelect;
}
@Override @Override
protected void complete(SnapshotContext snapshotContext) { protected void complete(SnapshotContext snapshotContext) {
if (connectorConfig.getPdbName() != null) { if (connectorConfig.getPdbName() != null) {

View File

@ -101,9 +101,6 @@ private SchemaBuilder getNumericSchema(Column column) {
if (scale <= 0) { if (scale <= 0) {
int width = column.length() - scale; int width = column.length() - scale;
if (scale == 0 && column.length() == 1) {
return SchemaBuilder.bool();
}
if (width < 3) { if (width < 3) {
return SchemaBuilder.int8(); return SchemaBuilder.int8();
} }
@ -181,11 +178,6 @@ private ValueConverter getNumericConverter(Column column, Field fieldDefn) {
Integer scale = column.scale().get(); Integer scale = column.scale().get();
if (scale <= 0) { if (scale <= 0) {
// Boolean represtented as Number(1,0)
if (scale == 0 && column.length() == 1) {
return data -> convertBoolean(column, fieldDefn, data);
}
int width = column.length() - scale; int width = column.length() - scale;
if (width < 3) { if (width < 3) {

View File

@ -25,10 +25,10 @@
*/ */
public class OracleDmlParser extends AntlrDdlParser<PlSqlLexer, PlSqlParser> { public class OracleDmlParser extends AntlrDdlParser<PlSqlLexer, PlSqlParser> {
protected final String catalogName;
protected final String schemaName;
private final OracleChangeRecordValueConverter converter;
private LogMinerDmlEntry dmlEntry; private LogMinerDmlEntry dmlEntry;
protected String catalogName;
protected String schemaName;
private OracleChangeRecordValueConverter converter;
public OracleDmlParser(boolean throwErrorsFromTreeWalk, final String catalogName, final String schemaName, OracleChangeRecordValueConverter converter) { public OracleDmlParser(boolean throwErrorsFromTreeWalk, final String catalogName, final String schemaName, OracleChangeRecordValueConverter converter) {
super(throwErrorsFromTreeWalk); super(throwErrorsFromTreeWalk);

View File

@ -52,14 +52,14 @@
public class SimpleDmlParser { public class SimpleDmlParser {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleDmlParser.class); private static final Logger LOGGER = LoggerFactory.getLogger(SimpleDmlParser.class);
protected String catalogName; protected final String catalogName;
protected String schemaName; protected final String schemaName;
protected Table table;
private final OracleChangeRecordValueConverter converter; private final OracleChangeRecordValueConverter converter;
private final CCJSqlParserManager pm;
private final Map<String, LogMinerColumnValueWrapper> newColumnValues = new LinkedHashMap<>();
private final Map<String, LogMinerColumnValueWrapper> oldColumnValues = new LinkedHashMap<>();
protected Table table;
private String aliasName; private String aliasName;
private Map<String, LogMinerColumnValueWrapper> newColumnValues = new LinkedHashMap<>();
private Map<String, LogMinerColumnValueWrapper> oldColumnValues = new LinkedHashMap<>();
private CCJSqlParserManager pm;
/** /**
* Constructor * Constructor

View File

@ -125,16 +125,16 @@ static long getEndScn(Connection connection, long startScn, LogMinerMetrics metr
/** /**
* Calculate time difference between database and connector timers. It could be negative if DB time is ahead. * Calculate time difference between database and connector timers. It could be negative if DB time is ahead.
* @param connection connection * @param connection connection
* @return difference in milliseconds * @return the time difference as a {@link Duration}
*/ */
static long getTimeDifference(Connection connection) throws SQLException { static Duration getTimeDifference(Connection connection) throws SQLException {
Timestamp dbCurrentMillis = (Timestamp) getSingleResult(connection, SqlUtils.CURRENT_TIMESTAMP, DATATYPE.TIMESTAMP); Timestamp dbCurrentMillis = (Timestamp) getSingleResult(connection, SqlUtils.CURRENT_TIMESTAMP, DATATYPE.TIMESTAMP);
if (dbCurrentMillis == null) { if (dbCurrentMillis == null) {
return 0; return Duration.ZERO;
} }
Instant fromDb = dbCurrentMillis.toInstant(); Instant fromDb = dbCurrentMillis.toInstant();
Instant now = Instant.now(); Instant now = Instant.now();
return Duration.between(fromDb, now).toMillis(); return Duration.between(fromDb, now);
} }
/** /**

View File

@ -23,29 +23,30 @@
*/ */
@ThreadSafe @ThreadSafe
public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean { public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
private AtomicLong currentScn = new AtomicLong();
private AtomicInteger capturedDmlCount = new AtomicInteger();
private AtomicReference<String[]> currentLogFileName;
private AtomicReference<String[]> redoLogStatus;
private AtomicInteger switchCounter = new AtomicInteger();
private AtomicReference<Duration> lastLogMinerQueryDuration = new AtomicReference<>();
private AtomicReference<Duration> averageLogMinerQueryDuration = new AtomicReference<>();
private AtomicInteger logMinerQueryCount = new AtomicInteger();
private AtomicReference<Duration> lastProcessedCapturedBatchDuration = new AtomicReference<>();
private AtomicInteger processedCapturedBatchCount = new AtomicInteger();
private AtomicReference<Duration> averageProcessedCapturedBatchDuration = new AtomicReference<>();
private AtomicInteger batchSize = new AtomicInteger();
private AtomicInteger millisecondToSleepBetweenMiningQuery = new AtomicInteger();
private final int MAX_SLEEP_TIME = 3_000; private final static int MAX_SLEEP_TIME = 3_000;
private final int DEFAULT_SLEEP_TIME = 1_000; private final static int DEFAULT_SLEEP_TIME = 1_000;
private final int MIN_SLEEP_TIME = 100; private final static int MIN_SLEEP_TIME = 100;
private final int MIN_BATCH_SIZE = 1_000; private final static int MIN_BATCH_SIZE = 1_000;
private final int MAX_BATCH_SIZE = 100_000; private final static int MAX_BATCH_SIZE = 100_000;
private final int DEFAULT_BATCH_SIZE = 5_000; private final static int DEFAULT_BATCH_SIZE = 5_000;
private final int SLEEP_TIME_INCREMENT = 200; private final static int SLEEP_TIME_INCREMENT = 200;
private final AtomicLong currentScn = new AtomicLong();
private final AtomicInteger capturedDmlCount = new AtomicInteger();
private final AtomicReference<String[]> currentLogFileName;
private final AtomicReference<String[]> redoLogStatus;
private final AtomicInteger switchCounter = new AtomicInteger();
private final AtomicReference<Duration> lastLogMinerQueryDuration = new AtomicReference<>();
private final AtomicReference<Duration> averageLogMinerQueryDuration = new AtomicReference<>();
private final AtomicInteger logMinerQueryCount = new AtomicInteger();
private final AtomicReference<Duration> lastProcessedCapturedBatchDuration = new AtomicReference<>();
private final AtomicInteger processedCapturedBatchCount = new AtomicInteger();
private final AtomicReference<Duration> averageProcessedCapturedBatchDuration = new AtomicReference<>();
private final AtomicInteger batchSize = new AtomicInteger();
private final AtomicInteger millisecondToSleepBetweenMiningQuery = new AtomicInteger();
LogMinerMetrics(CdcSourceTaskContext taskContext) { LogMinerMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "log-miner"); super(taskContext, "log-miner");

View File

@ -122,8 +122,8 @@ public void execute(ChangeEventSourceContext context) {
startScn = offsetContext.getScn(); startScn = offsetContext.getScn();
createAuditTable(connection); createAuditTable(connection);
LOGGER.trace("current millis {}, db time {}", System.currentTimeMillis(), getTimeDifference(connection)); LOGGER.trace("current millis {}, db time {}", System.currentTimeMillis(), getTimeDifference(connection).toMillis());
transactionalBufferMetrics.setTimeDifference(new AtomicLong(getTimeDifference(connection))); transactionalBufferMetrics.setTimeDifference(new AtomicLong(getTimeDifference(connection).toMillis()));
if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection)) { if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection)) {
throw new RuntimeException("Online REDO LOG files don't contain the offset SCN. Clean offset and start over"); throw new RuntimeException("Online REDO LOG files don't contain the offset SCN. Clean offset and start over");

View File

@ -22,26 +22,26 @@
*/ */
@ThreadSafe @ThreadSafe
public class TransactionalBufferMetrics extends Metrics implements TransactionalBufferMetricsMXBean { public class TransactionalBufferMetrics extends Metrics implements TransactionalBufferMetricsMXBean {
private AtomicLong oldestScn = new AtomicLong(); private final AtomicLong oldestScn = new AtomicLong();
private AtomicLong committedScn = new AtomicLong(); private final AtomicLong committedScn = new AtomicLong();
private AtomicReference<Duration> lagFromTheSource = new AtomicReference<>(); private final AtomicReference<Duration> lagFromTheSource = new AtomicReference<>();
private AtomicInteger activeTransactions = new AtomicInteger(); private final AtomicInteger activeTransactions = new AtomicInteger();
private AtomicLong rolledBackTransactions = new AtomicLong(); private final AtomicLong rolledBackTransactions = new AtomicLong();
private AtomicLong committedTransactions = new AtomicLong(); private final AtomicLong committedTransactions = new AtomicLong();
private AtomicLong capturedDmlCounter = new AtomicLong(); private final AtomicLong capturedDmlCounter = new AtomicLong();
private AtomicLong committedDmlCounter = new AtomicLong(); private final AtomicLong committedDmlCounter = new AtomicLong();
private AtomicInteger commitQueueCapacity = new AtomicInteger(); private final AtomicInteger commitQueueCapacity = new AtomicInteger();
private AtomicReference<Duration> maxLagFromTheSource = new AtomicReference<>(); private final AtomicReference<Duration> maxLagFromTheSource = new AtomicReference<>();
private AtomicReference<Duration> minLagFromTheSource = new AtomicReference<>(); private final AtomicReference<Duration> minLagFromTheSource = new AtomicReference<>();
private AtomicReference<Duration> averageLagsFromTheSource = new AtomicReference<>(); private final AtomicReference<Duration> averageLagsFromTheSource = new AtomicReference<>();
private AtomicReference<Set<String>> abandonedTransactionIds = new AtomicReference<>(); private final AtomicReference<Set<String>> abandonedTransactionIds = new AtomicReference<>();
private AtomicReference<Set<String>> rolledBackTransactionIds = new AtomicReference<>(); private final AtomicReference<Set<String>> rolledBackTransactionIds = new AtomicReference<>();
private Instant startTime; private final Instant startTime;
private static long MILLIS_PER_SECOND = 1000L; private final static long MILLIS_PER_SECOND = 1000L;
private AtomicLong timeDifference = new AtomicLong(); private final AtomicLong timeDifference = new AtomicLong();
private AtomicInteger errorCounter = new AtomicInteger(); private final AtomicInteger errorCounter = new AtomicInteger();
private AtomicInteger warningCounter = new AtomicInteger(); private final AtomicInteger warningCounter = new AtomicInteger();
private AtomicInteger scnFreezeCounter = new AtomicInteger(); private final AtomicInteger scnFreezeCounter = new AtomicInteger();
TransactionalBufferMetrics(CdcSourceTaskContext taskContext) { TransactionalBufferMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "log-miner-transactional-buffer"); super(taskContext, "log-miner-transactional-buffer");
@ -62,7 +62,7 @@ public void setCommittedScn(Long scn) {
} }
public void setTimeDifference(AtomicLong timeDifference) { public void setTimeDifference(AtomicLong timeDifference) {
this.timeDifference = timeDifference; this.timeDifference.set(timeDifference.get());
} }
void calculateLagMetrics(Instant changeTime) { void calculateLagMetrics(Instant changeTime) {

View File

@ -33,6 +33,7 @@
import io.debezium.connector.oracle.util.TestHelper; import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord; import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest; import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Testing; import io.debezium.util.Testing;
/** /**

View File

@ -18,26 +18,15 @@
import org.junit.rules.TestRule; import org.junit.rules.TestRule;
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleSnapshotChangeEventSource;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.IoUtil;
@SkipWhenAdapterNameIsNot(value = AdapterName.LOGMINER) @SkipWhenAdapterNameIsNot(value = AdapterName.LOGMINER)
public class LogMinerUtilsTest { public class LogMinerUtilsTest {
private static final BigDecimal SCN = BigDecimal.ONE; private static final BigDecimal SCN = BigDecimal.ONE;
private static final BigDecimal OTHER_SCN = BigDecimal.TEN; private static final BigDecimal OTHER_SCN = BigDecimal.TEN;
private OracleDdlParser ddlParser;
private Tables tables;
private static final String TABLE_NAME = "TEST";
private static final String CATALOG_NAME = "ORCLPDB1";
private static final String SCHEMA_NAME = "DEBEZIUM";
@Rule @Rule
public TestRule skipRule = new SkipTestDependingOnAdapterNameRule(); public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();
@ -66,55 +55,6 @@ public void testStartLogMinerStatement() {
assertThat(statement.contains("DBMS_LOGMNR.CONTINUOUS_MINE")).isTrue(); assertThat(statement.contains("DBMS_LOGMNR.CONTINUOUS_MINE")).isTrue();
} }
@Test
public void testBlacklistFiltering() throws Exception {
ddlParser = new OracleDdlParser(true, CATALOG_NAME, SCHEMA_NAME);
tables = new Tables();
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);
Table table = tables.forTable(new TableId(CATALOG_NAME, SCHEMA_NAME, TABLE_NAME));
String prefix = CATALOG_NAME + "." + TABLE_NAME + ".";
String blacklistedColumns = prefix + "COL2," + prefix + "COL3";
String whitelistedColumns = OracleSnapshotChangeEventSource.buildSelectColumns(blacklistedColumns, table);
assertThat(whitelistedColumns.contains("COL2")).isFalse();
assertThat(whitelistedColumns.contains("COL3")).isFalse();
assertThat(whitelistedColumns.contains("COL4")).isTrue();
prefix = TABLE_NAME + ".";
blacklistedColumns = prefix + "COL2," + prefix + "COL3";
whitelistedColumns = OracleSnapshotChangeEventSource.buildSelectColumns(blacklistedColumns.toLowerCase(), table);
assertThat(whitelistedColumns.contains("COL2")).isFalse();
assertThat(whitelistedColumns.contains("COL3")).isFalse();
assertThat(whitelistedColumns.contains("COL4")).isTrue();
prefix = "";
blacklistedColumns = prefix + "COL2," + prefix + "COL3";
whitelistedColumns = OracleSnapshotChangeEventSource.buildSelectColumns(blacklistedColumns, table);
assertThat(whitelistedColumns.equals("*")).isTrue();
prefix = "NONEXISTINGTABLE.";
blacklistedColumns = prefix + "COL2," + prefix + "COL3";
whitelistedColumns = OracleSnapshotChangeEventSource.buildSelectColumns(blacklistedColumns, table);
assertThat(whitelistedColumns.equals("*")).isTrue();
prefix = TABLE_NAME + ".";
blacklistedColumns = prefix + "col2," + prefix + "CO77";
whitelistedColumns = OracleSnapshotChangeEventSource.buildSelectColumns(blacklistedColumns, table);
assertThat(whitelistedColumns.contains("COL2")).isFalse();
assertThat(whitelistedColumns.contains("CO77")).isFalse();
assertThat(whitelistedColumns.contains("COL4")).isTrue();
blacklistedColumns = "";
whitelistedColumns = OracleSnapshotChangeEventSource.buildSelectColumns(blacklistedColumns, table);
assertThat(whitelistedColumns.equals("*")).isTrue();
blacklistedColumns = null;
whitelistedColumns = OracleSnapshotChangeEventSource.buildSelectColumns(blacklistedColumns, table);
assertThat(whitelistedColumns.equals("*")).isTrue();
}
// todo delete after replacement == -1 in the code // todo delete after replacement == -1 in the code
@Test @Test
public void testConversion() { public void testConversion() {