DBZ-3099 Address review comments

This commit is contained in:
Jiri Pechanec 2021-02-16 06:33:38 +01:00 committed by Gunnar Morling
parent 5ca1d9fc4f
commit cb1476ce18
8 changed files with 32 additions and 59 deletions

View File

@ -30,6 +30,10 @@ public class MySqlChangeEventSourceFactory implements ChangeEventSourceFactory {
private final Clock clock;
private final MySqlTaskContext taskContext;
private final MySqlStreamingChangeEventSourceMetrics streamingMetrics;
// MySQL snapshot requires buffering to modify the last record in the snapshot as sometimes it is
// impossible to detect it till the snapshot is ended. Mainly when the last snapshotted table is empty.
// Based on the DBZ-3113 the code can change in the future and it will be handled not in MySQL
// but in the core shared code.
private final ChangeEventQueue<DataChangeEvent> queue;
public MySqlChangeEventSourceFactory(MySqlConnectorConfig configuration, MySqlConnection connection,

View File

@ -8,11 +8,11 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
@ -77,7 +77,7 @@ public class MySqlDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private final DdlParser ddlParser;
private final RelationalTableFilters filters;
private final DdlChanges ddlChanges;
private final Map<Long, TableId> tableIdsByTableNumber = new HashMap<>();
private final Map<Long, TableId> tableIdsByTableNumber = new ConcurrentHashMap<>();
private boolean storageInitialiationExecuted = false;
/**
@ -244,7 +244,7 @@ private List<SchemaChangeEvent> parseDdl(String ddlStatements, String databaseNa
events.forEach(event -> {
final TableId tableId = getTableId(event);
offset.tableEvent(dbName, tableIds, sourceTime);
// For SET with multpile parameters
// For SET with multiple parameters
if (event instanceof TableCreatedEvent) {
emitChangeEvent(offset, schemaChangeEvents, sanitizedDbName, event, tableId, SchemaChangeEventType.CREATE, snapshot);
}

View File

@ -216,7 +216,7 @@ protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext snapshotCont
// would implicitly commit our active transaction, and this would break our consistent snapshot logic.
// Therefore, we cannot unlock the tables here!
// https://dev.mysql.com/doc/refman/5.7/en/flush.html
LOGGER.info("Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.");
LOGGER.warn("Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.");
}
}
}
@ -250,19 +250,7 @@ protected void releaseDataSnapshotLocks(RelationalSnapshotContext snapshotContex
super.lastSnapshotRecord(snapshotContext);
}
try {
dispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> {
try {
receiver.schemaChangeEvent(event);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
dispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> receiver.schemaChangeEvent(event));
}
// Make schema available for snapshot source
@ -656,19 +644,7 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
if (!snapshottingTask.snapshotData() && !i.hasNext()) {
lastSnapshotRecord(snapshotContext);
}
try {
dispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> {
try {
receiver.schemaChangeEvent(event);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
dispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> receiver.schemaChangeEvent(event));
}
// Make schema available for snapshot source

View File

@ -124,6 +124,9 @@ public void testSnapshotOnlyMetrics() throws Exception {
.build());
assertSnapshotMetrics();
// The legacy implementation did not exposed streaming metrics when only snapshot was executed.
// All other connectors based on new framework exposes streaming metrics always so we are
// following the same behaviour in the new implementation
if (isLegacy()) {
assertNoStreamingMetricsExist();
}

View File

@ -68,6 +68,11 @@ public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {
private final Supplier<PreviousContext> loggingContextSupplier;
private AtomicLong currentQueueSizeInBytes = new AtomicLong(0);
private Map<T, Long> objectMap = new ConcurrentHashMap<>();
// Sometimes it is necessary to update the record before it is delivered depending on the content
// of the following record. In that cases the easiest solution is to provide a single cell buffer
// that will allow the modification of it during the explicit flush.
// Typical example is MySQL connector when sometimes it is impossible to detect when the record
// in process is the last one. In this case the snapshot flags are set during the explicit flush.
private boolean buffering;
private T bufferedEvent;

View File

@ -6,6 +6,7 @@
package io.debezium.connector.common;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
@ -80,7 +81,7 @@ protected static enum State {
private final ElapsedTimeStrategy pollOutputDelay;
private final Clock clock = Clock.system();
private long recordCounter = 0L;
private long previousOutputMillis = 0L;
private Instant previousOutputInstant;
protected BaseSourceTask() {
// Use exponential delay to log the progress frequently at first, but the quickly tapering off to once an hour...
@ -88,7 +89,7 @@ protected BaseSourceTask() {
// Initial our poll output delay logic ...
pollOutputDelay.hasElapsed();
previousOutputMillis = clock.currentTimeInMillis();
previousOutputInstant = clock.currentTimeAsInstant();
}
@Override
@ -162,7 +163,7 @@ public final List<SourceRecord> poll() throws InterruptedException {
}
void logStatistics(final List<SourceRecord> records) {
if (records == null) {
if (records == null || !LOGGER.isInfoEnabled()) {
return;
}
int batchSize = records.size();
@ -172,17 +173,11 @@ void logStatistics(final List<SourceRecord> records) {
lastOffset = lastRecord.sourceOffset();
if (pollOutputDelay.hasElapsed()) {
// We want to record the status ...
long millisSinceLastOutput = clock.currentTimeInMillis() - previousOutputMillis;
try {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("{} records sent during previous {}, last recorded offset: {}", recordCounter,
Strings.duration(millisSinceLastOutput), lastOffset);
}
}
finally {
recordCounter = 0;
previousOutputMillis += millisSinceLastOutput;
}
final Instant currentTime = clock.currentTime();
LOGGER.info("{} records sent during previous {}, last recorded offset: {}", recordCounter,
Strings.duration(Duration.between(previousOutputInstant, currentTime).toMillis()), lastOffset);
recordCounter = 0;
previousOutputInstant = currentTime;
}
}
}

View File

@ -285,7 +285,7 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
*/
protected abstract SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapshotContext, Table table) throws Exception;
private void createDataEvents(ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext) throws InterruptedException {
private void createDataEvents(ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext) throws Exception {
SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
tryStartingSnapshot(snapshotContext);
@ -305,16 +305,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext, Relational
createDataEventsForTable(sourceContext, snapshotContext, snapshotReceiver, snapshotContext.tables.forTable(tableId), tableOrder++, tableCount);
}
try {
releaseDataSnapshotLocks(snapshotContext);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
}
catch (Exception e) {
throw new DebeziumException(e);
}
releaseDataSnapshotLocks(snapshotContext);
snapshotContext.offset.preSnapshotCompletion();
snapshotReceiver.completeSnapshot();
snapshotContext.offset.postSnapshotCompletion();

View File

@ -17,7 +17,6 @@
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* A set of utilities for more easily creating various kinds of collections.
@ -288,9 +287,9 @@ public static <T> void set(List<T> list, int index, T value, T defaultValue) {
* @param minuend the elements to be removed
*/
public static <T> Set<T> minus(Set<T> subtrahend, Set<T> minuend) {
return subtrahend.stream()
.filter(x -> !minuend.contains(x))
.collect(Collectors.toSet());
final Set<T> r = new HashSet<T>(subtrahend);
r.removeAll(minuend);
return r;
}
private Collect() {