Merge pull request #152 from rhauch/dbz-166

DBZ-166 Corrected shutdown logic of MySQL connector
Randall Hauch 2016-12-20 08:48:03 -06:00 committed by GitHub
commit d1d21166b9
18 changed files with 1139 additions and 472 deletions
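For orientation: the shutdown rework in this commit replaces the task's ad-hoc snapshot/binlog hand-off with a small Reader abstraction (STOPPED, RUNNING, STOPPING states) plus a ChainedReader that runs readers in sequence. The sketch below shows how a caller is expected to drive that lifecycle; it assumes only the Reader interface added in this commit, and the ReaderLifecycleSketch class and drain() helper are hypothetical, not connector code.

    import java.util.List;

    import org.apache.kafka.connect.source.SourceRecord;

    import io.debezium.connector.mysql.Reader;

    public class ReaderLifecycleSketch {
        // Drives any Reader through its lifecycle: start, drain via poll(), observe completion.
        public static void drain(Reader reader) throws InterruptedException {
            reader.uponCompletion(() -> System.out.println(reader.name() + " completed"));
            reader.start();                              // STOPPED -> RUNNING
            List<SourceRecord> batch;
            while ((batch = reader.poll()) != null) {    // null means no more records will ever come
                batch.forEach(record -> System.out.println("received " + record));
            }
            // The reader has now passed through STOPPING to STOPPED and the
            // uponCompletion handler has been invoked exactly once.
        }
    }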


@ -29,9 +29,10 @@
*
* @author Randall Hauch
*/
public abstract class AbstractReader {
public abstract class AbstractReader implements Reader {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final String name;
protected final MySqlTaskContext context;
private final BlockingQueue<SourceRecord> records;
private final AtomicBoolean running = new AtomicBoolean(false);
@ -40,25 +41,34 @@ public abstract class AbstractReader {
private ConnectException failureException;
private final int maxBatchSize;
private final Metronome metronome;
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
/**
* Create a reader.
*
* @param name the name of the reader
* @param context the task context in which this reader is running; may not be null
*/
public AbstractReader(MySqlTaskContext context) {
public AbstractReader(String name, MySqlTaskContext context) {
this.name = name;
this.context = context;
this.records = new LinkedBlockingDeque<>(context.maxQueueSize());
this.maxBatchSize = context.maxBatchSize();
this.metronome = Metronome.parker(context.pollIntervalInMillseconds(), TimeUnit.MILLISECONDS, Clock.SYSTEM);
}
/**
* Start the reader and return immediately. Once started, the {@link SourceRecord} can be obtained by periodically calling
* {@link #poll()} until that method returns {@code null}.
* <p>
* This method does nothing if it is already running.
*/
@Override
public String name() {
return name;
}
@Override
public void uponCompletion(Runnable handler) {
assert this.uponCompletion.get() == null;
this.uponCompletion.set(handler);
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.failure.set(null);
@ -67,16 +77,18 @@ public void start() {
}
}
/**
* Stop the reader from running. This method is called when the connector is stopped.
*/
@Override
public void stop() {
try {
if (this.running.compareAndSet(true, false)) {
doStop();
}
doStop();
running.set(false);
} finally {
doShutdown();
if (failure.get() != null) {
// We had a failure and it was propagated via poll(), after which Kafka Connect will stop
// the connector, which will stop the task that will then stop this reader via this method.
// Since no more records will ever be polled again, we know we can clean up this reader's resources...
doCleanup();
}
}
}
@ -89,35 +101,29 @@ public void stop() {
* The reader has been requested to stop, so perform any work required to stop the reader's resources that were previously
* {@link #start() started}.
* <p>
* This method is called only if the reader is not already stopped.
* @see #doShutdown()
* This method is always called when {@link #stop()} is called; during the first such call {@link #isRunning()} will
* return {@code true}, and on any subsequent calls it will return {@code false}.
*/
protected abstract void doStop();
/**
* The reader has completed sending all {@link #enqueueRecord(SourceRecord) enqueued records}, so clean up any resources
* that remain.
* The reader has completed all processing and all {@link #enqueueRecord(SourceRecord) enqueued records} have been
* {@link #poll() consumed}, so this reader should clean up any resources that might remain.
*/
protected abstract void doCleanup();
/**
* The reader has been stopped.
* <p>
* This method is always called when the connector is stopped.
* @see #doStop()
*/
protected abstract void doShutdown();
/**
* Call this method only when the reader has successfully completed all of its work, signaling that subsequent
* calls to {@link #poll()} should forever return {@code null}.
* calls to {@link #poll()} should forever return {@code null} and that this reader should transition from
* {@link Reader.State#STOPPING} to {@link Reader.State#STOPPED}.
*/
protected void completeSuccessfully() {
this.success.set(true);
}
/**
* Call this method only when the reader has failed, and that a subsequent call to {@link #poll()} should throw this error.
* Call this method only when the reader has failed, to signal that a subsequent call to {@link #poll()} should throw
* this error and that {@link #doCleanup()} can be called at any time.
*
* @param error the error that resulted in the failure; should not be {@code null}
*/
@ -126,7 +132,8 @@ protected void failed(Throwable error) {
}
/**
* Call this method only when the reader has failed, and that a subsequent call to {@link #poll()} should throw this error.
* Call this method only when the reader has failed, to signal that a subsequent call to {@link #poll()} should throw
* this error and that {@link #doCleanup()} can be called at any time.
*
* @param error the error that resulted in the failure; should not be {@code null}
* @param msg the error message; may not be null
@ -157,26 +164,34 @@ protected ConnectException wrap(Throwable error) {
return new ConnectException(msg, error);
}
/**
* Get whether the snapshot is still running and records are available.
*
* @return {@code true} if still running, or {@code false} if no longer running and/or all records have been processed
*/
public boolean isRunning() {
return this.running.get();
@Override
public State state() {
if (success.get() || failure.get() != null) {
// We've either completed successfully or have failed, but either way no more records will be returned ...
return State.STOPPED;
}
if (running.get()) {
return State.RUNNING;
}
// Otherwise, we're in the process of stopping ...
return State.STOPPING;
}
/**
* Poll for the next batch of source records. This method blocks if this reader is still running but no records are available.
*
* @return the list of source records; or {@code null} when the snapshot is complete, all records have previously been
* returned, and the completion function (supplied in the constructor) has been called
* @throws InterruptedException if this thread is interrupted while waiting for more records
* @throws ConnectException if there is an error while this reader is running
*/
protected boolean isRunning() {
return running.get();
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// Before we do anything else, determine if there was a failure and throw that exception ...
failureException = this.failure.get();
if (failureException != null) throw failureException;
if (failureException != null) {
// In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
// will then explicitly stop the connector task. Most likely, however, the reader that threw
// the exception will have already stopped itself and will generate no additional records.
// Regardless, there may be records on the queue that will never be consumed.
throw failureException;
}
logger.trace("Polling for next batch of records");
List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
@ -192,13 +207,30 @@ public List<SourceRecord> poll() throws InterruptedException {
if (batch.isEmpty() && success.get() && records.isEmpty()) {
// We found no records but the operation completed successfully, so we're done
this.running.set(false);
doCleanup();
cleanupResources();
return null;
}
pollComplete(batch);
logger.trace("Completed batch of {} records", batch.size());
return batch;
}
/**
* This method is normally called by {@link #poll()} when this reader finishes normally and all generated
* records are consumed prior to being {@link #stop() stopped}. However, if this reader is explicitly
* {@link #stop() stopped} while still working, then subclasses should call this method when they have completed
* all of their shutdown work.
*/
protected void cleanupResources() {
try {
doCleanup();
} finally {
Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
if (completionHandler != null) {
completionHandler.run();
}
}
}
/**
* Method called when {@link #poll()} completes sending a non-zero-sized batch of records.
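To make the AbstractReader contract above concrete, here is a minimal, hypothetical subclass; it is a sketch only, assuming the doStart()/doStop()/doCleanup() hooks shown in this diff and an accessible MySqlTaskContext, and it is not part of the connector.

    package io.debezium.connector.mysql;

    // Hypothetical no-op subclass illustrating the hooks: doStart() begins the work,
    // doStop() reacts to stop(), and doCleanup() runs exactly once via cleanupResources(),
    // either from poll() after the last record or from the subclass itself if it was
    // stopped mid-flight.
    public class NoOpReader extends AbstractReader {

        public NoOpReader(String name, MySqlTaskContext context) {
            super(name, context);
        }

        @Override
        protected void doStart() {
            // A real reader would start a worker thread here; this sketch produces no
            // records and immediately signals success so poll() will soon return null.
            completeSuccessfully();
        }

        @Override
        protected void doStop() {
            // Interrupt or disconnect whatever doStart() set up; called from stop().
        }

        @Override
        protected void doCleanup() {
            // Release any remaining resources; called once all records are consumed.
        }
    }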


@ -79,10 +79,11 @@ public class BinlogReader extends AbstractReader {
/**
* Create a binlog reader.
*
* @param name the name of this reader; may not be null
* @param context the task context in which this reader is running; may not be null
*/
public BinlogReader(MySqlTaskContext context) {
super(context);
public BinlogReader(String name, MySqlTaskContext context) {
super(name, context);
source = context.source();
recordMakers = context.makeRecord();
recordSchemaChangesInSourceRecords = context.includeSchemaChangeRecords();
@ -217,20 +218,23 @@ protected void doStart() {
@Override
protected void doStop() {
try {
logger.debug("Stopping binlog reader, last recorded offset: {}", lastOffset);
client.disconnect();
if (isRunning()) {
logger.debug("Stopping binlog reader, last recorded offset: {}", lastOffset);
client.disconnect();
}
cleanupResources();
} catch (IOException e) {
logger.error("Unexpected error when disconnecting from the MySQL binary log reader", e);
} finally {
// We unregister our JMX metrics now, which means we won't record metrics for records that
// may be processed between now and complete shutdown. That's okay.
metrics.unregister(logger);
}
}
@Override
protected void doShutdown() {
metrics.unregister(logger);
}
@Override
protected void doCleanup() {
logger.debug("Completed writing all records that were read from the binlog before being stopped");
}
@Override
@ -398,8 +402,9 @@ protected void handleGtidEvent(Event event) {
* MySQL schemas.
*
* @param event the database change data event to be processed; may not be null
* @throws InterruptedException if this thread is interrupted while recording the DDL statements
*/
protected void handleQueryEvent(Event event) {
protected void handleQueryEvent(Event event) throws InterruptedException {
QueryEventData command = unwrapData(event);
logger.debug("Received query command: {}", event);
String sql = command.getSql().trim();


@ -0,0 +1,191 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
/**
* A {@link Reader} implementation that runs one or more other {@link Reader}s consistently, completely, and sequentially.
* This reader ensures that all records generated by one of its contained {@link Reader}s are passed through to callers
* via {@link #poll() polling} before the next reader is started. And, when this reader is {@link #stop() stopped}, this
* class ensures that the current reader is stopped and that no additional readers will be started.
*
* @author Randall Hauch
*/
@ThreadSafe
public final class ChainedReader implements Reader {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final List<Reader> readers = new ArrayList<>();
private final LinkedList<Reader> remainingReaders = new LinkedList<>();
private final AtomicBoolean running = new AtomicBoolean();
private final AtomicBoolean completed = new AtomicBoolean(true);
private final AtomicReference<Reader> currentReader = new AtomicReference<>();
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final AtomicReference<String> completionMessage = new AtomicReference<>();
/**
* Create a new chained reader.
*/
public ChainedReader() {
}
/**
* Set the message that should be logged when all of the readers have completed their work.
*
* @param msg the message; may be null
*/
public void uponCompletion(String msg) {
completionMessage.set(msg);
}
@Override
public void uponCompletion(Runnable handler) {
uponCompletion.set(handler);
}
protected ChainedReader add(Reader reader) {
assert reader != null;
reader.uponCompletion(this::readerCompletedPolling);
readers.add(reader);
return this;
}
@Override
public synchronized void start() {
if (running.compareAndSet(false, true)) {
completed.set(false);
// Build up the list of readers that need to be called ...
remainingReaders.clear();
readers.forEach(remainingReaders::add);
// Start the first reader, if there is one ...
if (!startNextReader()) {
// We couldn't start it ...
running.set(false);
completed.set(true);
}
}
}
@Override
public synchronized void stop() {
if (running.compareAndSet(true, false)) {
// First, remove all readers that have not yet been started, which ensures the next one is not started
// while we're trying to stop the previous one ...
remainingReaders.clear();
// Then stop the currently-running reader but do not remove it as it will be removed when it completes ...
Reader current = currentReader.get();
if (current != null) {
try {
logger.info("Stopping the {} reader", current.name());
current.stop();
} catch (Throwable t) {
logger.error("Unexpected error stopping the {} reader", current.name(), t);
}
}
}
}
@Override
public State state() {
if (running.get()) {
return State.RUNNING;
}
return completed.get() ? State.STOPPED : State.STOPPING;
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// We have to be prepared for the current reader to be in the middle of stopping and this chain transitioning
// to the next reader. In this case, the reader that is stopping will return null or empty, and we can ignore
// those until the next reader has started ...
while (running.get() || !completed.get()) {
Reader reader = currentReader.get();
if (reader != null) {
List<SourceRecord> records = reader.poll();
if (records != null && !records.isEmpty()) return records;
// otherwise, we'll go ahead until the next reader is ready or until we're no longer running ...
}
}
return null;
}
/**
* Called when the previously-started reader has returned all of its records via {@link #poll() polling}.
* Only when this method is called is the now-completed reader removed as the current reader, and this is what
* guarantees that all records produced by the now-completed reader have been polled.
*/
private synchronized void readerCompletedPolling() {
if (!startNextReader()) {
// We've finished with the last reader ...
try {
if (running.get() || !completed.get()) {
// Notify the handler ...
Runnable handler = uponCompletion.get();
if (handler != null) {
handler.run();
}
// and output our message ...
String msg = completionMessage.get();
if (msg != null) {
logger.info(msg);
}
}
} finally {
// And since this is the last reader, make sure this chain is also stopped ...
completed.set(true);
running.set(false);
}
}
}
/**
* Start the next reader.
*
* @return {@code true} if the next reader was started, or {@code false} if there are no more readers
*/
private boolean startNextReader() {
Reader reader = remainingReaders.isEmpty() ? null : remainingReaders.pop();
if (reader == null) {
// There are no readers, so nothing to do ...
Reader lastReader = currentReader.getAndSet(null);
if (lastReader != null) {
// Make sure it has indeed stopped ...
lastReader.stop();
}
return false;
}
// There is at least one more reader, so start it ...
Reader lastReader = currentReader.getAndSet(null);
if (lastReader != null) {
logger.debug("Transitioning from the {} reader to the {} reader", lastReader.name(), reader.name());
} else {
logger.debug("Starting the {} reader", reader.name());
}
reader.start();
currentReader.set(reader);
return true;
}
@Override
public String name() {
Reader reader = currentReader.get();
return reader != null ? reader.name() : "chained";
}
}
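As a usage note, the wiring below mirrors what MySqlConnectorTask.start() does later in this diff when a snapshot is followed by binlog reading. It is a sketch placed in the same package because add(...) is protected, and the class and method names are hypothetical.

    package io.debezium.connector.mysql;

    // Hypothetical wiring sketch: the snapshot reader runs to completion first, then the
    // binlog reader starts, and a single completion message is logged once the whole
    // chain has been drained via poll().
    class ChainedReaderWiringSketch {
        static ChainedReader snapshotThenBinlog(MySqlTaskContext taskContext) {
            ChainedReader chain = new ChainedReader();
            chain.uponCompletion("Finished the snapshot and binlog readers");
            chain.add(new SnapshotReader("snapshot", taskContext));  // runs to completion first
            chain.add(new BinlogReader("binlog", taskContext));      // started only afterwards
            return chain;                                            // caller invokes start() and poll()
        }
    }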


@ -7,10 +7,8 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
@ -33,11 +31,8 @@
public final class MySqlConnectorTask extends SourceTask {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicBoolean runningReader = new AtomicBoolean(false);
private volatile MySqlTaskContext taskContext;
private volatile SnapshotReader snapshotReader;
private volatile BinlogReader binlogReader;
private volatile AbstractReader currentReader;
private volatile ChainedReader readers;
/**
* Create an instance of the log reader that uses Kafka to store database schema history and the
@ -145,37 +140,39 @@ public synchronized void start(Map<String, String> props) {
// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();
// Set up the readers ...
this.binlogReader = new BinlogReader(taskContext);
// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
readers = new ChainedReader();
readers.uponCompletion(this::completeReaders);
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
// We're supposed to start with a snapshot, so set that up ...
this.snapshotReader = new SnapshotReader(taskContext);
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
readers.add(snapshotReader);
if (taskContext.isInitialSnapshotOnly()) {
logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
this.snapshotReader.onSuccessfulCompletion(this::skipReadBinlog);
readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
} else {
this.snapshotReader.onSuccessfulCompletion(this::transitionToReadBinlog);
if (!rowBinlogEnabled) {
throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
+ "required for this connector to work properly. Change the MySQL configuration to use a "
+ "row-level binlog and restart the connector.");
}
readers.add(binlogReader);
}
this.snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
if (snapshotEventsAreInserts) this.snapshotReader.generateInsertEvents();
this.currentReader = this.snapshotReader;
} else {
if (!rowBinlogEnabled) {
throw new ConnectException(
"The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
}
// Just starting to read the binlog ...
this.currentReader = this.binlogReader;
// We're going to start by reading the binlog ...
readers.add(binlogReader);
}
// And start our first reader ...
this.runningReader.set(true);
this.currentReader.start();
// And start the chain of readers ...
this.readers.start();
} catch (Throwable e) {
// If we don't complete startup, then Kafka Connect will not attempt to stop the connector. So if we
// run into a problem, we have to stop ourselves ...
@ -185,8 +182,12 @@ public synchronized void start(Map<String, String> props) {
// Log, but don't propagate ...
logger.error("Failed to start the connector (see other exception), but got this error while cleaning up", s);
}
if (e instanceof InterruptedException) {
Thread.interrupted();
throw new ConnectException("Interrupted while starting the connector", e);
}
if (e instanceof ConnectException) {
throw e;
throw (ConnectException) e;
}
throw new ConnectException(e);
} finally {
@ -196,11 +197,14 @@ public synchronized void start(Map<String, String> props) {
@Override
public List<SourceRecord> poll() throws InterruptedException {
Reader currentReader = readers;
if (currentReader == null) {
return null;
}
PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
try {
logger.trace("Polling for events");
AbstractReader reader = currentReader;
return reader != null ? reader.poll() : Collections.emptyList();
return currentReader.poll();
} finally {
prevLoggingContext.restore();
}
@ -210,66 +214,33 @@ public List<SourceRecord> poll() throws InterruptedException {
public synchronized void stop() {
if (context != null) {
PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
// We need to explicitly stop both readers, in this order. If we were to instead call 'currentReader.stop()', there
// is a chance without synchronization that we'd miss the transition and stop only the snapshot reader. And stopping
// both is far simpler and more efficient than synchronizing ...
try {
logger.info("Stopping MySQL connector task");
if (this.snapshotReader != null) {
try {
this.snapshotReader.stop();
} catch (Throwable e) {
logger.error("Unexpected error stopping the snapshot reader", e);
} finally {
this.snapshotReader = null;
}
}
// Stop the readers ...
readers.stop();
} finally {
try {
if (this.binlogReader != null) {
try {
this.binlogReader.stop();
} catch (Throwable e) {
logger.error("Unexpected error stopping the binary log reader", e);
} finally {
this.binlogReader = null;
}
}
} finally {
this.currentReader = null;
try {
// Capture that our reader is no longer running; used in "transitionToReadBinlog()" ...
this.runningReader.set(false);
// Flush and stop database history, close all JDBC connections ...
if (this.taskContext != null) taskContext.shutdown();
} catch (Throwable e) {
logger.error("Unexpected error shutting down the database history and/or closing JDBC connections", e);
} finally {
context = null;
logger.info("Connector task successfully stopped");
prevLoggingContext.restore();
}
}
prevLoggingContext.restore();
}
}
}
/**
* Transition from the snapshot reader to the binlog reader. This method is synchronized (along with {@link #start(Map)}
* and {@link #stop()}) to ensure that we don't transition while we've already begun to stop.
* When the task is {@link #stop() stopped}, the readers may have additional work to perform before they actually
* stop and before all their records have been consumed via the {@link #poll()} method. This method signals that
* all of this has completed.
*/
protected synchronized void transitionToReadBinlog() {
if (this.binlogReader == null || !this.runningReader.get()) {
// We are no longer running, so don't start the binlog reader ...
return;
protected void completeReaders() {
PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
try {
// Flush and stop database history, close all JDBC connections ...
if (this.taskContext != null) taskContext.shutdown();
} catch (Throwable e) {
logger.error("Unexpected error shutting down the database history and/or closing JDBC connections", e);
} finally {
context = null;
logger.info("Connector task finished all work and is now shutdown");
prevLoggingContext.restore();
}
logger.debug("Transitioning from snapshot reader to binlog reader");
this.binlogReader.start();
this.currentReader = this.binlogReader;
}
protected void skipReadBinlog() {
logger.info("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
}
/**


@ -199,7 +199,7 @@ public void setSystemVariables(Map<String, String> variables) {
ddlParser.systemVariables().setVariable(Scope.SESSION, varName, value);
});
}
/**
* Get the system variables as known by the DDL parser.
*
@ -208,7 +208,7 @@ public void setSystemVariables(Map<String, String> variables) {
public MySqlSystemVariables systemVariables() {
return ddlParser.systemVariables();
}
/**
* Load the schema for the databases using JDBC database metadata. If there are changes relative to any
* table definitions that existed when this method is called, those changes are recorded in the database history
@ -269,7 +269,11 @@ protected void changeTablesAndRecordInHistory(SourceInfo source, Callable<Void>
});
// Finally record the DDL statements into the history ...
dbHistory.record(source.partition(), source.offset(), "", tables(), ddl.toString());
try {
dbHistory.record(source.partition(), source.offset(), "", tables(), ddl.toString());
} catch (Throwable e) {
throw new ConnectException("Error recording the DDL statement in the database history " + dbHistory + ": " + ddl, e);
}
}
protected void appendDropTableStatement(StringBuilder sb, TableId tableId) {
@ -361,7 +365,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
// Record the DDL statement so that we can later recover them if needed. We do this _after_ writing the
// schema change records so that failure recovery (which is based on the history) won't lose
// schema change records.
dbHistory.record(source.partition(), source.offset(), databaseName, tables, ddlStatements);
try {
dbHistory.record(source.partition(), source.offset(), databaseName, tables, ddlStatements);
} catch (Throwable e) {
throw new ConnectException(
"Error recording the DDL statement(s) in the database history " + dbHistory + ": " + ddlStatements, e);
}
}
// Figure out what changed ...


@ -0,0 +1,102 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.util.List;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
/**
* A component that reads a portion of the MySQL server history or current state.
* <p>
* A reader starts out in the {@link State#STOPPED stopped} state, and when {@link #start() started} transitions to a
* {@link State#RUNNING} state. The reader may either complete its work or be explicitly {@link #stop() stopped}, at which
* point the reader transitions to a {@link State#STOPPING stopping} state until all already-generated {@link SourceRecord}s
* are consumed by the client via the {@link #poll() poll} method. Only after all records are consumed does the reader
* transition to the {@link State#STOPPED stopped} state and call the {@link #uponCompletion(Runnable)} method.
* <p>
* See {@link ChainedReader} if multiple {@link Reader} implementations are to be run in-sequence while keeping the
* correct start, stop, and completion semantics.
*
* @author Randall Hauch
* @see ChainedReader
*/
public interface Reader {
/**
* The possible states of a reader.
*/
public static enum State {
/**
* The reader is stopped and static.
*/
STOPPED,
/**
* The reader is running and generating records.
*/
RUNNING,
/**
* The reader has completed its work or been explicitly stopped, but not all of the generated records have been
* consumed via {@link Reader#poll() polling}.
*/
STOPPING;
}
/**
* Get the name of this reader.
*
* @return the reader's name; never null
*/
public String name();
/**
* Get the current state of this reader.
*
* @return the state; never null
*/
public State state();
/**
* Set the function that should be called when this reader transitions from the {@link State#STOPPING} to
* {@link State#STOPPED} state, which is after all generated records have been consumed via the {@link #poll() poll}
* method.
* <p>
* This method should only be called while the reader is in the {@link State#STOPPED} state.
*
* @param handler the function; may not be null
*/
public void uponCompletion(Runnable handler);
/**
* Start the reader and return immediately. Once started, the {@link SourceRecord}s generated by the reader can be obtained by
* periodically calling {@link #poll()} until that method returns {@code null}.
* <p>
* This method does nothing if it is already running.
*/
public void start();
/**
* Stop the reader from running and transition to the {@link State#STOPPING} state until all remaining records
* are {@link #poll() consumed}, at which point its state transitions to {@link State#STOPPED}.
*/
public void stop();
/**
* Poll for the next batch of source records. This method returns {@code null} only when all records generated by
* this reader have been processed, following the natural or explicit {@link #stop() stopping} of this reader.
* Note that this method may block if no additional records are available but the reader may produce more, thus
* callers should call this method continually until this method returns {@code null}.
*
* @return the list of source records that may or may not be empty; or {@code null} when there will be no more records
* because the reader has completely {@link State#STOPPED}.
* @throws InterruptedException if this thread is interrupted while waiting for more records
* @throws ConnectException if there is an error while this reader is running
*/
public List<SourceRecord> poll() throws InterruptedException;
}
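For a self-contained illustration of this contract, here is a hypothetical in-memory implementation, similar in spirit to the MockReader used by ChainedReaderTest at the end of this diff; it serves a fixed sequence of batches and is a sketch only, not connector code.

    package io.debezium.connector.mysql;

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.kafka.connect.source.SourceRecord;

    // Hypothetical Reader that serves pre-built batches and follows the
    // STOPPED -> RUNNING -> STOPPING -> STOPPED transitions documented above.
    public class FixedBatchesReader implements Reader {
        private final String name;
        private final Iterator<List<SourceRecord>> batches;
        private final AtomicBoolean running = new AtomicBoolean(false);
        private final AtomicBoolean completed = new AtomicBoolean(true); // not yet started => STOPPED
        private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();

        public FixedBatchesReader(String name, List<List<SourceRecord>> batches) {
            this.name = name;
            this.batches = batches.iterator();
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public void uponCompletion(Runnable handler) {
            uponCompletion.set(handler);
        }

        @Override
        public State state() {
            if (completed.get()) return State.STOPPED;
            return running.get() ? State.RUNNING : State.STOPPING;
        }

        @Override
        public void start() {
            if (running.compareAndSet(false, true)) {
                completed.set(false);
            }
        }

        @Override
        public void stop() {
            // Remaining batches can still be polled; we report STOPPED only once drained.
            running.set(false);
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            if (!completed.get() && batches.hasNext()) {
                return batches.next();
            }
            if (completed.compareAndSet(false, true)) {
                running.set(false);
                Runnable handler = uponCompletion.getAndSet(null); // run the handler only once
                if (handler != null) handler.run();
            }
            return null;
        }
    }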


@ -20,7 +20,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
@ -44,34 +43,22 @@ public class SnapshotReader extends AbstractReader {
private final boolean includeData;
private RecordRecorder recorder;
private volatile Thread thread;
private volatile Runnable onSuccessfulCompletion;
private final SnapshotReaderMetrics metrics;
/**
* Create a snapshot reader.
*
* @param name the name of this reader; may not be null
* @param context the task context in which this reader is running; may not be null
*/
public SnapshotReader(MySqlTaskContext context) {
super(context);
public SnapshotReader(String name, MySqlTaskContext context) {
super(name, context);
this.includeData = !context.isSchemaOnlySnapshot();
recorder = this::recordRowAsRead;
metrics = new SnapshotReaderMetrics(context.clock());
metrics.register(context, logger);
}
/**
* Set the non-blocking function that should be called upon successful completion of the snapshot, which is after the
* snapshot generates its final record <em>and</em> all such records have been {@link #poll() polled}.
*
* @param onSuccessfulCompletion the function; may be null
* @return this object for method chaining; never null
*/
public SnapshotReader onSuccessfulCompletion(Runnable onSuccessfulCompletion) {
this.onSuccessfulCompletion = onSuccessfulCompletion;
return this;
}
/**
* Set whether this reader's {@link #execute() execution} should block other transactions as minimally as possible by
* releasing the read lock as early as possible. Although the snapshot process should obtain a consistent snapshot even
@ -116,32 +103,22 @@ public SnapshotReader generateInsertEvents() {
@Override
protected void doStart() {
thread = new Thread(this::execute, "mysql-snapshot-" + context.serverName());
// TODO: Use MDC logging
thread.start();
}
@Override
protected void doStop() {
thread.interrupt();
}
@Override
protected void doShutdown() {
metrics.unregister(logger);
logger.debug("Stopping snapshot reader");
// The parent class will change the isRunning() state, and this class' execute() uses that and will stop automatically
}
@Override
protected void doCleanup() {
this.thread = null;
logger.trace("Completed writing all snapshot records");
try {
// Call the completion function to say that we've successfully completed
if (onSuccessfulCompletion != null) onSuccessfulCompletion.run();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new ConnectException("Error calling completion function after completing snapshot", e);
this.thread = null;
logger.debug("Completed writing all snapshot records");
} finally {
metrics.unregister(logger);
}
}
@ -160,6 +137,8 @@ protected void execute() {
logger.info("Starting snapshot for {} with user '{}'", context.connectionString(), mysql.username());
logRolesForCurrentUser(mysql);
logServerInformation(mysql);
boolean isLocked = false;
boolean isTxnStarted = false;
try {
metrics.startSnapshot();
@ -176,6 +155,7 @@ protected void execute() {
// See: https://dev.mysql.com/doc/refman/5.7/en/set-transaction.html
// See: https://dev.mysql.com/doc/refman/5.7/en/innodb-transaction-isolation-levels.html
// See: https://dev.mysql.com/doc/refman/5.7/en/innodb-consistent-read.html
if (!isRunning()) return;
logger.info("Step 0: disabling autocommit and enabling repeatable read transactions");
mysql.setAutoCommit(false);
sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
@ -185,293 +165,345 @@ protected void execute() {
// Generate the DDL statements that set the charset-related system variables ...
Map<String, String> systemVariables = context.readMySqlCharsetSystemVariables(sql);
String setSystemVariablesStatement = context.setStatementFor(systemVariables);
AtomicBoolean interrupted = new AtomicBoolean(false);
long lockAcquired = 0L;
// ------
// STEP 1
// ------
// First, start a transaction and request that a consistent MVCC snapshot is obtained immediately.
// See http://dev.mysql.com/doc/refman/5.7/en/commit.html
logger.info("Step 1: start transaction with consistent snapshot");
sql.set("START TRANSACTION WITH CONSISTENT SNAPSHOT");
mysql.execute(sql.get());
try {
// ------
// STEP 1
// ------
// First, start a transaction and request that a consistent MVCC snapshot is obtained immediately.
// See http://dev.mysql.com/doc/refman/5.7/en/commit.html
if (!isRunning()) return;
logger.info("Step 1: start transaction with consistent snapshot");
sql.set("START TRANSACTION WITH CONSISTENT SNAPSHOT");
mysql.execute(sql.get());
isTxnStarted = true;
// ------
// STEP 2
// ------
// Obtain read lock on all tables. This statement closes all open tables and locks all tables
// for all databases with a global read lock, and it prevents ALL updates while we have this lock.
// It also ensures that everything we do while we have this lock will be consistent.
long lockAcquired = clock.currentTimeInMillis();
logger.info("Step 2: flush and obtain global read lock (preventing writes to database)");
sql.set("FLUSH TABLES WITH READ LOCK");
mysql.execute(sql.get());
// ------
// STEP 2
// ------
// Obtain read lock on all tables. This statement closes all open tables and locks all tables
// for all databases with a global read lock, and it prevents ALL updates while we have this lock.
// It also ensures that everything we do while we have this lock will be consistent.
if (!isRunning()) return;
lockAcquired = clock.currentTimeInMillis();
logger.info("Step 2: flush and obtain global read lock (preventing writes to database)");
sql.set("FLUSH TABLES WITH READ LOCK");
mysql.execute(sql.get());
isLocked = true;
// ------
// STEP 3
// ------
// Obtain the binlog position and update the SourceInfo in the context. This means that all source records generated
// as part of the snapshot will contain the binlog position of the snapshot.
logger.info("Step 3: read binlog position of MySQL master");
String showMasterStmt = "SHOW MASTER STATUS";
sql.set(showMasterStmt);
mysql.query(sql.get(), rs -> {
if (rs.next()) {
String binlogFilename = rs.getString(1);
long binlogPosition = rs.getLong(2);
source.setBinlogStartPoint(binlogFilename, binlogPosition);
if (rs.getMetaData().getColumnCount() > 4) {
// This column exists only in MySQL 5.6.5 or later ...
String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
source.setCompletedGtidSet(gtidSet);
logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
gtidSet);
} else {
logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
}
source.startSnapshot();
} else {
throw new IllegalStateException("Cannot read the binlog filename and position via '" + showMasterStmt
+ "'. Make sure your server is correctly configured");
}
});
// From this point forward, all source records produced by this connector will have an offset that includes a
// "snapshot" field (with value of "true").
// ------
// STEP 4
// ------
// Get the list of databases ...
logger.info("Step 4: read list of available databases");
final List<String> databaseNames = new ArrayList<>();
sql.set("SHOW DATABASES");
mysql.query(sql.get(), rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
}
});
logger.info("\t list of available databases is: {}", databaseNames);
// ------
// STEP 5
// ------
// Get the list of table IDs for each database. We can't use a prepared statement with MySQL, so we have to
// build the SQL statement each time. Although in other cases this might lead to SQL injection, in our case
// we are reading the database names from the database and not taking them from the user ...
logger.info("Step 5: read list of available tables in each database");
List<TableId> tableIds = new ArrayList<>();
final Map<String, List<TableId>> tableIdsByDbName = new HashMap<>();
for (String dbName : databaseNames) {
sql.set("SHOW TABLES IN " + dbName);
// ------
// STEP 3
// ------
// Obtain the binlog position and update the SourceInfo in the context. This means that all source records
// generated
// as part of the snapshot will contain the binlog position of the snapshot.
if (!isRunning()) return;
logger.info("Step 3: read binlog position of MySQL master");
String showMasterStmt = "SHOW MASTER STATUS";
sql.set(showMasterStmt);
mysql.query(sql.get(), rs -> {
while (rs.next()) {
TableId id = new TableId(dbName, null, rs.getString(1));
if (filters.tableFilter().test(id)) {
tableIds.add(id);
tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
logger.info("\t including '{}'", id);
if (rs.next()) {
String binlogFilename = rs.getString(1);
long binlogPosition = rs.getLong(2);
source.setBinlogStartPoint(binlogFilename, binlogPosition);
if (rs.getMetaData().getColumnCount() > 4) {
// This column exists only in MySQL 5.6.5 or later ...
String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
source.setCompletedGtidSet(gtidSet);
logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
gtidSet);
} else {
logger.info("\t '{}' is filtered out, discarding", id);
logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
}
source.startSnapshot();
} else {
throw new IllegalStateException("Cannot read the binlog filename and position via '" + showMasterStmt
+ "'. Make sure your server is correctly configured");
}
});
}
// ------
// STEP 6
// ------
// Transform the current schema so that it reflects the *current* state of the MySQL server's contents.
// First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ...
logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas:");
schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);
// From this point forward, all source records produced by this connector will have an offset that includes a
// "snapshot" field (with value of "true").
// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
allTableIds.addAll(tableIds);
allTableIds.forEach(tableId -> schema.applyDdl(source, tableId.schema(), "DROP TABLE IF EXISTS " + tableId,
this::enqueueSchemaChanges));
// Add a DROP DATABASE statement for each database that we no longer know about ...
schema.tables().tableIds().stream().map(TableId::catalog)
.filter(Predicates.not(databaseNames::contains))
.forEach(missingDbName -> schema.applyDdl(source, missingDbName, "DROP DATABASE IF EXISTS " + missingDbName,
this::enqueueSchemaChanges));
// Now process all of our tables for each database ...
for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
String dbName = entry.getKey();
// First drop, create, and then use the named database ...
schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + dbName, this::enqueueSchemaChanges);
schema.applyDdl(source, dbName, "CREATE DATABASE " + dbName, this::enqueueSchemaChanges);
schema.applyDdl(source, dbName, "USE " + dbName, this::enqueueSchemaChanges);
for (TableId tableId : entry.getValue()) {
sql.set("SHOW CREATE TABLE " + tableId);
// ------
// STEP 4
// ------
// Get the list of databases ...
if (!isRunning()) return;
logger.info("Step 4: read list of available databases");
final List<String> databaseNames = new ArrayList<>();
sql.set("SHOW DATABASES");
mysql.query(sql.get(), rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
}
});
logger.info("\t list of available databases is: {}", databaseNames);
// ------
// STEP 5
// ------
// Get the list of table IDs for each database. We can't use a prepared statement with MySQL, so we have to
// build the SQL statement each time. Although in other cases this might lead to SQL injection, in our case
// we are reading the database names from the database and not taking them from the user ...
if (!isRunning()) return;
logger.info("Step 5: read list of available tables in each database");
List<TableId> tableIds = new ArrayList<>();
final Map<String, List<TableId>> tableIdsByDbName = new HashMap<>();
for (String dbName : databaseNames) {
sql.set("SHOW TABLES IN " + dbName);
mysql.query(sql.get(), rs -> {
if (rs.next()) {
schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
while (rs.next() && isRunning()) {
TableId id = new TableId(dbName, null, rs.getString(1));
if (filters.tableFilter().test(id)) {
tableIds.add(id);
tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
logger.info("\t including '{}'", id);
} else {
logger.info("\t '{}' is filtered out, discarding", id);
}
}
});
}
}
context.makeRecord().regenerate();
// ------
// STEP 7
// ------
boolean unlocked = false;
if (minimalBlocking) {
// We are doing minimal blocking, then we should release the read lock now. All subsequent SELECT
// should still use the MVCC snapshot obtained when we started our transaction (since we started it
// "...with consistent snapshot"). So, since we're only doing very simple SELECT without WHERE predicates,
// we can release the lock now ...
logger.info("Step 7: releasing global read lock to enable MySQL writes");
sql.set("UNLOCK TABLES");
mysql.execute(sql.get());
unlocked = true;
long lockReleased = clock.currentTimeInMillis();
metrics.globalLockReleased();
logger.info("Step 7: blocked writes to MySQL for a total of {}", Strings.duration(lockReleased - lockAcquired));
}
// ------
// STEP 6
// ------
// Transform the current schema so that it reflects the *current* state of the MySQL server's contents.
// First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ...
logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas:");
schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);
AtomicBoolean interrupted = new AtomicBoolean(false);
// ------
// STEP 8
// ------
// Use a buffered blocking consumer to buffer all of the records, so that after we copy all of the tables
// and produce events we can update the very last event with the non-snapshot offset ...
if (includeData) {
BufferedBlockingConsumer<SourceRecord> bufferedRecordQueue = BufferedBlockingConsumer.bufferLast(super::enqueueRecord);
// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
allTableIds.addAll(tableIds);
allTableIds.stream()
.filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
.forEach(tableId -> schema.applyDdl(source, tableId.schema(),
"DROP TABLE IF EXISTS " + tableId,
this::enqueueSchemaChanges));
// Dump all of the tables and generate source records ...
logger.info("Step 8: scanning contents of {} tables", tableIds.size());
metrics.setTableCount(tableIds.size());
// Add a DROP DATABASE statement for each database that we no longer know about ...
schema.tables().tableIds().stream().map(TableId::catalog)
.filter(Predicates.not(databaseNames::contains))
.filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
.forEach(missingDbName -> schema.applyDdl(source, missingDbName,
"DROP DATABASE IF EXISTS " + missingDbName,
this::enqueueSchemaChanges));
long startScan = clock.currentTimeInMillis();
AtomicLong totalRowCount = new AtomicLong();
int counter = 0;
int completedCounter = 0;
long largeTableCount = context.rowCountForLargeTable();
Iterator<TableId> tableIdIter = tableIds.iterator();
while (tableIdIter.hasNext()) {
TableId tableId = tableIdIter.next();
// Obtain a record maker for this table, which knows about the schema ...
RecordsForTable recordMaker = context.makeRecord().forTable(tableId, null, bufferedRecordQueue);
if (recordMaker != null) {
// Choose how we create statements based on the # of rows ...
sql.set("SELECT COUNT(*) FROM " + tableId);
AtomicLong numRows = new AtomicLong();
// Now process all of our tables for each database ...
for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
if (!isRunning()) break;
String dbName = entry.getKey();
// First drop, create, and then use the named database ...
schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + dbName, this::enqueueSchemaChanges);
schema.applyDdl(source, dbName, "CREATE DATABASE " + dbName, this::enqueueSchemaChanges);
schema.applyDdl(source, dbName, "USE " + dbName, this::enqueueSchemaChanges);
for (TableId tableId : entry.getValue()) {
if (!isRunning()) break;
sql.set("SHOW CREATE TABLE " + tableId);
mysql.query(sql.get(), rs -> {
if (rs.next()) numRows.set(rs.getLong(1));
});
StatementFactory statementFactory = this::createStatement;
if (numRows.get() > largeTableCount) {
statementFactory = this::createStatementWithLargeResultSet;
}
// Scan the rows in the table ...
long start = clock.currentTimeInMillis();
logger.info("Step 8: - scanning table '{}' ({} of {} tables)", tableId, ++counter, tableIds.size());
sql.set("SELECT * FROM " + tableId);
mysql.query(sql.get(), statementFactory, rs -> {
long rowNum = 0;
long rowCount = numRows.get();
try {
// The table is included in the connector's filters, so process all of the table records ...
final Table table = schema.tableFor(tableId);
final int numColumns = table.columns().size();
final Object[] row = new Object[numColumns];
while (rs.next()) {
for (int i = 0, j = 1; i != numColumns; ++i, ++j) {
row[i] = rs.getObject(j);
}
recorder.recordRow(recordMaker, row, ts); // has no row number!
++rowNum;
if (rowNum % 10_000 == 0 || rowNum == rowCount) {
long stop = clock.currentTimeInMillis();
logger.info("Step 8: - {} of {} rows scanned from table '{}' after {}", rowNum, rowCount, tableId,
Strings.duration(stop - start));
}
}
} catch (InterruptedException e) {
Thread.interrupted();
// We were not able to finish all rows in all tables ...
logger.info("Step 8: Stopping the snapshot due to thread interruption");
interrupted.set(true);
} finally {
totalRowCount.addAndGet(rowCount);
if (rs.next()) {
schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
}
});
metrics.completeTable();
if (interrupted.get()) break;
}
++completedCounter;
}
context.makeRecord().regenerate();
// ------
// STEP 7
// ------
if (minimalBlocking && isLocked) {
// Since we are doing minimal blocking, we should release the read lock now. All subsequent SELECTs
// should still use the MVCC snapshot obtained when we started our transaction (since we started it
// "...with consistent snapshot"). So, since we're only doing very simple SELECT without WHERE predicates,
// we can release the lock now ...
logger.info("Step 7: releasing global read lock to enable MySQL writes");
sql.set("UNLOCK TABLES");
mysql.execute(sql.get());
isLocked = false;
long lockReleased = clock.currentTimeInMillis();
metrics.globalLockReleased();
logger.info("Step 7: blocked writes to MySQL for a total of {}", Strings.duration(lockReleased - lockAcquired));
}
// We've copied all of the tables, but our buffer holds onto the very last record.
// First mark the snapshot as complete and then apply the updated offset to the buffered record ...
source.markLastSnapshot();
long stop = clock.currentTimeInMillis();
// ------
// STEP 8
// ------
// Use a buffered blocking consumer to buffer all of the records, so that after we copy all of the tables
// and produce events we can update the very last event with the non-snapshot offset ...
if (!isRunning()) return;
if (includeData) {
BufferedBlockingConsumer<SourceRecord> bufferedRecordQueue = BufferedBlockingConsumer.bufferLast(super::enqueueRecord);
// Dump all of the tables and generate source records ...
logger.info("Step 8: scanning contents of {} tables", tableIds.size());
metrics.setTableCount(tableIds.size());
long startScan = clock.currentTimeInMillis();
AtomicLong totalRowCount = new AtomicLong();
int counter = 0;
int completedCounter = 0;
long largeTableCount = context.rowCountForLargeTable();
Iterator<TableId> tableIdIter = tableIds.iterator();
while (tableIdIter.hasNext()) {
TableId tableId = tableIdIter.next();
if (!isRunning()) break;
// Obtain a record maker for this table, which knows about the schema ...
RecordsForTable recordMaker = context.makeRecord().forTable(tableId, null, bufferedRecordQueue);
if (recordMaker != null) {
// Choose how we create statements based on the # of rows ...
sql.set("SELECT COUNT(*) FROM " + tableId);
AtomicLong numRows = new AtomicLong();
mysql.query(sql.get(), rs -> {
if (rs.next()) numRows.set(rs.getLong(1));
});
StatementFactory statementFactory = this::createStatement;
if (numRows.get() > largeTableCount) {
statementFactory = this::createStatementWithLargeResultSet;
}
// Scan the rows in the table ...
long start = clock.currentTimeInMillis();
logger.info("Step 8: - scanning table '{}' ({} of {} tables)", tableId, ++counter, tableIds.size());
sql.set("SELECT * FROM " + tableId);
try {
mysql.query(sql.get(), statementFactory, rs -> {
long rowNum = 0;
long rowCount = numRows.get();
try {
// The table is included in the connector's filters, so process all of the table records
// ...
final Table table = schema.tableFor(tableId);
final int numColumns = table.columns().size();
final Object[] row = new Object[numColumns];
while (rs.next()) {
for (int i = 0, j = 1; i != numColumns; ++i, ++j) {
row[i] = rs.getObject(j);
}
recorder.recordRow(recordMaker, row, ts); // has no row number!
++rowNum;
if (rowNum % 100 == 0) {
if (!isRunning()) break;
}
if (rowNum % 10_000 == 0 || rowNum == rowCount) {
long stop = clock.currentTimeInMillis();
logger.info("Step 8: - {} of {} rows scanned from table '{}' after {}", rowNum, rowCount,
tableId,
Strings.duration(stop - start));
}
}
} catch (InterruptedException e) {
Thread.interrupted();
// We were not able to finish all rows in all tables ...
logger.info("Step 8: Stopping the snapshot due to thread interruption");
interrupted.set(true);
} finally {
totalRowCount.addAndGet(rowCount);
}
});
} finally {
metrics.completeTable();
if (interrupted.get()) break;
}
}
++completedCounter;
}
// See if we've been stopped or interrupted ...
if (!isRunning() || interrupted.get()) return;
// We've copied all of the tables and we've not yet been stopped, but our buffer holds onto the
// very last record. First mark the snapshot as complete and then apply the updated offset to
// the buffered record ...
source.markLastSnapshot();
long stop = clock.currentTimeInMillis();
try {
bufferedRecordQueue.flush(this::replaceOffset);
logger.info("Step 8: scanned {} rows in {} tables in {}",
totalRowCount, tableIds.size(), Strings.duration(stop - startScan));
} catch (InterruptedException e) {
Thread.interrupted();
// We were not able to finish all rows in all tables ...
logger.info("Step 8: aborting the snapshot after {} rows in {} of {} tables {}",
totalRowCount, completedCounter, tableIds.size(), Strings.duration(stop - startScan));
interrupted.set(true);
}
} else {
// source.markLastSnapshot(); -- likely not needed here, since it is only used to mark the last row entry
logger.info("Step 8: encountered only schema based snapshot, skipping data snapshot");
}
} finally {
// No matter what, we always want to do these steps if necessary ...
// ------
// STEP 9
// ------
// Release the read lock if we have not yet done so. Locks are not released when committing/rolling back ...
int step = 9;
if (isLocked) {
logger.info("Step {}: releasing global read lock to enable MySQL writes", step++);
sql.set("UNLOCK TABLES");
mysql.execute(sql.get());
isLocked = false;
long lockReleased = clock.currentTimeInMillis();
metrics.globalLockReleased();
logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
}
// -------
// STEP 10
// -------
// Either commit or roll back the transaction ...
if (isTxnStarted) {
if (interrupted.get() || !isRunning()) {
// We were interrupted or were stopped while reading the tables,
// so roll back the transaction and return immediately ...
logger.info("Step {}: rolling back transaction after abort", step++);
sql.set("ROLLBACK");
mysql.execute(sql.get());
metrics.abortSnapshot();
return;
}
// Otherwise, commit our transaction
logger.info("Step {}: committing transaction", step++);
sql.set("COMMIT");
mysql.execute(sql.get());
metrics.completeSnapshot();
} else {}
}
if (!isRunning()) {
// The reader (and connector) was stopped and we did not finish ...
try {
bufferedRecordQueue.flush(this::replaceOffset);
logger.info("Step 8: scanned {} rows in {} tables in {}",
totalRowCount, tableIds.size(), Strings.duration(stop - startScan));
} catch (InterruptedException e) {
Thread.interrupted();
// We were not able to finish all rows in all tables ...
logger.info("Step 8: aborting the snapshot after {} rows in {} of {} tables {}",
totalRowCount, completedCounter, tableIds.size(), Strings.duration(stop - startScan));
interrupted.set(true);
// Mark this reader as having completing its work ...
completeSuccessfully();
long stop = clock.currentTimeInMillis();
logger.info("Stopped snapshot after {} but before completing", Strings.duration(stop - ts));
} finally {
// and since there's no more work to do clean up all resources ...
cleanupResources();
}
} else {
// source.markLastSnapshot(); -- likely not needed here, since it is only used to mark the last row entry
logger.info("Step 8: encountered only schema based snapshot, skipping data snapshot");
}
// ------
// STEP 9
// ------
// Release the read lock if we have not yet done so ...
int step = 9;
if (!unlocked) {
logger.info("Step {}: releasing global read lock to enable MySQL writes", step++);
sql.set("UNLOCK TABLES");
mysql.execute(sql.get());
unlocked = true;
long lockReleased = clock.currentTimeInMillis();
metrics.globalLockReleased();
logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
}
// -------
// STEP 10
// -------
if (interrupted.get()) {
// We were interrupted while reading the tables, so roll back the transaction and return immediately ...
logger.info("Step {}: rolling back transaction after abort", step++);
sql.set("ROLLBACK");
mysql.execute(sql.get());
metrics.abortSnapshot();
return;
}
// Otherwise, commit our transaction
logger.info("Step {}: committing transaction", step++);
sql.set("COMMIT");
mysql.execute(sql.get());
metrics.completeSnapshot();
try {
// Mark the source as having completed the snapshot. This will ensure the `source` field on records
// are not denoted as a snapshot ...
source.completeSnapshot();
} finally {
// Set the completion flag ...
super.completeSuccessfully();
long stop = clock.currentTimeInMillis();
logger.info("Completed snapshot in {}", Strings.duration(stop - ts));
// We completed the snapshot...
try {
// Mark the source as having completed the snapshot. This will ensure the `source` field on records
// are not denoted as a snapshot ...
source.completeSnapshot();
} finally {
// Set the completion flag ...
completeSuccessfully();
long stop = clock.currentTimeInMillis();
logger.info("Completed snapshot in {}", Strings.duration(stop - ts));
}
}
} catch (Throwable e) {
failed(e, "Aborting snapshot after running '" + sql.get() + "': " + e.getMessage());
failed(e, "Aborting snapshot due to error when last running '" + sql.get() + "': " + e.getMessage());
}
}
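Condensing the step-by-step comments above, the snapshot's SQL choreography looks roughly like the sketch below. It uses plain JDBC purely for illustration (the connector uses its own JdbcConnection wrapper) and the class and method names are hypothetical; in the minimal-blocking mode the global read lock is released right after the schema is captured (Step 7), otherwise it is released only in Step 9, because MySQL does not release a global read lock on COMMIT or ROLLBACK.

    import java.sql.Connection;
    import java.sql.Statement;

    // Hedged summary of the snapshot steps above, for illustration only.
    public class SnapshotSqlSketch {
        static void snapshot(Connection conn, boolean minimalBlocking) throws Exception {
            try (Statement stmt = conn.createStatement()) {
                conn.setAutoCommit(false);                                        // Step 0
                stmt.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");  // Step 0
                stmt.execute("START TRANSACTION WITH CONSISTENT SNAPSHOT");       // Step 1
                stmt.execute("FLUSH TABLES WITH READ LOCK");                      // Step 2: global read lock
                boolean locked = true;
                boolean failed = false;
                try {
                    // Step 3: SHOW MASTER STATUS -> record binlog filename/position (and GTID set)
                    // Steps 4-6: SHOW DATABASES, SHOW TABLES IN <db>, SHOW CREATE TABLE <table>
                    if (minimalBlocking) {
                        stmt.execute("UNLOCK TABLES");                            // Step 7: release the lock early
                        locked = false;
                    }
                    // Step 8: SELECT * FROM each included table, emitting one record per row
                } catch (Exception e) {
                    failed = true;
                    throw e;
                } finally {
                    if (locked) {
                        stmt.execute("UNLOCK TABLES");                            // Step 9: not released by COMMIT/ROLLBACK
                    }
                    stmt.execute(failed ? "ROLLBACK" : "COMMIT");                 // Step 10
                }
            }
        }
    }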


@ -117,7 +117,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
reader = new BinlogReader(context);
reader = new BinlogReader("binlog", context);
// Start reading the binlog ...
reader.start();
@ -177,7 +177,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
reader = new BinlogReader(context);
reader = new BinlogReader("binlog", context);
// Start reading the binlog ...
reader.start();

View File

@ -0,0 +1,223 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.connector.mysql.Reader.State;
import io.debezium.util.Collect;
/**
* @author Randall Hauch
*
*/
public class ChainedReaderTest {
private static final List<SourceRecord> RL1 = Collect.arrayListOf(record());
private static final List<SourceRecord> RL2 = Collect.arrayListOf(record());
private static final List<SourceRecord> RL3 = Collect.arrayListOf(record());
private static final List<SourceRecord> RL4 = Collect.arrayListOf(record());
private static final List<SourceRecord> RL5 = Collect.arrayListOf(record());
@SuppressWarnings("unchecked")
private static final List<List<SourceRecord>> SOURCE_RECORDS = Collect.arrayListOf(RL1, RL2, RL3, RL4, RL5);
protected static Supplier<List<SourceRecord>> records() {
Iterator<List<SourceRecord>> iter = SOURCE_RECORDS.iterator();
return () -> {
return iter.hasNext() ? iter.next() : null;
};
}
private static SourceRecord record() {
return new SourceRecord(null, null, null, null, null, null);
}
private ChainedReader reader;
@Before
public void beforeEach() {
reader = new ChainedReader();
}
@Test
public void shouldNotStartWithoutReaders() throws InterruptedException {
assertThat(reader.state()).isEqualTo(State.STOPPED);
reader.start();
assertThat(reader.state()).isEqualTo(State.STOPPED);
assertPollReturnsNoMoreRecords();
}
@Test
public void shouldStartAndStopSingleReaderBeforeReaderStopsItself() throws InterruptedException {
reader.add(new MockReader("r1", records()));
reader.uponCompletion("Stopped the r1 reader");
reader.start();
assertThat(reader.state()).isEqualTo(State.RUNNING);
assertThat(reader.poll()).isSameAs(RL1);
assertThat(reader.poll()).isSameAs(RL2);
assertThat(reader.poll()).isSameAs(RL3);
assertThat(reader.poll()).isSameAs(RL4);
reader.stop();
assertThat(reader.state()).isEqualTo(State.STOPPING);
assertThat(reader.poll()).isNull();
assertThat(reader.state()).isEqualTo(State.STOPPED);
assertPollReturnsNoMoreRecords();
}
@Test
public void shouldStartSingleReaderThatStopsAutomatically() throws InterruptedException {
reader.add(new MockReader("r2", records()));
reader.uponCompletion("Stopped the r2 reader");
reader.start();
assertThat(reader.state()).isEqualTo(State.RUNNING);
assertThat(reader.poll()).isSameAs(RL1);
assertThat(reader.poll()).isSameAs(RL2);
assertThat(reader.poll()).isSameAs(RL3);
assertThat(reader.poll()).isSameAs(RL4);
assertThat(reader.poll()).isSameAs(RL5);
assertThat(reader.poll()).isNull(); // cause the mock reader to stop itself
assertThat(reader.state()).isEqualTo(State.STOPPED);
assertPollReturnsNoMoreRecords();
}
@Test
public void shouldStartAndStopMultipleReaders() throws InterruptedException {
reader.add(new MockReader("r3", records()));
reader.add(new MockReader("r4", records()));
reader.uponCompletion("Stopped the r3+r4 reader");
reader.start();
assertThat(reader.state()).isEqualTo(State.RUNNING);
assertThat(reader.poll()).isSameAs(RL1);
assertThat(reader.poll()).isSameAs(RL2);
assertThat(reader.poll()).isSameAs(RL3);
assertThat(reader.poll()).isSameAs(RL4);
assertThat(reader.poll()).isSameAs(RL5);
assertThat(reader.poll()).isSameAs(RL1);
assertThat(reader.poll()).isSameAs(RL2);
assertThat(reader.poll()).isSameAs(RL3);
assertThat(reader.poll()).isSameAs(RL4);
assertThat(reader.poll()).isSameAs(RL5);
assertThat(reader.poll()).isNull(); // cause the 2nd mock reader to stop itself
assertThat(reader.state()).isEqualTo(State.STOPPED);
assertPollReturnsNoMoreRecords();
}
@Test
public void shouldStartAndStopReaderThatContinuesProducingItsRecordsAfterBeingStopped() throws InterruptedException {
reader.add(new CompletingMockReader("r5", records()));
reader.uponCompletion("Stopped the r5 reader");
reader.start();
assertThat(reader.state()).isEqualTo(State.RUNNING);
assertThat(reader.poll()).isSameAs(RL1);
assertThat(reader.poll()).isSameAs(RL2);
// Manually stop this reader, and it will continue returning all of its 5 record lists ...
reader.stop();
assertThat(reader.state()).isEqualTo(State.STOPPING);
// Read the remaining records ...
assertThat(reader.poll()).isSameAs(RL3);
assertThat(reader.poll()).isSameAs(RL4);
assertThat(reader.poll()).isSameAs(RL5);
assertThat(reader.poll()).isNull();
// The reader has no more records, so it should now be stopped ...
assertThat(reader.state()).isEqualTo(State.STOPPED);
assertPollReturnsNoMoreRecords();
}
protected void assertPollReturnsNoMoreRecords() throws InterruptedException {
        for (int i = 0; i != 10; ++i) {
assertThat(reader.poll()).isNull();
}
}
/**
* A {@link Reader} that returns records until manually stopped.
*/
public static class MockReader implements Reader {
private final String name;
private final Supplier<List<SourceRecord>> pollResultsSupplier;
private final AtomicReference<Runnable> completionHandler = new AtomicReference<>();
private final AtomicBoolean running = new AtomicBoolean();
private final AtomicBoolean completed = new AtomicBoolean();
public MockReader(String name, Supplier<List<SourceRecord>> pollResultsSupplier) {
this.name = name;
this.pollResultsSupplier = pollResultsSupplier;
}
@Override
public State state() {
if (running.get()) return State.RUNNING;
if (completed.get()) return State.STOPPED;
return State.STOPPING;
}
@Override
public String name() {
return name;
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> record = null;
if (continueReturningRecordsFromPolling()) {
record = pollResultsSupplier.get();
}
if (record == null) {
// We're done ...
Runnable handler = this.completionHandler.get();
if (handler != null) {
handler.run();
}
completed.set(true);
running.set(false);
}
return record;
}
protected boolean continueReturningRecordsFromPolling() {
return running.get();
}
@Override
public void start() {
assertThat(running.get()).isFalse();
running.set(true);
}
@Override
public void stop() {
running.set(false);
}
@Override
public void uponCompletion(Runnable handler) {
completionHandler.set(handler);
}
}
/**
* A {@link MockReader} that always returns all records even after this reader is manually stopped.
*/
public static class CompletingMockReader extends MockReader {
public CompletingMockReader(String name, Supplier<List<SourceRecord>> pollResultsSupplier) {
            super(name, pollResultsSupplier);
}
@Override
protected boolean continueReturningRecordsFromPolling() {
return true;
}
}
}
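
A rough usage sketch of the chaining these tests exercise, reusing the test's MockReader and record supplier from the same package. The reader names and the println are placeholders; in the connector itself the chained readers would be actual Reader implementations rather than mocks.
// Assumes the ChainedReader API exercised above: add(Reader), uponCompletion(String), start(), poll().
package io.debezium.connector.mysql;

import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;

public class ChainedReaderUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        ChainedReader chained = new ChainedReader();
        chained.add(new ChainedReaderTest.MockReader("first", ChainedReaderTest.records()));
        chained.add(new ChainedReaderTest.MockReader("second", ChainedReaderTest.records()));
        chained.uponCompletion("Both mock readers completed"); // message logged once the whole chain finishes
        chained.start();
        List<SourceRecord> batch;
        while ((batch = chained.poll()) != null) { // null signals that the chain has stopped
            System.out.println("polled " + batch.size() + " record(s)");
        }
    }
}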

View File

@ -56,7 +56,7 @@ public void afterEach() {
}
@Test
public void shouldApplyDdlStatementsAndRecover() {
public void shouldApplyDdlStatementsAndRecover() throws InterruptedException {
mysql = build.storeDatabaseHistoryInFile(TEST_FILE_PATH).serverName(SERVER_NAME).createSchemas();
mysql.start();
@ -76,7 +76,7 @@ public void shouldApplyDdlStatementsAndRecover() {
}
@Test
public void shouldLoadSystemAndNonSystemTablesAndConsumeOnlyFilteredDatabases() {
public void shouldLoadSystemAndNonSystemTablesAndConsumeOnlyFilteredDatabases() throws InterruptedException {
mysql = build.storeDatabaseHistoryInFile(TEST_FILE_PATH)
.serverName(SERVER_NAME)
.includeDatabases("connector_test")
@ -102,7 +102,7 @@ public void shouldLoadSystemAndNonSystemTablesAndConsumeOnlyFilteredDatabases()
}
@Test
public void shouldLoadSystemAndNonSystemTablesAndConsumeAllDatabases() {
public void shouldLoadSystemAndNonSystemTablesAndConsumeAllDatabases() throws InterruptedException {
mysql = build.storeDatabaseHistoryInFile(TEST_FILE_PATH)
.serverName(SERVER_NAME)
.includeDatabases("connector_test")

View File

@ -93,8 +93,8 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
config = simpleConfig().build();
context = new MySqlTaskContext(config);
context.start();
reader = new SnapshotReader(context);
reader.onSuccessfulCompletion(completed::countDown);
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateInsertEvents();
reader.useMinimalBlocking(true);
@ -171,8 +171,8 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
config = simpleConfig().with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_(.*)").build();
context = new MySqlTaskContext(config);
context.start();
reader = new SnapshotReader(context);
reader.onSuccessfulCompletion(completed::countDown);
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateReadEvents();
reader.useMinimalBlocking(true);
@ -251,8 +251,8 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
config = simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
context = new MySqlTaskContext(config);
context.start();
reader = new SnapshotReader(context);
reader.onSuccessfulCompletion(completed::countDown);
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateInsertEvents();
reader.useMinimalBlocking(true);
@ -331,8 +331,8 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).build();
context = new MySqlTaskContext(config);
context.start();
reader = new SnapshotReader(context);
reader.onSuccessfulCompletion(completed::countDown);
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateInsertEvents();
reader.useMinimalBlocking(true);
@ -340,12 +340,12 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
reader.start();
// Poll for records ...
//Testing.Print.enable();
// Testing.Print.enable();
List<SourceRecord> records = null;
KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(LOGICAL_NAME + ".");
SchemaChangeHistory schemaChanges = new SchemaChangeHistory(LOGICAL_NAME);
while ( (records = reader.poll()) != null ) {
records.forEach(record->{
while ((records = reader.poll()) != null) {
records.forEach(record -> {
VerifyRecord.isValid(record);
store.add(record);
schemaChanges.add(record);
@ -361,7 +361,7 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
assertThat(store.collectionCount()).isEqualTo(0);
// Make sure the snapshot completed ...
if ( completed.await(10, TimeUnit.SECONDS) ) {
if (completed.await(10, TimeUnit.SECONDS)) {
// completed the snapshot ...
Testing.print("completed the snapshot");
} else {

View File

@ -27,29 +27,30 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory {
protected AbstractDatabaseHistory() {
}
@Override
public void configure(Configuration config, HistoryRecordComparator comparator) {
this.config = config;
this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE;
}
@Override
public void start() {
// do nothing
}
@Override
public final void record(Map<String, ?> source, Map<String, ?> position, String databaseName, Tables schema, String ddl) {
storeRecord(new HistoryRecord(source, position, databaseName, ddl));
public final void record(Map<String, ?> source, Map<String, ?> position, String databaseName, Tables schema, String ddl)
throws DatabaseHistoryException {
storeRecord(new HistoryRecord(source, position, databaseName, ddl));
}
@Override
public final void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
logger.debug("Recovering DDL history for source partition {} and offset {}",source,position);
logger.debug("Recovering DDL history for source partition {} and offset {}", source, position);
HistoryRecord stopPoint = new HistoryRecord(source, position, null, null);
recoverRecords(schema,ddlParser,recovered->{
if (comparator.isAtOrBefore(recovered,stopPoint)) {
recoverRecords(schema, ddlParser, recovered -> {
if (comparator.isAtOrBefore(recovered, stopPoint)) {
String ddl = recovered.ddl();
if (ddl != null) {
ddlParser.setCurrentSchema(recovered.databaseName()); // may be null
@ -62,10 +63,10 @@ public final void recover(Map<String, ?> source, Map<String, ?> position, Tables
});
}
protected abstract void storeRecord(HistoryRecord record);
protected abstract void storeRecord(HistoryRecord record) throws DatabaseHistoryException;
protected abstract void recoverRecords(Tables schema, DdlParser ddlParser, Consumer<HistoryRecord> records);
@Override
public void stop() {
// do nothing

View File

@ -60,8 +60,9 @@ public interface DatabaseHistory {
* @param databaseName the name of the database whose schema is being changed; may be null
* @param schema the current definition of the database schema; may not be null
* @param ddl the DDL statements that describe the changes to the database schema; may not be null
* @throws DatabaseHistoryException if the record could not be written
*/
void record(Map<String, ?> source, Map<String, ?> position, String databaseName, Tables schema, String ddl);
void record(Map<String, ?> source, Map<String, ?> position, String databaseName, Tables schema, String ddl) throws DatabaseHistoryException;
/**
* Recover the {@link Tables database schema} to a known point in its history. Note that it is possible to recover the

View File

@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;
/**
* @author Randall Hauch
*
*/
public class DatabaseHistoryException extends RuntimeException {
private static final long serialVersionUID = 1L;
public DatabaseHistoryException(String message) {
super(message);
}
public DatabaseHistoryException(Throwable cause) {
super(cause);
}
public DatabaseHistoryException(String message, Throwable cause) {
super(message, cause);
}
public DatabaseHistoryException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
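
As a minimal illustration of how a storage backend can surface write failures through this unchecked exception, the hypothetical class below wraps an I/O error in DatabaseHistoryException; it is not one of the shipped history implementations.
// Hypothetical append-only store, used only to show propagating an I/O failure as DatabaseHistoryException.
package io.debezium.relational.history;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class AppendOnlyHistoryStoreSketch {
    private final Path path;

    AppendOnlyHistoryStoreSketch(Path path) {
        this.path = path;
    }

    void append(String line) {
        try {
            Files.write(path, (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            // Propagate the failure to the caller rather than silently dropping the record.
            throw new DatabaseHistoryException("Unable to append history record to " + path, e);
        }
    }
}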

View File

@ -5,6 +5,7 @@
*/
package io.debezium.relational.history;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@ -14,6 +15,7 @@
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
@ -46,48 +48,90 @@ public final class FileDatabaseHistory extends AbstractDatabaseHistory {
private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
private final DocumentWriter writer = DocumentWriter.defaultWriter();
private final DocumentReader reader = DocumentReader.defaultReader();
private final AtomicBoolean running = new AtomicBoolean();
private Path path;
@Override
public void configure(Configuration config, HistoryRecordComparator comparator) {
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
throw new ConnectException(
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
config.validateAndRecord(ALL_FIELDS, logger::error);
if (running.get()) {
throw new IllegalStateException("Database history file already initialized to " + path);
}
super.configure(config, comparator);
path = Paths.get(config.getString(FILE_PATH));
}
@Override
public void start() {
lock.write(() -> {
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
throw new ConnectException(
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
if (running.compareAndSet(false, true)) {
Path path = this.path;
if (path == null) {
throw new IllegalStateException("FileDatabaseHistory must be configured before it is started");
}
try {
// Make sure the file exists ...
if (!Files.exists(path)) {
Files.createDirectories(path.getParent());
try {
Files.createFile(path);
} catch (FileAlreadyExistsException e) {
// do nothing
}
}
} catch (IOException e) {
throw new DatabaseHistoryException("Unable to create history file at " + path + ": " + e.getMessage(), e);
}
}
config.validateAndRecord(ALL_FIELDS, logger::error);
super.configure(config,comparator);
path = Paths.get(config.getString(FILE_PATH));
});
}
@Override
protected void storeRecord(HistoryRecord record) {
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
if (record == null) return;
lock.write(() -> {
if (!running.get()) {
throw new IllegalStateException("The history has been stopped and will not accept more records");
}
try {
String line = writer.write(record.document());
if (!Files.exists(path)) {
Files.createDirectories(path.getParent());
// Create a buffered writer to write all of the records, closing the file when there is an error or when
// the thread is no longer supposed to run
try (BufferedWriter historyWriter = Files.newBufferedWriter(path, StandardOpenOption.APPEND)) {
try {
Files.createFile(path);
} catch (FileAlreadyExistsException e) {
// do nothing
historyWriter.append(line);
historyWriter.newLine();
} catch (IOException e) {
logger.error("Failed to add record to history at {}: {}", path, record, e);
return;
}
} catch (IOException e) {
throw new DatabaseHistoryException("Unable to create writer for history file " + path + ": " + e.getMessage(), e);
}
Files.write(path, Collect.arrayListOf(line), UTF8, StandardOpenOption.APPEND);
} catch (IOException e) {
logger.error("Failed to add record to history at {}: {}", path, record, e);
logger.error("Failed to convert record to string: {}", record, e);
}
});
}
@Override
protected void recoverRecords(Tables schema, DdlParser ddlParser, Consumer<HistoryRecord> records) {
public void stop() {
running.set(false);
}
@Override
protected synchronized void recoverRecords(Tables schema, DdlParser ddlParser, Consumer<HistoryRecord> records) {
lock.write(() -> {
try {
if (Files.exists(path)) {
for (String line : Files.readAllLines(path)) {
records.accept(new HistoryRecord(reader.read(line)));
for (String line : Files.readAllLines(path, UTF8)) {
if (line != null && !line.isEmpty()) {
records.accept(new HistoryRecord(reader.read(line)));
}
}
}
} catch (IOException e) {

View File

@ -150,7 +150,7 @@ public void start() {
}
@Override
protected void storeRecord(HistoryRecord record) {
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
if (this.producer == null) {
throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database history records.");
}
@ -165,10 +165,12 @@ protected void storeRecord(HistoryRecord record) {
logger.debug("Stored record in topic '{}' partition {} at offset {} ",
metadata.topic(), metadata.partition(), metadata.offset());
}
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for response to storing record into database history: {}", record);
        } catch (InterruptedException e) {
logger.trace("Interrupted before record was written into database history: {}", record);
Thread.interrupted();
throw new DatabaseHistoryException(e);
} catch (ExecutionException e) {
logger.error("Error while storing database history record into Kafka: {}", record, e);
throw new DatabaseHistoryException(e);
}
}

View File

@ -5,8 +5,11 @@
*/
package io.debezium.relational.history;
import static org.junit.Assert.fail;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -50,6 +53,13 @@ public void beforeEach() {
source2 = server("xyz");
history = createHistory();
}
@After
public void afterEach() {
if (history != null) {
history.stop();
}
}
protected abstract DatabaseHistory createHistory();
@ -62,7 +72,11 @@ protected Map<String, Object> position(String filename, long position, int entry
}
protected void record(long pos, int entry, String ddl, Tables... update) {
history.record(source1, position("a.log", pos, entry), "db", tables, ddl);
try {
history.record(source1, position("a.log", pos, entry), "db", tables, ddl);
} catch (Throwable t) {
fail(t.getMessage());
}
for (Tables tables : update) {
if (tables != null) {
parser.setCurrentSchema("db");

View File

@ -8,6 +8,7 @@
import java.nio.file.Path;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.util.Testing;
@ -32,6 +33,13 @@ protected DatabaseHistory createHistory() {
history.configure(Configuration.create()
.with(FileDatabaseHistory.FILE_PATH, TEST_FILE_PATH.toAbsolutePath().toString())
.build(),null);
history.start();
return history;
}
@Override
@Test
public void shouldRecordChangesAndRecoverToVariousPoints() {
super.shouldRecordChangesAndRecoverToVariousPoints();
}
}
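
For context, the configure-then-start ordering the updated test relies on looks roughly like this; the file path is a placeholder and the no-argument constructor is assumed to match what createHistory() uses.
// Rough lifecycle sketch: configure before start(), and stop() ends further writes.
package io.debezium.relational.history;

import io.debezium.config.Configuration;

public class FileDatabaseHistoryLifecycleSketch {
    public static void main(String[] args) {
        FileDatabaseHistory history = new FileDatabaseHistory();
        history.configure(Configuration.create()
                                       .with(FileDatabaseHistory.FILE_PATH, "/tmp/dbhistory.dat") // placeholder path
                                       .build(), null);
        history.start(); // creates the backing file if it does not already exist
        // record(...) and recover(...) may be used while the history is running ...
        history.stop();  // subsequent storeRecord(...) calls are rejected
    }
}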