DBZ-3577 Support read-only MySQL incremental snapshots

Kate 2021-05-31 00:28:08 -04:00 committed by Jiri Pechanec
parent 10ab20a157
commit 9b66ab374a
29 changed files with 1334 additions and 590 deletions

View File

@ -515,8 +515,8 @@
<database.port>${mysql.port}</database.port>
<database.replica.port>${mysql.port}</database.replica.port>
<skipLongRunningTests>false</skipLongRunningTests>
<runOrder>alphabetical</runOrder>
</systemPropertyVariables>
<runOrder>alphabetical</runOrder>
</configuration>
<executions>
<!-- First run the integration tests with the non-GTID server alone -->

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -15,8 +16,14 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import io.debezium.annotation.Immutable;
/**
@ -29,6 +36,7 @@
public final class GtidSet {
private final Map<String, UUIDSet> uuidSetsByServerId = new TreeMap<>(); // sorts on keys
public static Pattern GTID_DELIMITER = Pattern.compile(":");
protected GtidSet(Map<String, UUIDSet> uuidSetsByServerId) {
this.uuidSetsByServerId.putAll(uuidSetsByServerId);
@ -140,6 +148,29 @@ public GtidSet getGtidSetBeginning() {
return new GtidSet(newSet);
}
public boolean contains(String gtid) {
String[] split = GTID_DELIMITER.split(gtid);
String sourceId = split[0];
UUIDSet uuidSet = forServerWithId(sourceId);
if (uuidSet == null) {
return false;
}
long transactionId = Long.parseLong(split[1]);
return uuidSet.contains(transactionId);
}
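/**
 * Returns a new {@link GtidSet} containing the GTIDs that are in this set but not in {@code other},
 * computed per server UUID; used to derive the high watermark as the set of transactions executed
 * while a chunk was being selected.
 */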
public GtidSet subtract(GtidSet other) {
if (other == null) {
return this;
}
Map<String, UUIDSet> newSets = this.uuidSetsByServerId.entrySet()
.stream()
.filter(entry -> !entry.getValue().isContainedWithin(other.forServerWithId(entry.getKey())))
.map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue().subtract(other.forServerWithId(entry.getKey()))))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new GtidSet(newSets);
}
@Override
public int hashCode() {
return uuidSetsByServerId.keySet().hashCode();
@ -199,6 +230,11 @@ protected UUIDSet(String uuid, Interval interval) {
this.intervals.add(interval);
}
protected UUIDSet(String uuid, List<Interval> intervals) {
this.uuid = uuid;
this.intervals.addAll(intervals);
}
public UUIDSet asIntervalBeginning() {
Interval start = new Interval(intervals.get(0).getStart(), intervals.get(0).getStart());
return new UUIDSet(this.uuid, start);
@ -263,6 +299,15 @@ public boolean isContainedWithin(UUIDSet other) {
return true;
}
public boolean contains(long transactionId) {
for (Interval interval : this.intervals) {
if (interval.contains(transactionId)) {
return true;
}
}
return false;
}
@Override
public int hashCode() {
return uuid.hashCode();
@ -297,6 +342,19 @@ public String toString() {
}
return sb.toString();
}
public UUIDSet subtract(UUIDSet other) {
if (!uuid.equals(other.getUUID())) {
throw new IllegalArgumentException("UUIDSet subtraction is supported only within a single server UUID");
}
RangeSet<Long> rangeSet = TreeRangeSet.create();
intervals.forEach(interval -> rangeSet.add(Range.closed(interval.getStart(), interval.getEnd())));
other.getIntervals().forEach(interval -> rangeSet.remove(Range.closed(interval.getStart(), interval.getEnd())));
List<Interval> intervalList = rangeSet.asRanges().stream()
.map(range -> new Interval(range))
.collect(Collectors.toList());
return new UUIDSet(uuid, intervalList);
}
}
@Immutable
@ -310,6 +368,14 @@ public Interval(long start, long end) {
this.end = end;
}
private Interval(Range<Long> range) {
this.start = range.lowerBoundType() == BoundType.CLOSED ? range.lowerEndpoint() : range.lowerEndpoint() + 1;
this.end = range.upperBoundType() == BoundType.CLOSED ? range.upperEndpoint() : range.upperEndpoint() - 1;
if (start > end) {
throw new IllegalArgumentException("Empty interval: " + range);
}
}
/**
* Get the starting transaction number in this interval.
*
@ -346,6 +412,10 @@ public boolean isContainedWithin(Interval other) {
return this.getStart() >= other.getStart() && this.getEnd() <= other.getEnd();
}
public boolean contains(long transactionId) {
return getStart() <= transactionId && transactionId <= getEnd();
}
@Override
public int compareTo(Interval that) {
if (that == this) {

View File

@ -85,13 +85,26 @@ public Optional<IncrementalSnapshotChangeEventSource<? extends DataCollectionId>
MySqlOffsetContext offsetContext,
SnapshotProgressListener snapshotProgressListener,
DataChangeEventListener dataChangeEventListener) {
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<TableId>(
if (configuration.isReadOnlyConnection()) {
if (connection.isGtidModeEnabled()) {
return Optional.of(new MySqlReadOnlyIncrementalSnapshotChangeEventSource<>(
configuration,
connection,
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener));
}
throw new UnsupportedOperationException("Read only connection requires GTID_MODE to be ON");
}
return Optional.of(new SignalBasedIncrementalSnapshotChangeEventSource<>(
configuration,
connection,
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener);
return Optional.of(incrementalSnapshotChangeEventSource);
dataChangeEventListener));
}
}

View File

@ -913,6 +913,14 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
"false - delegates the implicit conversion to the database" +
"true - (the default) Debezium makes the conversion");
public static final Field READ_ONLY_CONNECTION = Field.create("read.only")
.withDisplayName("Read only connection")
.withType(Type.BOOLEAN)
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Switched connector to use alternative methods to deliver signals to Debezium instead of writing to signaling table");
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("MySQL")
.excluding(
@ -992,6 +1000,7 @@ public boolean supportsOperationFiltering() {
private final Predicate<String> ddlFilter;
private final boolean legacy;
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
private final boolean readOnlyConnection;
public MySqlConnectorConfig(Configuration config) {
super(
@ -1008,6 +1017,7 @@ public MySqlConnectorConfig(Configuration config) {
this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.readOnlyConnection = config.getBoolean(READ_ONLY_CONNECTION);
final String gitIdNewChannelPosition = config.getString(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION);
this.gitIdNewChannelPosition = GtidNewChannelPosition.parse(gitIdNewChannelPosition, MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION.defaultValueAsString());
@ -1290,6 +1300,10 @@ boolean legacy() {
return legacy;
}
public boolean isReadOnlyConnection() {
return readOnlyConnection;
}
/**
* Intended for testing only
*/
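The new "read.only" option is set through the regular connector configuration. A minimal sketch, mirroring the Configuration.Builder usage in the ReadOnlyIncrementalSnapshotIT test added later in this commit; all other settings a real connector needs (connection, topic, and history properties) are omitted:
Configuration config = Configuration.create()
        .with(MySqlConnectorConfig.READ_ONLY_CONNECTION, true) // use GTID-based watermarks instead of writing to the signaling table
        .build();
// isReadOnlyConnection() now reports true, so the change event source factory creates a
// MySqlReadOnlyIncrementalSnapshotChangeEventSource and requires GTID_MODE to be ON.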

View File

@ -17,6 +17,7 @@
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
@ -66,7 +67,8 @@ public MySqlOffsetContext(MySqlConnectorConfig connectorConfig, boolean snapshot
}
public MySqlOffsetContext(MySqlConnectorConfig connectorConfig, boolean snapshot, boolean snapshotCompleted, SourceInfo sourceInfo) {
this(connectorConfig, snapshot, snapshotCompleted, new TransactionContext(), new IncrementalSnapshotContext<>(),
this(connectorConfig, snapshot, snapshotCompleted, new TransactionContext(),
connectorConfig.isReadOnlyConnection() ? new MySqlReadOnlyIncrementalSnapshotContext<>() : new SignalBasedIncrementalSnapshotContext<>(),
sourceInfo);
}
@ -204,9 +206,15 @@ public MySqlOffsetContext load(Map<String, ?> offset) {
throw new ConnectException("Source offset '" + SourceInfo.BINLOG_FILENAME_OFFSET_KEY + "' parameter is missing");
}
long binlogPosition = longOffsetValue(offset, SourceInfo.BINLOG_POSITION_OFFSET_KEY);
IncrementalSnapshotContext<TableId> incrementalSnapshotContext;
if (connectorConfig.isReadOnlyConnection()) {
incrementalSnapshotContext = MySqlReadOnlyIncrementalSnapshotContext.load(offset);
}
else {
incrementalSnapshotContext = SignalBasedIncrementalSnapshotContext.load(offset);
}
final MySqlOffsetContext offsetContext = new MySqlOffsetContext(connectorConfig, snapshot,
snapshotCompleted, TransactionContext.load(offset), IncrementalSnapshotContext.load(offset, TableId.class),
snapshotCompleted, TransactionContext.load(offset), incrementalSnapshotContext,
new SourceInfo(connectorConfig));
offsetContext.setBinlogStartPoint(binlogFilename, binlogPosition);
offsetContext.setInitialSkips(longOffsetValue(offset, EVENTS_TO_SKIP_OFFSET_KEY),

View File

@ -0,0 +1,192 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.sql.SQLException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
/**
* A MySQL specific read-only incremental snapshot change event source.
* Uses the executed GTID set as the low/high watermarks for the incremental snapshot window to support a read-only connection.
* <p>
* <b>Prerequisites</b>
* <ul>
* <li> gtid_mode=ON </li>
* <li> enforce_gtid_consistency=ON </li>
* <li> If the connector is reading from a replica, then for multithreaded replicas (replicas on which replica_parallel_workers is set to a value greater than 0) it is required to set replica_preserve_commit_order=1 or slave_preserve_commit_order=1</li>
* </ul>
* </p>
* <p>
* <b>When a chunk should be snapshotted</b>
* <ul>
* <li> streaming is paused (this is implicit when the watermarks are handled) </li>
* <li> a SHOW MASTER STATUS query is executed and the low watermark is set to executed_gtid_set </li>
* <li> a new data chunk is read from a database by generating the SELECT statement and placed into a window buffer keyed by primary keys </li>
* <li> a SHOW MASTER STATUS query is executed and the high watermark is set to the executed_gtid_set from SHOW MASTER STATUS minus the low watermark. In case the high watermark contains more than one unique server UUID value, steps 2 - 4 are redone </li>
* <li> streaming is resumed </li>
* </ul>
* </p>
* <p>
* <b>During the subsequent streaming</b>
* <ul>
* <li> if a binlog event is received and its GTID is outside of the low watermark GTID set, then window processing mode is enabled </li>
* <li> if a binlog event is received and its GTID is outside of the high watermark GTID set, then window processing mode is disabled and the rest of the window buffer is streamed </li>
* <li> if a server heartbeat event is received and its GTID has reached the largest transaction id of the high watermark, then window processing mode is disabled and the rest of the window buffer is streamed </li>
* <li> if window processing mode is enabled and the event key is contained in the window buffer, then it is removed from the window buffer </li>
* <li> the event is streamed </li>
* </ul>
* </p>
* <br/>
* <b>Watermark checks</b>
* <p>If a watermark's GTID set doesn't contain a binlog event's GTID, then the watermark has been passed and the window processing mode gets updated. Multiple binlog events can have the same GTID, which is why the algorithm waits for a binlog event with a GTID outside of the watermark's GTID set to close the window, instead of closing it as soon as the largest transaction id is reached.</p>
* <p>The deduplication starts with the first event after the low watermark, because events whose GTID is still contained in the low watermark (the executed_gtid_set captured before the chunk select statement) were committed before the chunk selection and are therefore already reflected in the chunk. A COMMIT after capturing the low watermark is used to make sure the chunk selection sees the changes that were committed before its execution.</p>
* <p>The deduplication continues for all the events that are in the high watermark. The deduplicated chunk events are inserted right before the first event that is outside of the high watermark.</p>
* <br/>
* <b>No binlog events</b>
* <p>Server heartbeat events (events that are sent by a primary to a replica to let the replica know that the primary is still alive) are used to update the window processing mode when the rate of binlog updates is low. A server heartbeat is sent only if there are no binlog events for the duration of a heartbeat interval.</p>
* <p>The heartbeat has the same GTID as the latest binlog event at that moment (it is a technical event that doesn't get written into the output stream, but can be used in the event processing logic). In case there are zero updates after the chunk selection, the server heartbeat's GTID will be within the high watermark. This is why, for a server heartbeat event's GTID, it is enough to reach the largest transaction id of the high watermark to disable the window processing mode, send the chunk and proceed to the next one.</p>
* <p>The server UUID part of the heartbeat's GTID is used to get the max transaction id of the high watermark for the same server UUID. The high watermark is set to the difference between the executed_gtid_set before and after the chunk selection. If the high watermark contains more than one unique server UUID, the chunk selection is redone and the watermarks are recaptured. This is done to avoid the scenario where the window is closed too early by a heartbeat because the server UUID changes between the low and high watermarks. A heartbeat doesn't need to check the window processing mode; this doesn't affect correctness and simplifies the checks for the cases when the binlog reader was already up to date with the low watermark and when there are no new events between the low and high watermarks.</p>
* <br/>
* <b>No changes between watermarks</b>
* <p>A window can be opened and closed right away by the same event. This can happen when the high watermark is an empty set, which means there were no binlog events during the chunk select. The chunk will get inserted right after the low watermark and no events will be deduplicated from it.</p>
* <br/>
* <b>No updates for included tables</b>
* <p>It is important to receive binlog events for the backfill to make progress. All binlog events are checked against the low and high watermarks, including the events from tables that aren't included in the connector. This guarantees that the window processing mode gets updated even when none of the tables included in the connector are getting binlog events.</p>
*/
public class MySqlReadOnlyIncrementalSnapshotChangeEventSource<T extends DataCollectionId> extends AbstractIncrementalSnapshotChangeEventSource<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlReadOnlyIncrementalSnapshotChangeEventSource.class);
private final String showMasterStmt = "SHOW MASTER STATUS";
public MySqlReadOnlyIncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection,
EventDispatcher<T> dispatcher,
DatabaseSchema<?> databaseSchema, Clock clock,
SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener);
}
@Override
public void processMessage(DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) throws InterruptedException {
if (getContext() == null) {
LOGGER.warn("Context is null, skipping message processing");
return;
}
LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window);
boolean windowClosed = getContext().updateWindowState(offsetContext);
if (windowClosed) {
sendWindowEvents(offsetContext);
readChunk();
}
else if (!window.isEmpty() && getContext().deduplicationNeeded()) {
deduplicateWindow(dataCollectionId, key);
}
}
@Override
public void processHeartbeat(OffsetContext offsetContext) throws InterruptedException {
if (getContext() == null) {
LOGGER.warn("Context is null, skipping message processing");
return;
}
while (getContext().snapshotRunning() && getContext().reachedHighWatermark(offsetContext)) {
sendWindowEvents(offsetContext);
readChunk();
}
}
@Override
public void processFilteredEvent(OffsetContext offsetContext) throws InterruptedException {
if (getContext() == null) {
LOGGER.warn("Context is null, skipping message processing");
return;
}
boolean windowClosed = getContext().updateWindowState(offsetContext);
if (windowClosed) {
sendWindowEvents(offsetContext);
readChunk();
}
}
protected void updateLowWatermark() {
try {
getExecutedGtidSet(getContext()::setLowWatermark);
// it is required that the chunk selection sees the changes that are committed before its execution
jdbcConnection.commit();
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}
protected void updateHighWatermark() {
getExecutedGtidSet(getContext()::setHighWatermark);
}
private void getExecutedGtidSet(Consumer<GtidSet> watermark) {
try {
jdbcConnection.query(showMasterStmt, rs -> {
if (rs.next()) {
if (rs.getMetaData().getColumnCount() > 4) {
// This column exists only in MySQL 5.6.5 or later ...
final String gtidSet = rs.getString(5); // GTID set, may be null, blank, or contain a GTID set
watermark.accept(new GtidSet(gtidSet));
}
else {
throw new UnsupportedOperationException("Need to add support for executed GTIDs for versions prior to 5.6.5");
}
}
});
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}
@Override
protected void emitWindowOpen() {
updateLowWatermark();
}
@Override
protected void emitWindowClose() throws InterruptedException {
updateHighWatermark();
if (getContext().serverUuidChanged()) {
rereadChunk();
}
}
public void rereadChunk() throws InterruptedException {
if (context == null) {
return;
}
if (!context.snapshotRunning() || !context.deduplicationNeeded() || window.isEmpty()) {
return;
}
window.clear();
context.revertChunk();
readChunk();
}
private MySqlReadOnlyIncrementalSnapshotContext<T> getContext() {
return (MySqlReadOnlyIncrementalSnapshotContext<T>) context;
}
}
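To make the watermark mechanics described in the Javadoc above concrete, here is a minimal illustrative sketch of how the GtidSet operations added by this commit fit together; the GTID strings and variable names are invented for the example and are not part of the commit:
// executed_gtid_set captured via SHOW MASTER STATUS before the chunk select (low watermark)
GtidSet low = new GtidSet("036d85a9-64e5-11e6-9b48-42010af0000c:1-20");
// executed_gtid_set captured again after the chunk select
GtidSet afterSelect = new GtidSet("036d85a9-64e5-11e6-9b48-42010af0000c:1-23");
// high watermark = transactions executed while the chunk was being read
GtidSet high = afterSelect.subtract(low); // 036d85a9-64e5-11e6-9b48-42010af0000c:21-23
String eventGtid = "036d85a9-64e5-11e6-9b48-42010af0000c:21";
boolean pastLow = !low.contains(eventGtid);   // true  -> open the window and start deduplicating
boolean pastHigh = !high.contains(eventGtid); // false -> still inside the window, keep deduplicating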

View File

@ -0,0 +1,103 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static io.debezium.connector.mysql.GtidSet.GTID_DELIMITER;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@NotThreadSafe
public class MySqlReadOnlyIncrementalSnapshotContext<T> extends AbstractIncrementalSnapshotContext<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlReadOnlyIncrementalSnapshotContext.class);
private GtidSet lowWatermark = null;
private GtidSet highWatermark = null;
public MySqlReadOnlyIncrementalSnapshotContext() {
this(true);
}
public MySqlReadOnlyIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
super(useCatalogBeforeSchema);
}
public static <U> MySqlReadOnlyIncrementalSnapshotContext<U> load(Map<String, ?> offsets) {
return load(offsets, true);
}
public static <U> MySqlReadOnlyIncrementalSnapshotContext<U> load(Map<String, ?> offsets, boolean useCatalogBeforeSchema) {
MySqlReadOnlyIncrementalSnapshotContext<U> context = new MySqlReadOnlyIncrementalSnapshotContext<>(useCatalogBeforeSchema);
init(context, offsets);
return context;
}
public void setLowWatermark(GtidSet lowWatermark) {
this.lowWatermark = lowWatermark;
}
public void setHighWatermark(GtidSet highWatermark) {
this.highWatermark = highWatermark.subtract(lowWatermark);
}
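/**
 * Opens the deduplication window once the current GTID moves past the low watermark and closes it
 * once the GTID moves past the high watermark.
 *
 * @return true if the window has just been closed by this event
 */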
public boolean updateWindowState(OffsetContext offsetContext) {
String currentGtid = offsetContext.getSourceInfo().getString(SourceInfo.GTID_KEY);
if (!windowOpened && lowWatermark != null) {
boolean pastLowWatermark = !lowWatermark.contains(currentGtid);
if (pastLowWatermark) {
LOGGER.debug("Current gtid {}, low watermark {}", currentGtid, lowWatermark);
windowOpened = true;
lowWatermark = null;
}
}
if (windowOpened && highWatermark != null) {
boolean pastHighWatermark = !highWatermark.contains(currentGtid);
if (pastHighWatermark) {
LOGGER.debug("Current gtid {}, high watermark {}", currentGtid, highWatermark);
windowOpened = false;
highWatermark = null;
return true;
}
}
return false;
}
public boolean reachedHighWatermark(OffsetContext offsetContext) {
String currentGtid = offsetContext.getSourceInfo().getString(SourceInfo.GTID_KEY);
if (highWatermark == null) {
return false;
}
if (currentGtid == null) {
return true;
}
String[] gtid = GTID_DELIMITER.split(currentGtid);
GtidSet.UUIDSet uuidSet = highWatermark.forServerWithId(gtid[0]);
if (uuidSet != null) {
long maxTransactionId = uuidSet.getIntervals().stream()
.mapToLong(GtidSet.Interval::getEnd)
.max()
.getAsLong();
if (maxTransactionId <= Long.parseLong(gtid[1])) {
LOGGER.debug("Heartbeat {} reached high watermark {}", currentGtid, highWatermark);
windowOpened = false;
highWatermark = null;
lowWatermark = null;
return true;
}
}
return false;
}
public boolean serverUuidChanged() {
return highWatermark.getUUIDSets().size() > 1;
}
}

View File

@ -403,8 +403,9 @@ protected void handleServerStop(MySqlOffsetContext offsetContext, Event event) {
*
* @param event the server stopped event to be processed; may not be null
*/
protected void handleServerHeartbeat(MySqlOffsetContext offsetContext, Event event) {
protected void handleServerHeartbeat(MySqlOffsetContext offsetContext, Event event) throws InterruptedException {
LOGGER.trace("Server heartbeat: {}", event);
eventDispatcher.dispatchServerHeartbeatEvent(offsetContext);
}
/**
@ -615,7 +616,7 @@ private void handleTransactionCompletion(MySqlOffsetContext offsetContext, Event
*
* @param event the update event; never null
*/
protected void handleUpdateTableMetadata(MySqlOffsetContext offsetContext, Event event) {
protected void handleUpdateTableMetadata(MySqlOffsetContext offsetContext, Event event) throws InterruptedException {
TableMapEventData metadata = unwrapData(event);
long tableNumber = metadata.getTableId();
String databaseName = metadata.getDatabase();
@ -634,7 +635,7 @@ protected void handleUpdateTableMetadata(MySqlOffsetContext offsetContext, Event
* don't know, either ignore that event or raise a warning or error as per the
* {@link MySqlConnectorConfig#INCONSISTENT_SCHEMA_HANDLING_MODE} configuration.
*/
private void informAboutUnknownTableIfRequired(MySqlOffsetContext offsetContext, Event event, TableId tableId, String typeToLog) {
private void informAboutUnknownTableIfRequired(MySqlOffsetContext offsetContext, Event event, TableId tableId, String typeToLog) throws InterruptedException {
if (tableId != null && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
metrics.onErroneousEvent("source = " + tableId + ", event " + event);
EventHeaderV4 eventHeader = event.getHeader();
@ -668,6 +669,7 @@ else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WA
else {
LOGGER.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);
metrics.onFilteredEvent("source = " + tableId);
eventDispatcher.dispatchFilteredEvent(offsetContext);
}
}

View File

@ -132,6 +132,24 @@ public void shouldFilterServerUuids() {
assertThat(filtered.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004")).isNull();
}
@Test
public void subtract() {
String gtidStr1 = "036d85a9-64e5-11e6-9b48-42010af0000c:1-20,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41";
String gtidStr2 = "036d85a9-64e5-11e6-9b48-42010af0000c:1-21,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-49";
String diff = "036d85a9-64e5-11e6-9b48-42010af0000c:21,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:42-49";
GtidSet gtidSet1 = new GtidSet(gtidStr1);
GtidSet gtidSet2 = new GtidSet(gtidStr2);
GtidSet gtidSetDiff = gtidSet2.subtract(gtidSet1);
GtidSet expectedDiff = new GtidSet(diff);
assertThat(gtidSetDiff).isEqualTo(expectedDiff);
}
protected void asertIntervalCount(String uuid, int count) {
UUIDSet set = gtids.forServerWithId(uuid);
assertThat(set.getIntervals().size()).isEqualTo(count);

View File

@ -19,8 +19,8 @@
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<MySqlConnector> {
private static final String SERVER_NAME = "is_test";
private final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "incremental_snapshot_test").withDbHistoryPath(DB_HISTORY_PATH);
protected static final String SERVER_NAME = "is_test";
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "incremental_snapshot_test").withDbHistoryPath(DB_HISTORY_PATH);
@Before
public void before() throws SQLException {

View File

@ -0,0 +1,90 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.sql.SQLException;
import java.util.Map;
import java.util.function.Supplier;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.ShouldFailWhen;
import io.debezium.util.Testing;
@ShouldFailWhen(ReadOnlyIncrementalSnapshotIT.IsGtidModeOff.class)
public class ReadOnlyIncrementalSnapshotIT extends IncrementalSnapshotIT {
public static final String EXCLUDED_TABLE = "b";
@Rule
public TestRule conditionalFail = new ConditionalFail();
protected Configuration.Builder config() {
return super.config()
.with(MySqlConnectorConfig.TABLE_EXCLUDE_LIST, DATABASE.getDatabaseName() + "." + EXCLUDED_TABLE)
.with(MySqlConnectorConfig.READ_ONLY_CONNECTION, true);
}
@Test
public void filteredEvents() throws Exception {
Testing.Print.enable();
populateTable();
startConnector();
sendAdHocSnapshotSignal();
Thread t = new Thread(() -> {
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
for (int i = 0; true; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)",
EXCLUDED_TABLE,
i + ROW_COUNT + 1,
i + ROW_COUNT));
connection.commit();
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}
});
t.setDaemon(true);
t.setName("filtered-binlog-events-thread");
t.start();
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}
public static class IsGtidModeOff implements Supplier<Boolean> {
public Boolean get() {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase("emptydb")) {
return db.queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
rs -> {
if (rs.next()) {
return "OFF".equalsIgnoreCase(rs.getString(2));
}
throw new IllegalStateException("Cannot obtain GTID status");
});
}
catch (SQLException e) {
throw new IllegalStateException("Cannot obtain GTID status", e);
}
}
}
}

View File

@ -6,10 +6,17 @@
package io.debezium.connector.mysql;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
@ -75,14 +82,49 @@ public void transactionMetadataEnabled() throws InterruptedException, SQLExcepti
connection.commit();
}
}
String txId = null;
List<SourceRecord> allRecords = new ArrayList<>();
// read records until the transaction is found
for (int i = 0; txId == null && i < 50; i++) {
List<SourceRecord> records = consumeRecordsByTopic(100).allRecordsInOrder();
txId = getTxId(records);
allRecords.addAll(records);
}
assertNotNull("Failed to find the transaction", txId);
int beginIndex = findFirstEvent(allRecords, txId);
if (allRecords.size() < beginIndex + 6) {
allRecords.addAll(consumeRecordsByTopic(6).allRecordsInOrder());
}
List<SourceRecord> transactionRecords = allRecords.subList(beginIndex, beginIndex + 1 + 4 + 1);
assertFalse(transactionRecords.isEmpty());
// BEGIN + 4 INSERT + END
// Initial few records would have database history changes hence fetching 4+6 records
List<SourceRecord> records = consumeRecordsByTopic(1 + 4 + 1).allRecordsInOrder();
assertEquals(1 + 4 + 1, transactionRecords.size());
String databaseName = DATABASE.getDatabaseName();
final String txId = assertBeginTransaction(records.get(0));
assertEndTransaction(records.get(5), txId, 4, Collect.hashMapOf(databaseName + ".products", 1,
String beginTxId = assertBeginTransaction(transactionRecords.get(0));
assertEquals(txId, beginTxId);
assertEndTransaction(transactionRecords.get(5), txId, 4, Collect.hashMapOf(databaseName + ".products", 1,
databaseName + ".customers", 2,
databaseName + ".orders", 1));
}
private String getTxId(List<SourceRecord> records) {
Optional<Struct> product = records.stream()
.map(sr -> (Struct) sr.value())
.filter(sr -> sr.schema().field("source") != null)
.filter(sr -> sr.getStruct("source").getString("table").equals("products"))
.filter(s -> s.getStruct("after").getString("description").equals("Toy robot"))
.findFirst();
return product.map(struct -> (String) struct.getStruct("transaction").get("id")).orElse(null);
}
private int findFirstEvent(List<SourceRecord> records, String txId) {
int i = 0;
for (SourceRecord sr : records) {
if (((Struct) sr.value()).getString("id").equals(txId)) {
return i;
}
i++;
}
return -1;
}
}

View File

@ -7,6 +7,11 @@ CREATE TABLE a (
aa INTEGER
) AUTO_INCREMENT = 1;
CREATE TABLE b (
pk INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
aa INTEGER
) AUTO_INCREMENT = 1;
CREATE TABLE debezium_signal (
id varchar(64),
type varchar(32),

View File

@ -92,6 +92,7 @@ public Optional<IncrementalSnapshotChangeEventSource<? extends DataCollectionId>
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<TableId>(
configuration,
jdbcConnection,
dispatcher,
schema,
clock,
snapshotProgressListener,

View File

@ -22,6 +22,7 @@
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
@ -212,7 +213,7 @@ public PostgresOffsetContext load(Map<String, ?> offset) {
final boolean snapshot = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE);
final boolean lastSnapshotRecord = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE);
return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, lastCommitLsn, txId, useconds, snapshot, lastSnapshotRecord,
TransactionContext.load(offset), IncrementalSnapshotContext.load(offset, TableId.class));
TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset));
}
}
@ -246,7 +247,7 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne
false,
false,
new TransactionContext(),
new IncrementalSnapshotContext<>());
new SignalBasedIncrementalSnapshotContext<>());
}
catch (SQLException e) {
throw new ConnectException("Database processing error", e);

View File

@ -67,6 +67,7 @@ public Optional<IncrementalSnapshotChangeEventSource<? extends DataCollectionId>
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<TableId>(
configuration,
dataConnection,
dispatcher,
schema,
clock,
snapshotProgressListener,

View File

@ -14,6 +14,7 @@
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
@ -60,7 +61,7 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos
}
public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted) {
this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext(), new IncrementalSnapshotContext<>());
this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext(), new SignalBasedIncrementalSnapshotContext<>());
}
@Override
@ -162,7 +163,7 @@ public SqlServerOffsetContext load(Map<String, ?> offset) {
}
return new SqlServerOffsetContext(connectorConfig, TxLogPosition.valueOf(commitLsn, changeLsn), snapshot, snapshotCompleted, eventSerialNo,
TransactionContext.load(offset), IncrementalSnapshotContext.load(offset, TableId.class));
TransactionContext.load(offset), SignalBasedIncrementalSnapshotContext.load(offset));
}
}

View File

@ -201,6 +201,7 @@ public boolean dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter c
if (!filter.isIncluded(dataCollectionId)) {
LOGGER.trace("Filtered data change event for {}", dataCollectionId);
eventListener.onFilteredEvent("source = " + dataCollectionId);
dispatchFilteredEvent(changeRecordEmitter.getOffset());
}
else {
DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
@ -266,6 +267,12 @@ public void changeRecord(DataCollectionSchema schema,
}
}
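// Forwards events filtered out by the include/exclude configuration to the incremental snapshot
// source, so a read-only incremental snapshot can still advance its watermark window on such events.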
public void dispatchFilteredEvent(OffsetContext offset) throws InterruptedException {
if (incrementalSnapshotChangeEventSource != null) {
incrementalSnapshotChangeEventSource.processFilteredEvent(offset);
}
}
public void dispatchTransactionCommittedEvent(OffsetContext offset) throws InterruptedException {
transactionMonitor.transactionComittedEvent(offset);
}
@ -350,6 +357,12 @@ private void enqueueSchemaChangeMessage(SourceRecord record) throws InterruptedE
queue.enqueue(new DataChangeEvent(record));
}
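// Forwards server heartbeat events to the incremental snapshot source; the read-only MySQL
// implementation uses them to close the window when no binlog events arrive for a while.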
public void dispatchServerHeartbeatEvent(OffsetContext offset) throws InterruptedException {
if (incrementalSnapshotChangeEventSource != null) {
incrementalSnapshotChangeEventSource.processHeartbeat(offset);
}
}
/**
* Change record receiver used during snapshotting. Allows for a deferred submission of records, which is needed in
* order to set the "snapshot completed" offset field, which we can't send to Kafka Connect without sending an

View File

@ -0,0 +1,379 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.source.snapshot.incremental;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;
/**
* An incremental snapshot change event source that emits events from a DB log interleaved with snapshot events.
*/
@NotThreadSafe
public abstract class AbstractIncrementalSnapshotChangeEventSource<T extends DataCollectionId> implements IncrementalSnapshotChangeEventSource<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIncrementalSnapshotChangeEventSource.class);
private final CommonConnectorConfig connectorConfig;
private final Clock clock;
private final RelationalDatabaseSchema databaseSchema;
private final SnapshotProgressListener progressListener;
private final DataChangeEventListener dataListener;
private long totalRowsScanned = 0;
private Table currentTable;
protected EventDispatcher<T> dispatcher;
protected IncrementalSnapshotContext<T> context = null;
protected JdbcConnection jdbcConnection;
protected final Map<Struct, Object[]> window = new LinkedHashMap<>();
public AbstractIncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection, EventDispatcher<T> dispatcher,
DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
this.connectorConfig = config;
this.jdbcConnection = jdbcConnection;
this.dispatcher = dispatcher;
this.databaseSchema = (RelationalDatabaseSchema) databaseSchema;
this.clock = clock;
this.progressListener = progressListener;
this.dataListener = dataChangeEventListener;
}
@Override
@SuppressWarnings("unchecked")
public void closeWindow(String id, OffsetContext offsetContext) throws InterruptedException {
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
if (!context.closeWindow(id)) {
return;
}
sendWindowEvents(offsetContext);
readChunk();
}
protected void sendWindowEvents(OffsetContext offsetContext) throws InterruptedException {
LOGGER.debug("Sending {} events from window buffer", window.size());
offsetContext.incrementalSnapshotEvents();
for (Object[] row : window.values()) {
sendEvent(dispatcher, offsetContext, row);
}
offsetContext.postSnapshotCompletion();
window.clear();
}
protected void sendEvent(EventDispatcher<T> dispatcher, OffsetContext offsetContext, Object[] row) throws InterruptedException {
context.sendEvent(keyFromRow(row));
offsetContext.event(context.currentDataCollectionId(), clock.currentTimeAsInstant());
dispatcher.dispatchSnapshotEvent(context.currentDataCollectionId(),
getChangeRecordEmitter(context.currentDataCollectionId(), offsetContext, row),
dispatcher.getIncrementalSnapshotChangeEventReceiver(dataListener));
}
/**
* Returns a {@link ChangeRecordEmitter} producing the change records for
* the given table row.
*/
protected ChangeRecordEmitter getChangeRecordEmitter(T dataCollectionId, OffsetContext offsetContext,
Object[] row) {
return new SnapshotChangeRecordEmitter(offsetContext, row, clock);
}
protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key) {
if (!context.currentDataCollectionId().equals(dataCollectionId)) {
return;
}
if (key instanceof Struct) {
if (window.remove((Struct) key) != null) {
LOGGER.info("Removed '{}' from window", key);
}
}
}
/**
* Update low watermark for the incremental snapshot chunk
*/
protected abstract void emitWindowOpen() throws SQLException;
/**
* Update high watermark for the incremental snapshot chunk
*/
protected abstract void emitWindowClose() throws SQLException, InterruptedException;
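// For a table with a single primary key column "pk", a non-initial chunk query built below has roughly the shape
//   SELECT * FROM <table> WHERE pk >= ? AND NOT (pk = ?) AND pk <= ? ORDER BY pk LIMIT <chunk size>
// where the first two parameters resume after the previous chunk's end key and the last one caps the scan
// at the maximum key captured when the table snapshot started (parameters are bound in readTableChunkStatement).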
protected String buildChunkQuery(Table table) {
String condition = null;
// Add condition when this is not the first query
if (context.isNonInitialChunk()) {
final StringBuilder sql = new StringBuilder();
// Window boundaries
addKeyColumnsToCondition(table, sql, " >= ?");
sql.append(" AND NOT (");
addKeyColumnsToCondition(table, sql, " = ?");
sql.append(")");
// Table boundaries
sql.append(" AND ");
addKeyColumnsToCondition(table, sql, " <= ?");
condition = sql.toString();
}
final String orderBy = table.primaryKeyColumns().stream()
.map(Column::name)
.collect(Collectors.joining(", "));
return jdbcConnection.buildSelectWithRowLimits(table.id(),
connectorConfig.getIncrementalSnashotChunkSize(),
"*",
Optional.ofNullable(condition),
orderBy);
}
protected String buildMaxPrimaryKeyQuery(Table table) {
final String orderBy = table.primaryKeyColumns().stream()
.map(Column::name)
.collect(Collectors.joining(" DESC, ")) + " DESC";
return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, "*", Optional.empty(), orderBy);
}
@Override
@SuppressWarnings("unchecked")
public void init(OffsetContext offsetContext) {
if (offsetContext == null) {
LOGGER.info("Empty incremental snapshot change event source started, no action needed");
return;
}
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
if (!context.snapshotRunning()) {
LOGGER.info("No incremental snapshot in progress, no action needed on start");
return;
}
LOGGER.info("Incremental snapshot in progress, need to read new chunk on start");
try {
progressListener.snapshotStarted();
readChunk();
}
catch (InterruptedException e) {
throw new DebeziumException("Reading of an initial chunk after connector restart has been interrupted");
}
LOGGER.info("Incremental snapshot in progress, loading of initial chunk completed");
}
protected void readChunk() throws InterruptedException {
if (!context.snapshotRunning()) {
LOGGER.info("Skipping read chunk because snapshot is not running");
return;
}
try {
// This commit should be unnecessary and might be removed later
jdbcConnection.commit();
context.startNewChunk();
emitWindowOpen();
while (context.snapshotRunning()) {
final TableId currentTableId = (TableId) context.currentDataCollectionId();
currentTable = databaseSchema.tableFor(currentTableId);
if (currentTable == null) {
LOGGER.warn("Schema not found for table '{}', known tables {}", currentTableId, databaseSchema.tableIds());
break;
}
if (currentTable.primaryKeyColumns().isEmpty()) {
LOGGER.warn("Incremental snapshot for table '{}' skipped cause the table has no primary keys", currentTableId);
break;
}
if (!context.maximumKey().isPresent()) {
context.maximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
if (!rs.next()) {
return null;
}
return keyFromRow(jdbcConnection.rowToArray(currentTable, databaseSchema, rs,
ColumnUtils.toArray(rs, currentTable)));
}));
if (!context.maximumKey().isPresent()) {
LOGGER.info(
"No maximum key returned by the query, incremental snapshotting of table '{}' finished as it is empty",
currentTableId);
context.nextDataCollection();
continue;
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Incremental snapshot for table '{}' will end at position {}", currentTableId,
context.maximumKey().orElse(new Object[0]));
}
}
createDataEventsForTable();
if (window.isEmpty()) {
LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
currentTableId);
tableScanCompleted();
context.nextDataCollection();
if (!context.snapshotRunning()) {
progressListener.snapshotCompleted();
}
}
else {
break;
}
}
emitWindowClose();
}
catch (SQLException e) {
throw new DebeziumException(String.format("Database error while executing incremental snapshot for table '%s'", context.currentDataCollectionId()), e);
}
}
@Override
@SuppressWarnings("unchecked")
public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, OffsetContext offsetContext) throws InterruptedException {
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
boolean shouldReadChunk = !context.snapshotRunning();
final List<T> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(dataCollectionIds);
if (shouldReadChunk) {
progressListener.snapshotStarted();
progressListener.monitoredDataCollectionsDetermined(newDataCollectionIds);
readChunk();
}
}
protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) {
for (Iterator<Column> i = table.primaryKeyColumns().iterator(); i.hasNext();) {
final Column key = i.next();
sql.append(key.name()).append(predicate);
if (i.hasNext()) {
sql.append(" AND ");
}
}
}
/**
* Dispatches the data change events for the records of a single table.
*/
private void createDataEventsForTable() {
long exportStart = clock.currentTimeInMillis();
LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), context.tablesToBeSnapshottedCount());
final String selectStatement = buildChunkQuery(currentTable);
LOGGER.debug("\t For table '{}' using select statement: '{}', key: '{}', maximum key: '{}'", currentTable.id(),
selectStatement, context.chunkEndPosititon(), context.maximumKey().get());
final TableSchema tableSchema = databaseSchema.schemaFor(currentTable.id());
try (PreparedStatement statement = readTableChunkStatement(selectStatement);
ResultSet rs = statement.executeQuery()) {
final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, currentTable);
long rows = 0;
Timer logTimer = getTableScanLogTimer();
Object[] lastRow = null;
Object[] firstRow = null;
while (rs.next()) {
rows++;
final Object[] row = jdbcConnection.rowToArray(currentTable, databaseSchema, rs, columnArray);
if (firstRow == null) {
firstRow = row;
}
final Struct keyStruct = tableSchema.keyFromColumnData(row);
window.put(keyStruct, row);
if (logTimer.expired()) {
long stop = clock.currentTimeInMillis();
LOGGER.debug("\t Exported {} records for table '{}' after {}", rows, currentTable.id(),
Strings.duration(stop - exportStart));
logTimer = getTableScanLogTimer();
}
lastRow = row;
}
final Object[] firstKey = keyFromRow(firstRow);
final Object[] lastKey = keyFromRow(lastRow);
context.nextChunkPosition(lastKey);
progressListener.currentChunk(context.currentChunkId(), firstKey, lastKey);
if (lastRow != null) {
LOGGER.debug("\t Next window will resume from '{}'", context.chunkEndPosititon());
}
LOGGER.debug("\t Finished exporting {} records for window of table table '{}'; total duration '{}'", rows,
currentTable.id(), Strings.duration(clock.currentTimeInMillis() - exportStart));
incrementTableRowsScanned(rows);
}
catch (SQLException e) {
throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed", e);
}
}
private void incrementTableRowsScanned(long rows) {
totalRowsScanned += rows;
progressListener.rowsScanned(currentTable.id(), totalRowsScanned);
}
private void tableScanCompleted() {
progressListener.dataCollectionSnapshotCompleted(currentTable.id(), totalRowsScanned);
totalRowsScanned = 0;
}
protected PreparedStatement readTableChunkStatement(String sql) throws SQLException {
final PreparedStatement statement = jdbcConnection.readTablePreparedStatement(connectorConfig, sql,
OptionalLong.empty());
if (context.isNonInitialChunk()) {
final Object[] maximumKey = context.maximumKey().get();
final Object[] chunkEndPosition = context.chunkEndPosititon();
for (int i = 0; i < chunkEndPosition.length; i++) {
statement.setObject(i + 1, chunkEndPosition[i]);
statement.setObject(i + 1 + chunkEndPosition.length, chunkEndPosition[i]);
statement.setObject(i + 1 + 2 * chunkEndPosition.length, maximumKey[i]);
}
}
return statement;
}
private Timer getTableScanLogTimer() {
return Threads.timer(clock, RelationalSnapshotChangeEventSource.LOG_INTERVAL);
}
private Object[] keyFromRow(Object[] row) {
if (row == null) {
return null;
}
final List<Column> keyColumns = currentTable.primaryKeyColumns();
final Object[] key = new Object[keyColumns.size()];
for (int i = 0; i < keyColumns.size(); i++) {
key[i] = row[keyColumns.get(i).position() - 1];
}
return key;
}
protected void setContext(IncrementalSnapshotContext<T> context) {
this.context = context;
}
}

View File

@ -0,0 +1,243 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.source.snapshot.incremental;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.relational.TableId;
import io.debezium.util.HexConverter;
/**
* A class describing current state of incremental snapshot
*
* @author Jiri Pechanec
*
*/
@NotThreadSafe
public class AbstractIncrementalSnapshotContext<T> implements IncrementalSnapshotContext<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIncrementalSnapshotContext.class);
// TODO Consider which (if any) information should be exposed in source info
public static final String INCREMENTAL_SNAPSHOT_KEY = "incremental_snapshot";
public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY = INCREMENTAL_SNAPSHOT_KEY + "_collections";
public static final String EVENT_PRIMARY_KEY = INCREMENTAL_SNAPSHOT_KEY + "_primary_key";
public static final String TABLE_MAXIMUM_KEY = INCREMENTAL_SNAPSHOT_KEY + "_maximum_key";
/**
* @code(true) if window is opened and deduplication should be executed
*/
protected boolean windowOpened = false;
/**
* The last primary key in chunk that is now in process.
*/
private Object[] chunkEndPosition;
// TODO After extracting add into source info optional block
// incrementalSnapshotWindow{String from, String to}
// State to be stored and recovered from offsets
private final Queue<T> dataCollectionsToSnapshot = new LinkedList<>();
private final boolean useCatalogBeforeSchema;
/**
* The PK of the last record that was passed to Kafka Connect. In case of
* connector restart the start of the first chunk will be populated from it.
*/
private Object[] lastEventKeySent;
private String currentChunkId;
/**
* The largest PK in the table at the start of snapshot.
*/
private Object[] maximumKey;
public AbstractIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
}
public boolean openWindow(String id) {
if (!id.startsWith(currentChunkId)) {
LOGGER.info("Arrived request to open window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
return false;
}
LOGGER.debug("Opening window for incremental snapshot chunk");
windowOpened = true;
return true;
}
public boolean closeWindow(String id) {
if (!id.startsWith(currentChunkId)) {
LOGGER.info("Arrived request to close window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
return false;
}
LOGGER.debug("Closing window for incremental snapshot chunk");
windowOpened = false;
return true;
}
public boolean deduplicationNeeded() {
return windowOpened;
}
private String arrayToSerializedString(Object[] array) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(array);
return HexConverter.convertToHexString(bos.toByteArray());
}
catch (IOException e) {
throw new DebeziumException(String.format("Cannot serialize chunk information %s", array));
}
}
private Object[] serializedStringToArray(String field, String serialized) {
try (final ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
ObjectInputStream ois = new ObjectInputStream(bis)) {
return (Object[]) ois.readObject();
}
catch (Exception e) {
throw new DebeziumException(String.format("Failed to deserialize '%s' with value '%s'", field, serialized),
e);
}
}
private String dataCollectionsToSnapshotAsString() {
// TODO Handle non-standard table ids containing dots, commas etc.
return dataCollectionsToSnapshot.stream().map(Object::toString).collect(Collectors.joining(","));
}
private List<String> stringToDataCollections(String dataCollectionsStr) {
return Arrays.asList(dataCollectionsStr.split(","));
}
public boolean snapshotRunning() {
return !dataCollectionsToSnapshot.isEmpty();
}
public Map<String, Object> store(Map<String, Object> offset) {
if (!snapshotRunning()) {
return offset;
}
offset.put(EVENT_PRIMARY_KEY, arrayToSerializedString(lastEventKeySent));
offset.put(TABLE_MAXIMUM_KEY, arrayToSerializedString(maximumKey));
offset.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY, dataCollectionsToSnapshotAsString());
return offset;
}
private void addTablesIdsToSnapshot(List<T> dataCollectionIds) {
dataCollectionsToSnapshot.addAll(dataCollectionIds);
}
@SuppressWarnings("unchecked")
public List<T> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds) {
final List<T> newDataCollectionIds = dataCollectionIds.stream()
.map(x -> (T) TableId.parse(x, useCatalogBeforeSchema))
.collect(Collectors.toList());
addTablesIdsToSnapshot(newDataCollectionIds);
return newDataCollectionIds;
}
protected static <U> IncrementalSnapshotContext<U> init(AbstractIncrementalSnapshotContext<U> context, Map<String, ?> offsets) {
final String lastEventSentKeyStr = (String) offsets.get(EVENT_PRIMARY_KEY);
context.chunkEndPosition = (lastEventSentKeyStr != null)
? context.serializedStringToArray(EVENT_PRIMARY_KEY, lastEventSentKeyStr)
: null;
context.lastEventKeySent = null;
final String maximumKeyStr = (String) offsets.get(TABLE_MAXIMUM_KEY);
context.maximumKey = (maximumKeyStr != null) ? context.serializedStringToArray(TABLE_MAXIMUM_KEY, maximumKeyStr)
: null;
final String dataCollectionsStr = (String) offsets.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY);
context.dataCollectionsToSnapshot.clear();
if (dataCollectionsStr != null) {
context.addDataCollectionNamesToSnapshot(context.stringToDataCollections(dataCollectionsStr));
}
return context;
}
public void sendEvent(Object[] key) {
lastEventKeySent = key;
}
public T currentDataCollectionId() {
return dataCollectionsToSnapshot.peek();
}
public int tablesToBeSnapshottedCount() {
return dataCollectionsToSnapshot.size();
}
public void nextChunkPosition(Object[] end) {
chunkEndPosition = end;
}
public Object[] chunkEndPosititon() {
return chunkEndPosition;
}
private void resetChunk() {
lastEventKeySent = null;
chunkEndPosition = null;
maximumKey = null;
}
public void revertChunk() {
chunkEndPosition = lastEventKeySent;
windowOpened = false;
}
public boolean isNonInitialChunk() {
return chunkEndPosition != null;
}
public T nextDataCollection() {
resetChunk();
return dataCollectionsToSnapshot.poll();
}
public void startNewChunk() {
currentChunkId = UUID.randomUUID().toString();
LOGGER.debug("Starting new chunk with id '{}'", currentChunkId);
}
public String currentChunkId() {
return currentChunkId;
}
public void maximumKey(Object[] key) {
maximumKey = key;
}
public Optional<Object[]> maximumKey() {
return Optional.ofNullable(maximumKey);
}
@Override
public String toString() {
return "IncrementalSnapshotContext [windowOpened=" + windowOpened + ", chunkEndPosition="
+ Arrays.toString(chunkEndPosition) + ", dataCollectionsToSnapshot=" + dataCollectionsToSnapshot
+ ", lastEventKeySent=" + Arrays.toString(lastEventKeySent) + ", maximumKey="
+ Arrays.toString(maximumKey) + "]";
}
}

View File

@ -28,7 +28,7 @@ public CloseIncrementalSnapshotWindow(EventDispatcher<? extends DataCollectionId
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public boolean arrived(Payload signalPayload) throws InterruptedException {
dispatcher.getIncrementalSnapshotChangeEventSource().closeWindow(signalPayload.id, (EventDispatcher) dispatcher, signalPayload.offsetContext);
dispatcher.getIncrementalSnapshotChangeEventSource().closeWindow(signalPayload.id, signalPayload.offsetContext);
return true;
}

View File

@ -7,7 +7,6 @@
import java.util.List;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
@ -20,12 +19,18 @@
*/
public interface IncrementalSnapshotChangeEventSource<T extends DataCollectionId> {
void closeWindow(String id, EventDispatcher<T> dispatcher, OffsetContext offsetContext) throws InterruptedException;
void closeWindow(String id, OffsetContext offsetContext) throws InterruptedException;
void processMessage(DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext);
void processMessage(DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) throws InterruptedException;
void init(OffsetContext offsetContext);
void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, OffsetContext offsetContext)
throws InterruptedException;
default void processHeartbeat(OffsetContext offsetContext) throws InterruptedException {
}
default void processFilteredEvent(OffsetContext offsetContext) throws InterruptedException {
}
}

View File

@ -5,243 +5,45 @@
*/
package io.debezium.pipeline.source.snapshot.incremental;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface IncrementalSnapshotContext<T> {
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.relational.TableId;
import io.debezium.util.HexConverter;
T currentDataCollectionId();
/**
* A class describing current state of incremental snapshot
*
* @author Jiri Pechanec
*
*/
@NotThreadSafe
public class IncrementalSnapshotContext<T> {
T nextDataCollection();
private static final Logger LOGGER = LoggerFactory.getLogger(IncrementalSnapshotContext.class);
List<T> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds);
// TODO Consider which (if any) information should be exposed in source info
public static final String INCREMENTAL_SNAPSHOT_KEY = "incremental_snapshot";
public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY = INCREMENTAL_SNAPSHOT_KEY + "_collections";
public static final String EVENT_PRIMARY_KEY = INCREMENTAL_SNAPSHOT_KEY + "_primary_key";
public static final String TABLE_MAXIMUM_KEY = INCREMENTAL_SNAPSHOT_KEY + "_maximum_key";
int tablesToBeSnapshottedCount();
/**
* @code(true) if window is opened and deduplication should be executed
*/
private boolean windowOpened = false;
boolean openWindow(String id);
/**
* The last primary key in chunk that is now in process.
*/
private Object[] chunkEndPosition;
boolean closeWindow(String id);
// TODO After extracting add into source info optional block
// incrementalSnapshotWindow{String from, String to}
// State to be stored and recovered from offsets
private final Queue<T> dataCollectionsToSnapshot = new LinkedList<>();
boolean isNonInitialChunk();
private final boolean useCatalogBeforeSchema;
/**
* The PK of the last record that was passed to Kafka Connect. In case of
* connector restart the start of the first chunk will be populated from it.
*/
private Object[] lastEventKeySent;
boolean snapshotRunning();
private String currentChunkId;
void startNewChunk();
/**
* The largest PK in the table at the start of snapshot.
*/
private Object[] maximumKey;
void nextChunkPosition(Object[] lastKey);
public IncrementalSnapshotContext() {
this(true);
}
String currentChunkId();
public IncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
}
Object[] chunkEndPosititon();
public boolean openWindow(String id) {
if (!id.startsWith(currentChunkId)) {
LOGGER.info("Arrived request to open window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
return false;
}
LOGGER.debug("Opening window for incremental snapshot chunk");
windowOpened = true;
return true;
}
void sendEvent(Object[] keyFromRow);
public boolean closeWindow(String id) {
if (!id.startsWith(currentChunkId)) {
LOGGER.info("Arrived request to close window with id = '{}', expected = '{}', request ignored", id, currentChunkId);
return false;
}
LOGGER.debug("Closing window for incremental snapshot chunk");
windowOpened = false;
return true;
}
void maximumKey(Object[] key);
public boolean deduplicationNeeded() {
return windowOpened;
}
Optional<Object[]> maximumKey();
private String arrayToSerializedString(Object[] array) {
try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(array);
return HexConverter.convertToHexString(bos.toByteArray());
}
catch (IOException e) {
throw new DebeziumException(String.format("Cannot serialize chunk information %s", array));
}
}
boolean deduplicationNeeded();
private Object[] serializedStringToArray(String field, String serialized) {
try (final ByteArrayInputStream bis = new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
ObjectInputStream ois = new ObjectInputStream(bis)) {
return (Object[]) ois.readObject();
}
catch (Exception e) {
throw new DebeziumException(String.format("Failed to deserialize '%s' with value '%s'", field, serialized),
e);
}
}
Map<String, Object> store(Map<String, Object> offset);
private String dataCollectionsToSnapshotAsString() {
// TODO Handle non-standard table ids containing dots, commas etc.
return dataCollectionsToSnapshot.stream().map(x -> x.toString()).collect(Collectors.joining(","));
}
private List<String> stringToDataCollections(String dataCollectionsStr) {
return Arrays.asList(dataCollectionsStr.split(","));
}
protected boolean snapshotRunning() {
return !dataCollectionsToSnapshot.isEmpty();
}
public Map<String, Object> store(Map<String, Object> offset) {
if (!snapshotRunning()) {
return offset;
}
offset.put(EVENT_PRIMARY_KEY, arrayToSerializedString(lastEventKeySent));
offset.put(TABLE_MAXIMUM_KEY, arrayToSerializedString(maximumKey));
offset.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY, dataCollectionsToSnapshotAsString());
return offset;
}
private void addTablesIdsToSnapshot(List<T> dataCollectionIds) {
dataCollectionsToSnapshot.addAll(dataCollectionIds);
}
@SuppressWarnings("unchecked")
public List<T> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds) {
final List<T> newDataCollectionIds = dataCollectionIds.stream()
.map(x -> (T) TableId.parse(x, useCatalogBeforeSchema))
.collect(Collectors.toList());
addTablesIdsToSnapshot(newDataCollectionIds);
return newDataCollectionIds;
}
public static <U> IncrementalSnapshotContext<U> load(Map<String, ?> offsets, boolean useCatalogBeforeSchema, Class<U> clazz) {
final IncrementalSnapshotContext<U> context = new IncrementalSnapshotContext<>(useCatalogBeforeSchema);
final String lastEventSentKeyStr = (String) offsets.get(EVENT_PRIMARY_KEY);
context.chunkEndPosition = (lastEventSentKeyStr != null)
? context.serializedStringToArray(EVENT_PRIMARY_KEY, lastEventSentKeyStr)
: null;
context.lastEventKeySent = null;
final String maximumKeyStr = (String) offsets.get(TABLE_MAXIMUM_KEY);
context.maximumKey = (maximumKeyStr != null) ? context.serializedStringToArray(TABLE_MAXIMUM_KEY, maximumKeyStr)
: null;
final String dataCollectionsStr = (String) offsets.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY);
context.dataCollectionsToSnapshot.clear();
if (dataCollectionsStr != null) {
context.addDataCollectionNamesToSnapshot(context.stringToDataCollections(dataCollectionsStr));
}
return context;
}
public static <U> IncrementalSnapshotContext<U> load(Map<String, ?> offsets, Class<U> clazz) {
return load(offsets, true, clazz);
}
public void sendEvent(Object[] key) {
lastEventKeySent = key;
}
public T currentDataCollectionId() {
return dataCollectionsToSnapshot.peek();
}
public int tablesToBeSnapshottedCount() {
return dataCollectionsToSnapshot.size();
}
public void nextChunkPosition(Object[] end) {
chunkEndPosition = end;
}
public Object[] chunkEndPosititon() {
return chunkEndPosition;
}
private void resetChunk() {
chunkEndPosition = null;
maximumKey = null;
}
public boolean isNonInitialChunk() {
return chunkEndPosition != null;
}
public T nextDataCollection() {
resetChunk();
return dataCollectionsToSnapshot.poll();
}
public void startNewChunk() {
currentChunkId = UUID.randomUUID().toString();
LOGGER.debug("Starting new chunk with id '{}'", currentChunkId);
}
public String currentChunkId() {
return currentChunkId;
}
public void maximumKey(Object[] key) {
maximumKey = key;
}
public Optional<Object[]> maximumKey() {
return Optional.ofNullable(maximumKey);
}
@Override
public String toString() {
return "IncrementalSnapshotContext [windowOpened=" + windowOpened + ", chunkEndPosition="
+ Arrays.toString(chunkEndPosition) + ", dataCollectionsToSnapshot=" + dataCollectionsToSnapshot
+ ", lastEventKeySent=" + Arrays.toString(lastEventKeySent) + ", maximumKey="
+ Arrays.toString(maximumKey) + "]";
}
void revertChunk();
}

View File

@ -5,109 +5,35 @@
*/
package io.debezium.pipeline.source.snapshot.incremental;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;
@NotThreadSafe
public class SignalBasedIncrementalSnapshotChangeEventSource<T extends DataCollectionId> implements IncrementalSnapshotChangeEventSource<T> {
public class SignalBasedIncrementalSnapshotChangeEventSource<T extends DataCollectionId> extends AbstractIncrementalSnapshotChangeEventSource<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(SignalBasedIncrementalSnapshotChangeEventSource.class);
private Map<Struct, Object[]> window = new LinkedHashMap<>();
private CommonConnectorConfig connectorConfig;
private JdbcConnection jdbcConnection;
private final Clock clock;
private final String signalWindowStatement;
private final RelationalDatabaseSchema databaseSchema;
private final SnapshotProgressListener progressListener;
private final DataChangeEventListener dataListener;
private long totalRowsScanned = 0;
private Table currentTable;
private IncrementalSnapshotContext<T> context = null;
public SignalBasedIncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection,
DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener progressListener,
EventDispatcher<T> dispatcher, DatabaseSchema<?> databaseSchema, Clock clock,
SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
this.connectorConfig = config;
this.jdbcConnection = jdbcConnection;
signalWindowStatement = "INSERT INTO " + connectorConfig.getSignalingDataCollectionId()
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener);
signalWindowStatement = "INSERT INTO " + config.getSignalingDataCollectionId()
+ " VALUES (?, ?, null)";
this.databaseSchema = (RelationalDatabaseSchema) databaseSchema;
this.clock = clock;
this.progressListener = progressListener;
this.dataListener = dataChangeEventListener;
}
@Override
@SuppressWarnings("unchecked")
public void closeWindow(String id, EventDispatcher<T> dispatcher, OffsetContext offsetContext) throws InterruptedException {
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
if (!context.closeWindow(id)) {
return;
}
LOGGER.debug("Sending {} events from window buffer", window.size());
offsetContext.incrementalSnapshotEvents();
for (Object[] row : window.values()) {
sendEvent(dispatcher, offsetContext, row);
}
offsetContext.postSnapshotCompletion();
window.clear();
readChunk();
}
protected void sendEvent(EventDispatcher<T> dispatcher, OffsetContext offsetContext, Object[] row) throws InterruptedException {
context.sendEvent(keyFromRow(row));
offsetContext.event((T) context.currentDataCollectionId(), clock.currentTimeAsInstant());
dispatcher.dispatchSnapshotEvent((T) context.currentDataCollectionId(),
getChangeRecordEmitter(context.currentDataCollectionId(), offsetContext, row),
dispatcher.getIncrementalSnapshotChangeEventReceiver(dataListener));
}
/**
* Returns a {@link ChangeRecordEmitter} producing the change records for
* the given table row.
*/
protected ChangeRecordEmitter getChangeRecordEmitter(T dataCollectionId, OffsetContext offsetContext,
Object[] row) {
return new SnapshotChangeRecordEmitter(offsetContext, row, clock);
}
@Override
@ -115,277 +41,30 @@ protected ChangeRecordEmitter getChangeRecordEmitter(T dataCollectionId, OffsetC
public void processMessage(DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) {
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
if (context == null) {
LOGGER.warn("Context is null, skipping message processing");
return;
}
LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window);
if (!context.deduplicationNeeded() || window.isEmpty()) {
return;
}
if (!context.currentDataCollectionId().equals(dataCollectionId)) {
return;
}
if (key instanceof Struct) {
if (window.remove((Struct) key) != null) {
LOGGER.info("Removed '{}' from window", key);
}
if (!window.isEmpty() && context.deduplicationNeeded()) {
deduplicateWindow(dataCollectionId, key);
}
}
private void emitWindowOpen() throws SQLException {
@Override
protected void emitWindowOpen() throws SQLException {
jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
x.setString(1, context.currentChunkId() + "-open");
x.setString(2, OpenIncrementalSnapshotWindow.NAME);
});
jdbcConnection.commit();
}
private void emitWindowClose() throws SQLException {
@Override
protected void emitWindowClose() throws SQLException {
jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
x.setString(1, context.currentChunkId() + "-close");
x.setString(2, CloseIncrementalSnapshotWindow.NAME);
});
}
protected String buildChunkQuery(Table table) {
String condition = null;
// Add condition when this is not the first query
if (context.isNonInitialChunk()) {
final StringBuilder sql = new StringBuilder();
// Window boundaries
addKeyColumnsToCondition(table, sql, " >= ?");
sql.append(" AND NOT (");
addKeyColumnsToCondition(table, sql, " = ?");
sql.append(")");
// Table boundaries
sql.append(" AND ");
addKeyColumnsToCondition(table, sql, " <= ?");
condition = sql.toString();
}
final String orderBy = table.primaryKeyColumns().stream()
.map(Column::name)
.collect(Collectors.joining(", "));
return jdbcConnection.buildSelectWithRowLimits(table.id(),
connectorConfig.getIncrementalSnashotChunkSize(),
"*",
Optional.ofNullable(condition),
orderBy);
}
protected String buildMaxPrimaryKeyQuery(Table table) {
final String orderBy = table.primaryKeyColumns().stream()
.map(Column::name)
.collect(Collectors.joining(" DESC, ")) + " DESC";
return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, "*", Optional.empty(), orderBy.toString());
}
@Override
@SuppressWarnings("unchecked")
public void init(OffsetContext offsetContext) {
if (offsetContext == null) {
LOGGER.info("Empty incremental snapshot change event source started, no action needed");
return;
}
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
if (!context.snapshotRunning()) {
LOGGER.info("No incremental snapshot in progress, no action needed on start");
return;
}
LOGGER.info("Incremental snapshot in progress, need to read new chunk on start");
try {
progressListener.snapshotStarted();
readChunk();
}
catch (InterruptedException e) {
throw new DebeziumException("Reading of an initial chunk after connector restart has been interrupted");
}
LOGGER.info("Incremental snapshot in progress, loading of initial chunk completed");
}
private void readChunk() throws InterruptedException {
if (!context.snapshotRunning()) {
return;
}
try {
// This commit should be unnecessary and might be removed later
jdbcConnection.commit();
context.startNewChunk();
emitWindowOpen();
jdbcConnection.commit();
while (context.snapshotRunning()) {
final TableId currentTableId = (TableId) context.currentDataCollectionId();
currentTable = databaseSchema.tableFor(currentTableId);
if (currentTable == null) {
break;
}
if (currentTable.primaryKeyColumns().isEmpty()) {
LOGGER.warn("Incremental snapshot for table '{}' skipped cause the table has no primary keys", currentTableId);
break;
}
if (!context.maximumKey().isPresent()) {
context.maximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
if (!rs.next()) {
return null;
}
return keyFromRow(jdbcConnection.rowToArray(currentTable, databaseSchema, rs,
ColumnUtils.toArray(rs, currentTable)));
}));
if (!context.maximumKey().isPresent()) {
LOGGER.info(
"No maximum key returned by the query, incremental snapshotting of table '{}' finished as it is empty",
currentTableId);
context.nextDataCollection();
continue;
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Incremental snapshot for table '{}' will end at position {}", currentTableId,
context.maximumKey().orElse(new Object[0]));
}
}
createDataEventsForTable();
if (window.isEmpty()) {
LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
currentTableId);
tableScanCompleted();
context.nextDataCollection();
if (!context.snapshotRunning()) {
progressListener.snapshotCompleted();
}
}
else {
break;
}
}
emitWindowClose();
jdbcConnection.commit();
}
catch (SQLException e) {
throw new DebeziumException(String.format("Database error while executing incremental snapshot for table '%s'", context.currentDataCollectionId()), e);
}
}
@Override
@SuppressWarnings("unchecked")
public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, OffsetContext offsetContext) throws InterruptedException {
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
boolean shouldReadChunk = false;
if (!context.snapshotRunning()) {
shouldReadChunk = true;
}
final List<T> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(dataCollectionIds);
if (shouldReadChunk) {
progressListener.snapshotStarted();
progressListener.monitoredDataCollectionsDetermined(newDataCollectionIds);
readChunk();
}
}
protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) {
for (Iterator<Column> i = table.primaryKeyColumns().iterator(); i.hasNext();) {
final Column key = i.next();
sql.append(key.name()).append(predicate);
if (i.hasNext()) {
sql.append(" AND ");
}
}
}
/**
* Dispatches the data change events for the records of a single table.
*/
private void createDataEventsForTable() throws InterruptedException {
long exportStart = clock.currentTimeInMillis();
LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), context.tablesToBeSnapshottedCount());
final String selectStatement = buildChunkQuery(currentTable);
LOGGER.debug("\t For table '{}' using select statement: '{}', key: '{}', maximum key: '{}'", currentTable.id(),
selectStatement, context.chunkEndPosititon(), context.maximumKey().get());
final TableSchema tableSchema = databaseSchema.schemaFor(currentTable.id());
try (PreparedStatement statement = readTableChunkStatement(selectStatement);
ResultSet rs = statement.executeQuery()) {
final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, currentTable);
long rows = 0;
Timer logTimer = getTableScanLogTimer();
Object[] lastRow = null;
Object[] firstRow = null;
while (rs.next()) {
rows++;
final Object[] row = jdbcConnection.rowToArray(currentTable, databaseSchema, rs, columnArray);
if (firstRow == null) {
firstRow = row;
}
final Struct keyStruct = tableSchema.keyFromColumnData(row);
window.put(keyStruct, row);
if (logTimer.expired()) {
long stop = clock.currentTimeInMillis();
LOGGER.debug("\t Exported {} records for table '{}' after {}", rows, currentTable.id(),
Strings.duration(stop - exportStart));
logTimer = getTableScanLogTimer();
}
lastRow = row;
}
final Object[] firstKey = keyFromRow(firstRow);
final Object[] lastKey = keyFromRow(lastRow);
context.nextChunkPosition(lastKey);
progressListener.currentChunk(context.currentChunkId(), firstKey, lastKey);
if (lastRow != null) {
LOGGER.debug("\t Next window will resume from '{}'", context.chunkEndPosititon());
}
LOGGER.debug("\t Finished exporting {} records for window of table table '{}'; total duration '{}'", rows,
currentTable.id(), Strings.duration(clock.currentTimeInMillis() - exportStart));
incrementTableRowsScanned(rows);
}
catch (SQLException e) {
throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed", e);
}
}
private void incrementTableRowsScanned(long rows) {
totalRowsScanned += rows;
progressListener.rowsScanned(currentTable.id(), totalRowsScanned);
}
private void tableScanCompleted() {
progressListener.dataCollectionSnapshotCompleted(currentTable.id(), totalRowsScanned);
totalRowsScanned = 0;
}
protected PreparedStatement readTableChunkStatement(String sql) throws SQLException {
final PreparedStatement statement = jdbcConnection.readTablePreparedStatement(connectorConfig, sql,
OptionalLong.empty());
if (context.isNonInitialChunk()) {
final Object[] maximumKey = context.maximumKey().get();
final Object[] chunkEndPosition = context.chunkEndPosititon();
for (int i = 0; i < chunkEndPosition.length; i++) {
statement.setObject(i + 1, chunkEndPosition[i]);
statement.setObject(i + 1 + chunkEndPosition.length, chunkEndPosition[i]);
statement.setObject(i + 1 + 2 * chunkEndPosition.length, maximumKey[i]);
}
}
return statement;
}
private Timer getTableScanLogTimer() {
return Threads.timer(clock, RelationalSnapshotChangeEventSource.LOG_INTERVAL);
}
private Object[] keyFromRow(Object[] row) {
if (row == null) {
return null;
}
final List<Column> keyColumns = currentTable.primaryKeyColumns();
final Object[] key = new Object[keyColumns.size()];
for (int i = 0; i < keyColumns.size(); i++) {
key[i] = row[keyColumns.get(i).position() - 1];
}
return key;
}
protected void setContext(IncrementalSnapshotContext<T> context) {
this.context = context;
jdbcConnection.commit();
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.source.snapshot.incremental;
import java.util.Map;
import io.debezium.annotation.NotThreadSafe;
/**
* A class describing current state of incremental snapshot
*
* @author Jiri Pechanec
*
*/
@NotThreadSafe
public class SignalBasedIncrementalSnapshotContext<T> extends AbstractIncrementalSnapshotContext<T> {
public SignalBasedIncrementalSnapshotContext() {
this(true);
}
public SignalBasedIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
super(useCatalogBeforeSchema);
}
public static <U> IncrementalSnapshotContext<U> load(Map<String, ?> offsets) {
return load(offsets, true);
}
public static <U> SignalBasedIncrementalSnapshotContext<U> load(Map<String, ?> offsets, boolean useCatalogBeforeSchema) {
final SignalBasedIncrementalSnapshotContext<U> context = new SignalBasedIncrementalSnapshotContext<>(useCatalogBeforeSchema);
init(context, offsets);
return context;
}
}

View File

@ -6,6 +6,7 @@
package io.debezium.junit;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
@ -22,7 +23,7 @@
*
*/
@Retention(RUNTIME)
@Target(METHOD)
@Target({ METHOD, TYPE })
public @interface ShouldFailWhen {
Class<? extends Supplier<Boolean>> value();
}

View File

@ -44,8 +44,8 @@ public String getConnectorName() {
@Test
public void testBuildQuery() {
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null), null, null, SnapshotProgressListener.NO_OP, DataChangeEventListener.NO_OP);
final IncrementalSnapshotContext<TableId> context = new IncrementalSnapshotContext<>();
config(), new JdbcConnection(config().getConfig(), config -> null), null, null, null, SnapshotProgressListener.NO_OP, DataChangeEventListener.NO_OP);
final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
source.setContext(context);
final Column pk1 = Column.editor().name("pk1").create();
final Column pk2 = Column.editor().name("pk2").create();
@ -63,7 +63,7 @@ public void testBuildQuery() {
@Test
public void testMaxQuery() {
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null), null, null, SnapshotProgressListener.NO_OP, DataChangeEventListener.NO_OP);
config(), new JdbcConnection(config().getConfig(), config -> null), null, null, null, SnapshotProgressListener.NO_OP, DataChangeEventListener.NO_OP);
final Column pk1 = Column.editor().name("pk1").create();
final Column pk2 = Column.editor().name("pk2").create();
final Column val1 = Column.editor().name("val1").create();

View File

@ -32,8 +32,8 @@
public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector> extends AbstractConnectorTest {
private static final int ROW_COUNT = 1_000;
private static final int MAXIMUM_NO_RECORDS_CONSUMES = 2;
protected static final int ROW_COUNT = 1_000;
private static final int MAXIMUM_NO_RECORDS_CONSUMES = 3;
protected static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-is.txt")
.toAbsolutePath();
@ -115,10 +115,14 @@ protected String pkFieldName() {
protected void sendAdHocSnapshotSignal() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
connection.execute(
String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')",
signalTableName(), tableName()));
String query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')",
signalTableName(), tableName());
logger.info("Sending signal with query {}", query);
connection.execute(query);
}
catch (Exception e) {
logger.warn("Failed to send signal", e);
}
}
@ -127,7 +131,7 @@ protected void startConnector(Function<Configuration.Builder, Configuration.Buil
start(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
waitForAvailableRecords(1, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
}
@ -220,7 +224,7 @@ public void updatesWithRestart() throws Exception {
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
waitForAvailableRecords(1, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
@ -286,7 +290,7 @@ public void snapshotOnlyWithRestart() throws Exception {
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
waitForAvailableRecords(1, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();

View File

@ -408,6 +408,20 @@ a|Records the completed snapshot in the connector offsets.
include::{partialsdir}/modules/all-connectors/ref-connector-incremental-snapshot.adoc[leveloffset=+3]
===== Read-only incremental snapshot
The MySQL connector supports incremental snapshots over a read-only connection to the database.
This is achieved by using the executed GTID set as high and low watermarks.
The state of a chunk's window is updated by comparing the GTIDs of binlog events, or of the server's heartbeats, against the low and high watermarks.
To switch to the read-only implementation, set {link-prefix}:#{link-mysql-connector}-property-read-only[read.only] to `true`.
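For illustration, a minimal configuration sketch built as Java `Properties` is shown below. The hostname, credentials, server id, and signal collection values are placeholders; only `read.only=true` is specific to this feature.

[source,java]
----
import java.util.Properties;

public class ReadOnlyIncrementalSnapshotConfig {

    // Builds a minimal MySQL connector configuration with read-only incremental snapshots enabled.
    public static Properties connectorProperties() {
        Properties props = new Properties();
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        // Placeholder connection settings; replace with real values.
        props.setProperty("database.hostname", "mysql.example.com");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "debezium");
        props.setProperty("database.password", "secret");
        props.setProperty("database.server.id", "184054");
        props.setProperty("database.server.name", "dbserver1");
        // Use GTID-based watermarks instead of writing open/close markers to the signal table.
        props.setProperty("read.only", "true");
        // Placeholder signaling collection used for ad-hoc snapshot signals.
        props.setProperty("signal.data.collection", "inventory.debezium_signal");
        return props;
    }
}
----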
.Prerequisites
* {link-prefix}:{link-mysql-connector}#enable-mysql-gtids[`enable-mysql-gtids`]
* If the connector reads from a replica, then for multi-threaded replicas (replicas on which `replica_parallel_workers` is set to a value greater than 0)
it is required to set `replica_preserve_commit_order=1` or `slave_preserve_commit_order=1`; see the verification sketch after this list.
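As a rough check of these prerequisites, the JDBC sketch below queries the relevant server variables. The connection URL and credentials are placeholders, and the variable names (`replica_*` versus the older `slave_*` forms) depend on the MySQL version, so adapt the queries as needed.

[source,java]
----
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReadOnlySnapshotPrerequisiteCheck {

    public static void main(String[] args) throws Exception {
        // Placeholder connection details; point this at the replica being checked.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://mysql.example.com:3306/", "debezium", "secret");
                Statement stmt = conn.createStatement()) {

            // GTIDs must be enabled for the read-only watermark implementation.
            try (ResultSet rs = stmt.executeQuery("SELECT @@gtid_mode, @@enforce_gtid_consistency")) {
                if (rs.next()) {
                    System.out.println("gtid_mode=" + rs.getString(1)
                            + ", enforce_gtid_consistency=" + rs.getString(2));
                }
            }

            // Multi-threaded replicas must preserve commit order; variable names vary by MySQL version.
            try (ResultSet rs = stmt.executeQuery(
                    "SHOW VARIABLES WHERE Variable_name IN "
                            + "('replica_parallel_workers', 'slave_parallel_workers', "
                            + "'replica_preserve_commit_order', 'slave_preserve_commit_order')")) {
                while (rs.next()) {
                    System.out.println(rs.getString("Variable_name") + "=" + rs.getString("Value"));
                }
            }
        }
    }
}
----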
ifdef::community[]
// Type: continue
@ -2564,6 +2578,10 @@ endif::community[]
| Fully-qualified name of the data collection that is used to send {link-prefix}:{link-signalling}[signals] to the connector.
The name format is _database-name.table-name_.
|[[mysql-property-read-only]]<<mysql-property-read-only, `+read.only+`>>
|`false`
|Switches to an alternative incremental snapshot watermarking implementation that avoids writes to the signal data collection.
|[[mysql-property-provide-transaction-metadata]]<<mysql-property-provide-transaction-metadata, `provide.transaction.metadata`>>
|`false`
|Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify `true` if you want the connector to do this. See {link-prefix}:{link-mysql-connector}#mysql-transaction-metadata[Transaction metadata] for details.