DBZ-144 Corrected MySQL connector restart

Added tests to verify that the connector restarts properly within the binlog after it previously failed or stopped in the middle of a transaction. The tests showed that the connector was not able to restart properly, whether or not GTIDs are used, because restarting from an arbitrary binlog event skips the TABLE_MAP events for the affected tables.
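
For context on why skipping TABLE_MAP events is fatal: in the MySQL binlog, a row-change event identifies its table only by a numeric table id, and that id is defined by the TABLE_MAP event that precedes it. A sketch of a typical transaction's event sequence (the positions and table id here are hypothetical):

    pos 120  QUERY       "BEGIN"
    pos 190  TABLE_MAP   table_id=42 -> connector_test.products
    pos 250  WRITE_ROWS  table_id=42, 3 rows
    pos 310  QUERY       "COMMIT"

Restarting at pos 250 replays the WRITE_ROWS event without the TABLE_MAP at pos 190, so the reader can no longer resolve table_id=42 and the rows cannot be decoded.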

The logic was changed significantly to record in the offsets the binlog coordinates of the start of the transaction, which should work whether or not GTIDs are used. Upon restart, the connector may have to re-read events that it previously processed, but the offset now also includes the number of events already processed within the transaction so that those events can be skipped.
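
As a standalone sketch of the resulting skip arithmetic (illustration only, not the connector's code; the row counts and printed output are hypothetical):

    import java.util.List;

    class RestartSkipSketch {
        // rowCounts holds the number of rows in each binlog event of the restarted
        // transaction; eventsToSkip and rowsToSkip come from the recorded offset.
        static void replay(List<Integer> rowCounts, long eventsToSkip, int rowsToSkip) {
            for (int rows : rowCounts) {
                if (eventsToSkip > 0) {
                    --eventsToSkip; // this event was fully processed before the restart
                    continue;
                }
                for (int row = rowsToSkip; row < rows; ++row) {
                    System.out.println("emit change record for row " + row);
                }
                rowsToSkip = 0; // only the first non-skipped event starts part-way through
            }
        }

        public static void main(String[] args) {
            // Offset recorded: 2 events already processed, plus 1 row of the next event.
            replay(List.of(1, 3, 4), 2, 1); // emits only rows 1..3 of the third event
        }
    }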

This has an unfortunate side effect: the offsets capture that a transaction was completed only once the connector generates a source record for the subsequent transaction. This is because the connector generates source records (with their offsets) for the binlog events in a transaction before the transaction's commit is seen, and no additional source record is produced for the commit itself. Only the offsets recorded during the subsequent transaction therefore show that the prior transaction is complete and which of the subsequent transaction's events are to be skipped. Thus, if the connector stops after a commit but before the next transaction produces a record, upon restart it has to re-read (but ignore) all of the binlog events associated with the completed transaction. This shouldn't be a problem, and will only slow restarts after very large transactions.
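
Concretely, the offsets recorded for two back-to-back transactions would look something like this (filenames, positions, and counts are hypothetical; "pos" is the BEGIN position of the transaction to restart from, and "event"/"row" count what to skip within it):

    offset of txn1's last record:  { "file": "mysql-bin.000003", "pos": 120, "event": 2, "row": 3 }
    offset of txn2's first record: { "file": "mysql-bin.000003", "pos": 520, "row": 1 }

A restart from the first offset re-reads (and skips) all of txn1 starting at its BEGIN at pos 120; only the second offset, produced by txn2's first record, moves past txn1 entirely.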
Randall Hauch 2016-11-03 16:44:40 -05:00
parent 0d2acfd0a6
commit ea5f7983c7
15 changed files with 1022 additions and 297 deletions

View File

@@ -70,6 +70,8 @@ public class BinlogReader extends AbstractReader {
private final ElapsedTimeStrategy pollOutputDelay;
private long recordCounter = 0L;
private long previousOutputMillis = 0L;
private long initialEventsToSkip = 0L;
private boolean skipEvent = false;
private final AtomicLong totalRecordCounter = new AtomicLong();
private volatile Map<String, ?> lastOffset = null;
private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
@@ -164,15 +166,21 @@ protected void doStart() {
logger.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
String filteredGtidSetStr = filteredGtidSet.toString();
client.setGtidSet(filteredGtidSetStr);
source.setGtidSet(filteredGtidSetStr);
source.setCompletedGtidSet(filteredGtidSetStr);
gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
} else {
client.setBinlogFilename(source.binlogFilename());
client.setBinlogPosition(source.nextBinlogPosition());
client.setBinlogPosition(source.binlogPosition());
}
// We may be restarting in the middle of a transaction, so see how far into the transaction we have already processed...
initialEventsToSkip = source.eventsToSkipUponRestart();
// Set the starting row number, which is the next row number to be read ...
startingRowNumber = source.nextEventRowNumber();
startingRowNumber = source.rowsToSkipUponRestart();
// Only when we reach the first BEGIN event will we start to skip events ...
skipEvent = false;
// Initialize our poll output delay logic ...
pollOutputDelay.hasElapsed();
@@ -287,8 +295,15 @@ protected void handleEvent(Event event) {
// Forward the event to the handler ...
eventHandlers.getOrDefault(eventType, this::ignoreEvent).accept(event);
// And after that event has been processed, always set the starting row number to 0 ...
startingRowNumber = 0;
// Capture that we've completed another event ...
source.completeEvent();
if (skipEvent) {
// We're in the mode of skipping events and we just skipped this one, so decrement our skip count ...
--initialEventsToSkip;
skipEvent = initialEventsToSkip > 0;
}
} catch (RuntimeException e) {
// There was an error in the event handler, so propagate the failure to Kafka Connect ...
failed(e, "Error processing binlog event");
@@ -375,8 +390,7 @@ protected void handleGtidEvent(Event event) {
GtidEventData gtidEvent = unwrapData(event);
String gtid = gtidEvent.getGtid();
gtidSet.add(gtid);
source.setGtid(gtid);
source.setGtidSet(gtidSet.toString()); // rather than use the client's GTID set
source.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set
}
/**
@@ -387,14 +401,23 @@ protected void handleGtidEvent(Event event) {
*/
protected void handleQueryEvent(Event event) {
QueryEventData command = unwrapData(event);
logger.debug("Received update table command: {}", event);
logger.debug("Received query command: {}", event);
String sql = command.getSql().trim();
if (sql.equalsIgnoreCase("BEGIN")) {
// ignore these altogether ...
// We are starting a new transaction ...
source.startNextTransaction();
if (initialEventsToSkip != 0) {
logger.debug("Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
initialEventsToSkip, startingRowNumber);
// We are restarting, so we need to skip the events in this transaction that we processed previously...
skipEvent = true;
}
return;
}
if (sql.equalsIgnoreCase("COMMIT")) {
// ignore these altogether ...
// We are completing the transaction ...
source.commitTransaction();
skipEvent = false;
return;
}
context.dbSchema().applyDdl(context.source(), command.getDatabase(), command.getSql(), (dbName, statements) -> {
@@ -438,6 +461,11 @@ protected void handleUpdateTableMetadata(Event event) {
* @throws InterruptedException if this thread is interrupted while blocking
*/
protected void handleInsert(Event event) throws InterruptedException {
if (skipEvent) {
// We can skip this because we should already be at least this far ...
logger.debug("Skipping previously processed row event: {}", event);
return;
}
WriteRowsEventData write = unwrapData(event);
long tableNumber = write.getTableId();
BitSet includedColumns = write.getIncludedColumns();
@@ -447,13 +475,26 @@ protected void handleInsert(Event event) throws InterruptedException {
Long ts = context.clock().currentTimeInMillis();
int count = 0;
int numRows = rows.size();
for (int row = startingRowNumber; row != numRows; ++row) {
count += recordMaker.create(rows.get(row), ts, row, numRows);
if (startingRowNumber < numRows) {
for (int row = startingRowNumber; row != numRows; ++row) {
count += recordMaker.create(rows.get(row), ts, row, numRows);
}
if (logger.isDebugEnabled()) {
if (startingRowNumber != 0) {
logger.debug("Recorded {} insert record(s) for last {} row(s) in event: {}",
count, numRows - startingRowNumber, event);
} else {
logger.debug("Recorded {} insert record(s) for event: {}", count, event);
}
}
} else {
// All rows were previously processed ...
logger.debug("Skipping previously processed insert event: {}", event);
}
logger.debug("Recorded {} insert records for event: {}", count, event);
} else {
logger.debug("Skipping insert row event: {}", event);
}
startingRowNumber = 0;
}
/**
@@ -463,6 +504,11 @@ protected void handleInsert(Event event) throws InterruptedException {
* @throws InterruptedException if this thread is interrupted while blocking
*/
protected void handleUpdate(Event event) throws InterruptedException {
if (skipEvent) {
// We can skip this because we should already be at least this far ...
logger.debug("Skipping previously processed row event: {}", event);
return;
}
UpdateRowsEventData update = unwrapData(event);
long tableNumber = update.getTableId();
BitSet includedColumns = update.getIncludedColumns();
@@ -473,16 +519,29 @@ protected void handleUpdate(Event event) throws InterruptedException {
Long ts = context.clock().currentTimeInMillis();
int count = 0;
int numRows = rows.size();
for (int row = startingRowNumber; row != numRows; ++row) {
Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
Serializable[] before = changes.getKey();
Serializable[] after = changes.getValue();
count += recordMaker.update(before, after, ts, row, numRows);
if (startingRowNumber < numRows) {
for (int row = startingRowNumber; row != numRows; ++row) {
Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
Serializable[] before = changes.getKey();
Serializable[] after = changes.getValue();
count += recordMaker.update(before, after, ts, row, numRows);
}
if (logger.isDebugEnabled()) {
if (startingRowNumber != 0) {
logger.debug("Recorded {} update record(s) for last {} row(s) in event: {}",
count, numRows - startingRowNumber, event);
} else {
logger.debug("Recorded {} update record(s) for event: {}", count, event);
}
}
} else {
// All rows were previously processed ...
logger.debug("Skipping previously processed update event: {}", event);
}
logger.debug("Recorded {} update records for event: {}", count, event);
} else {
logger.debug("Skipping update row event: {}", event);
}
startingRowNumber = 0;
}
/**
@@ -492,6 +551,11 @@ protected void handleUpdate(Event event) throws InterruptedException {
* @throws InterruptedException if this thread is interrupted while blocking
*/
protected void handleDelete(Event event) throws InterruptedException {
if (skipEvent) {
// We can skip this because we should already be at least this far ...
logger.debug("Skipping previously processed row event: {}", event);
return;
}
DeleteRowsEventData deleted = unwrapData(event);
long tableNumber = deleted.getTableId();
BitSet includedColumns = deleted.getIncludedColumns();
@@ -501,13 +565,26 @@ protected void handleDelete(Event event) throws InterruptedException {
Long ts = context.clock().currentTimeInMillis();
int count = 0;
int numRows = rows.size();
for (int row = startingRowNumber; row != numRows; ++row) {
count += recordMaker.delete(rows.get(row), ts, row, numRows);
if (startingRowNumber < numRows) {
for (int row = startingRowNumber; row != numRows; ++row) {
count += recordMaker.delete(rows.get(row), ts, row, numRows);
}
if (logger.isDebugEnabled()) {
if (startingRowNumber != 0) {
logger.debug("Recorded {} delete record(s) for last {} row(s) in event: {}",
count, numRows - startingRowNumber, event);
} else {
logger.debug("Recorded {} delete record(s) for event: {}", count, event);
}
}
} else {
// All rows were previously processed ...
logger.debug("Skipping previously processed delete event: {}", event);
}
logger.debug("Recorded {} delete records for event: {}", count, event);
} else {
logger.debug("Skipping delete row event: {}", event);
}
startingRowNumber = 0;
}
protected SSLMode sslModeFor(SecureConnectionMode mode) {

View File

@@ -139,7 +139,7 @@ public synchronized void start(Map<String, String> props) {
if (!startWithSnapshot && source.gtidSet() == null && isGtidModeEnabled()) {
// The snapshot will properly determine the GTID set, but we're not starting with a snapshot and GTIDs were not
// previously used but the MySQL server has them enabled ...
source.setGtidSet("");
source.setCompletedGtidSet("");
}
// Check whether the row-level binlog is enabled ...

View File

@@ -265,11 +265,11 @@ public ObjectName metricName(String contextName) throws MalformedObjectNameExcep
* none were filtered
*/
public GtidSet filterGtidSet(GtidSet availableServerGtidSet) {
logger.info("Attempting to generate a filtered GTID set");
String gtidStr = source.gtidSet();
if (gtidStr == null) {
return null;
}
logger.info("Attempting to generate a filtered GTID set");
logger.info("GTID set from previous recorded offset: {}", gtidStr);
GtidSet filteredGtidSet = new GtidSet(gtidStr);
Predicate<String> gtidSourceFilter = gtidSourceFilter();

View File

@@ -222,7 +222,7 @@ protected void execute() {
if (rs.getMetaData().getColumnCount() > 4) {
// This column exists only in MySQL 5.6.5 or later ...
String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
source.setGtidSet(gtidSet);
source.setCompletedGtidSet(gtidSet);
logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
gtidSet);
} else {

View File

@@ -32,10 +32,13 @@
* </pre>
*
* <p>
* The {@link #offset() source offset} information describes how much of the database's binary log the source the change detector
* has already processed, and it includes the {@link #binlogFilename() binlog filename}, the {@link #nextBinlogPosition() next
* position} in the binlog to start reading, and the {@link #nextEventRowNumber() next event row number}. Here's a JSON-like
* representation of an example:
* The {@link #offset() source offset} information is included in each event and captures where the connector should restart
* if this event's offset is the last one recorded. The offset includes the {@link #binlogFilename() binlog filename},
* the {@link #binlogPosition() position of the first event} in the binlog, the
* {@link #eventsToSkipUponRestart() number of events to skip}, and the
* {@link #rowsToSkipUponRestart() number of rows to also skip}.
* <p>
* Here's a JSON-like representation of an example:
*
* <pre>
* {
@@ -44,22 +47,26 @@
* "gtid": "db58b0ae-2c10-11e6-b284-0242ac110002:199",
* "file": "mysql-bin.000003",
* "pos" = 990,
* "event" = 0,
* "row": 0,
* "snapshot": true
* }
* </pre>
*
* <p>
* The "{@code gtids}" field only appears in offsets produced when GTIDs are enabled. The "{@code snapshot}" field only appears in
* offsets produced when the connector is in the middle of a snapshot. And finally, the "{@code ts}" field contains the
* <em>seconds</em> since Unix epoch (since Jan 1, 1970) of the MySQL event; the message {@link Envelope envelopes} also have a
timestamp, but that timestamp is the <em>milliseconds</em> since Jan 1, 1970.
*
* The {@link #struct() source} struct appears in each message envelope and contains MySQL information about the event. It is
* a mixture the field from the {@link #partition() partition} (which is renamed in the source to make more sense), the
* {@link #lastBinlogPosition() position} of the event (and {@link #lastEventRowNumber() row number} within the event) inside
* the {@link #binlogFilename() binlog file}. When GTIDs are enabled, it also includes the GTID of the transaction in which the
* event occurs. Like with the offset, the "{@code snapshot}" field only appears for events produced when the connector is in the
* middle of a snapshot. Here's a JSON-like representation of the source for an event that corresponds to the above partition and
* <p>
* Each change event envelope also includes the {@link #struct() source} struct that contains MySQL information about that
particular event, including a mixture of the fields from the {@link #partition() partition} (which is renamed in the source to
* make more sense), the binlog filename and position where the event can be found, and when GTIDs are enabled the GTID of the
* transaction in which the event occurs. Like with the offset, the "{@code snapshot}" field only appears for events produced
* when the connector is in the middle of a snapshot. Note that this information is likely different than the offset information,
* since the connector may need to restart from either just after the most recently completed transaction or the beginning
* of the most recently started transaction (whichever appears later in the binlog).
* <p>
* Here's a JSON-like representation of the source for an event that corresponds to the above partition and
* offset:
*
* <pre>
@@ -88,9 +95,10 @@ final class SourceInfo {
public static final String SERVER_PARTITION_KEY = "server";
public static final String GTID_SET_KEY = "gtids";
public static final String GTID_KEY = "gtid";
public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
public static final String BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY = "row";
public static final String BINLOG_ROW_IN_EVENT_OFFSET_KEY = "row";
public static final String TIMESTAMP_KEY = "ts_sec";
public static final String SNAPSHOT_KEY = "snapshot";
@@ -105,17 +113,22 @@ final class SourceInfo {
.field(GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA)
.field(BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA)
.field(BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, Schema.INT32_SCHEMA)
.field(BINLOG_ROW_IN_EVENT_OFFSET_KEY, Schema.INT32_SCHEMA)
.field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.build();
private String gtidSet;
private String binlogGtid;
private String binlogFilename;
private long lastBinlogPosition = 0;
private int lastEventRowNumber = 0;
private long nextBinlogPosition = 4;
private int nextEventRowNumber = 0;
private String currentGtidSet;
private String currentGtid;
private String currentBinlogFilename;
private long currentBinlogPosition = 0L;
private int currentRowNumber = 0;
private long currentEventLengthInBytes = 0;
private String restartGtidSet;
private String restartBinlogFilename;
private long restartBinlogPosition = 0L;
private long restartEventsToSkip = 0;
private int restartRowsToSkip = 0;
private boolean inTransaction = false;
private String serverName;
private long serverId = 0;
private long binlogTimestampSeconds = 0;
@@ -149,6 +162,40 @@ public Map<String, String> partition() {
return sourcePartition;
}
/**
* Set the position in the MySQL binlog where we will start reading.
*
* @param binlogFilename the name of the binary log file; may not be null
* @param positionOfFirstEvent the position in the binary log file to begin processing
*/
public void setBinlogStartPoint(String binlogFilename, long positionOfFirstEvent) {
if (binlogFilename != null) {
this.currentBinlogFilename = binlogFilename;
this.restartBinlogFilename = binlogFilename;
}
assert positionOfFirstEvent >= 0;
this.currentBinlogPosition = positionOfFirstEvent;
this.restartBinlogPosition = positionOfFirstEvent;
this.currentRowNumber = 0;
this.restartRowsToSkip = 0;
}
/**
* Set the position within the MySQL binary log file of the <em>current event</em>.
*
* @param positionOfCurrentEvent the position within the binary log file of the current event
* @param eventSizeInBytes the size in bytes of this event
*/
public void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes) {
this.currentBinlogPosition = positionOfCurrentEvent;
this.currentEventLengthInBytes = eventSizeInBytes;
if (!inTransaction) {
this.restartBinlogPosition = positionOfCurrentEvent + eventSizeInBytes;
}
// Don't set anything else, since the row numbers are set in the offsetForRow(int,int) method called at least once
// for each processed event
}
/**
Get the Kafka Connect detail about the source "offset", which describes the position within the source where we
have last read.
@@ -156,43 +203,50 @@ public Map<String, String> partition() {
* @return a copy of the current offset; never null
*/
public Map<String, ?> offset() {
return offsetUsingPosition(nextBinlogPosition);
return offsetUsingPosition(this.restartRowsToSkip);
}
/**
* Set the current row number within a given event, and then get the Kafka Connect detail about the source "offset", which
* describes the position within the source where we have last read.
* Given the row number within a binlog event and the total number of rows in that event, compute and return the
Kafka Connect offset that is to be included in the produced change event describing the row.
* <p>
* This method should always be called before {@link #struct()}.
*
* @param eventRowNumber the 0-based row number within the event being processed
* @param eventRowNumber the 0-based row number within the event for which the offset is to be produced
* @param totalNumberOfRows the total number of rows within the event being processed
* @return a copy of the current offset; never null
* @see #struct()
*/
public Map<String, ?> offsetForRow(int eventRowNumber, int totalNumberOfRows) {
if (eventRowNumber < (totalNumberOfRows - 1)) {
// This is not the last row, so our offset should record the next row to be used ...
this.lastEventRowNumber = eventRowNumber;
this.nextEventRowNumber = eventRowNumber + 1;
this.currentRowNumber = eventRowNumber;
this.restartRowsToSkip = this.currentRowNumber + 1;
// so write out the offset with the position of this event
return offsetUsingPosition(lastBinlogPosition);
return offsetUsingPosition(this.restartRowsToSkip);
}
// This is the last row, so write out the offset that has the position of the next event ...
this.lastEventRowNumber = this.nextEventRowNumber;
this.nextEventRowNumber = 0;
return offsetUsingPosition(nextBinlogPosition);
this.currentRowNumber = eventRowNumber;
this.restartRowsToSkip = 0;
return offsetUsingPosition(totalNumberOfRows);
}
private Map<String, ?> offsetUsingPosition(long binlogPosition) {
private Map<String, ?> offsetUsingPosition(long rowsToSkip) {
Map<String, Object> map = new HashMap<>();
if (serverId != 0) map.put(SERVER_ID_KEY, serverId);
if (binlogTimestampSeconds != 0) map.put(TIMESTAMP_KEY, binlogTimestampSeconds);
if (gtidSet != null) {
map.put(GTID_SET_KEY, gtidSet);
if (restartGtidSet != null) {
// Put the previously-completed GTID set in the offset along with the event number ...
map.put(GTID_SET_KEY, restartGtidSet);
}
map.put(BINLOG_FILENAME_OFFSET_KEY, binlogFilename);
map.put(BINLOG_POSITION_OFFSET_KEY, binlogPosition);
map.put(BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, nextEventRowNumber);
map.put(BINLOG_FILENAME_OFFSET_KEY, restartBinlogFilename);
map.put(BINLOG_POSITION_OFFSET_KEY, restartBinlogPosition);
if (restartEventsToSkip != 0) {
map.put(EVENTS_TO_SKIP_OFFSET_KEY, restartEventsToSkip);
}
if (rowsToSkip != 0) {
map.put(BINLOG_ROW_IN_EVENT_OFFSET_KEY, rowsToSkip);
}
if (binlogTimestampSeconds != 0) map.put(TIMESTAMP_KEY, binlogTimestampSeconds);
if (isSnapshotInEffect()) {
map.put(SNAPSHOT_KEY, true);
}
@@ -223,13 +277,13 @@ public Struct struct() {
Struct result = new Struct(SCHEMA);
result.put(SERVER_NAME_KEY, serverName);
result.put(SERVER_ID_KEY, serverId);
// Don't put the GTID Set into the struct; only the current GTID is fine ...
if (binlogGtid != null) {
result.put(GTID_KEY, binlogGtid);
if (currentGtid != null) {
// Don't put the GTID Set into the struct; only the current GTID is fine ...
result.put(GTID_KEY, currentGtid);
}
result.put(BINLOG_FILENAME_OFFSET_KEY, binlogFilename);
result.put(BINLOG_POSITION_OFFSET_KEY, lastBinlogPosition);
result.put(BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, lastEventRowNumber);
result.put(BINLOG_FILENAME_OFFSET_KEY, currentBinlogFilename);
result.put(BINLOG_POSITION_OFFSET_KEY, currentBinlogPosition);
result.put(BINLOG_ROW_IN_EVENT_OFFSET_KEY, currentRowNumber);
result.put(TIMESTAMP_KEY, binlogTimestampSeconds);
if (lastSnapshot) {
result.put(SNAPSHOT_KEY, true);
@@ -246,54 +300,73 @@ public boolean isSnapshotInEffect() {
return nextSnapshot;
}
/**
* Set the latest GTID from the MySQL binary log file.
*
* @param gtid the string representation of a specific GTID; may not be null
*/
public void setGtid(String gtid) {
this.binlogGtid = gtid;
public void startNextTransaction() {
// If we have to restart, then we'll start with this BEGIN transaction
this.restartRowsToSkip = 0;
this.restartEventsToSkip = 0;
this.restartBinlogFilename = this.currentBinlogFilename;
this.restartBinlogPosition = this.currentBinlogPosition;
this.inTransaction = true;
}
/**
* Set the set of GTIDs known to the MySQL server.
*
* @param gtidSet the string representation of GTID set; may not be null
Capture that we've completed another event.
*/
public void setGtidSet(String gtidSet) {
public void completeEvent() {
++restartEventsToSkip;
}
/**
* Get the number of events after the last transaction BEGIN that we've already processed.
*
* @return the number of events in the transaction that have been processed completely
* @see #completeEvent()
* @see #startNextTransaction()
*/
public long eventsToSkipUponRestart() {
return restartEventsToSkip;
}
public void commitTransaction() {
this.restartGtidSet = this.currentGtidSet;
this.restartBinlogFilename = this.currentBinlogFilename;
this.restartBinlogPosition = this.currentBinlogPosition + this.currentEventLengthInBytes;
this.restartRowsToSkip = 0;
this.restartEventsToSkip = 0;
this.inTransaction = false;
}
/**
* Record that a new GTID transaction has been started and has been included in the set of GTIDs known to the MySQL server.
*
@param gtid the string representation of a specific GTID that has begun; may not be null
* @param gtidSet the string representation of GTID set that includes the newly begun GTID; may not be null
*/
public void startGtid(String gtid, String gtidSet) {
this.currentGtid = gtid;
if (gtidSet != null && !gtidSet.trim().isEmpty()) {
this.gtidSet = gtidSet.replaceAll("\n", "").replaceAll("\r", ""); // remove all of the newline chars if they exist
// Remove all the newline chars that exist in the GTID set string ...
String trimmedGtidSet = gtidSet.replaceAll("\n", "").replaceAll("\r", "");
// Set the GTID set that we'll use if restarting BEFORE successful completion of the events in this GTID ...
this.restartGtidSet = this.currentGtidSet != null ? this.currentGtidSet : trimmedGtidSet;
// Record the GTID set that includes the current transaction ...
this.currentGtidSet = trimmedGtidSet;
}
}
/**
* Set the name of the MySQL binary log file.
* Set the GTID set that captures all of the GTID transactions that have been completely processed.
*
* @param binlogFilename the name of the binary log file; may not be null
* @param positionOfFirstEvent the position in the binary log file to begin processing
* @param gtidSet the string representation of the GTID set; may not be null, but may be an empty string if no GTIDs
* have been previously processed
*/
public void setBinlogStartPoint(String binlogFilename, long positionOfFirstEvent) {
if (binlogFilename != null) {
this.binlogFilename = binlogFilename;
public void setCompletedGtidSet(String gtidSet) {
if (gtidSet != null && !gtidSet.trim().isEmpty()) {
// Remove all the newline chars that exist in the GTID set string ...
String trimmedGtidSet = gtidSet.replaceAll("\n", "").replaceAll("\r", "");
this.currentGtidSet = trimmedGtidSet;
this.restartGtidSet = trimmedGtidSet;
}
assert positionOfFirstEvent >= 0;
this.nextBinlogPosition = positionOfFirstEvent;
this.lastBinlogPosition = this.nextBinlogPosition;
this.nextEventRowNumber = 0;
this.lastEventRowNumber = 0;
}
/**
* Set the position within the MySQL binary log file of the <em>current event</em>.
*
* @param positionOfCurrentEvent the position within the binary log file of the current event
* @param eventSizeInBytes the size in bytes of this event
*/
public void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes) {
this.lastBinlogPosition = positionOfCurrentEvent;
this.nextBinlogPosition = positionOfCurrentEvent + eventSizeInBytes;
// Don't set anything else, since the row numbers are set in the offset(int,int) method called at least once
// for each processed event
}
/**
@@ -350,15 +423,15 @@ public void completeSnapshot() {
public void setOffset(Map<String, ?> sourceOffset) {
if (sourceOffset != null) {
// We have previously recorded an offset ...
setGtidSet((String) sourceOffset.get(GTID_SET_KEY)); // may be null
binlogFilename = (String) sourceOffset.get(BINLOG_FILENAME_OFFSET_KEY);
setCompletedGtidSet((String) sourceOffset.get(GTID_SET_KEY)); // may be null
restartEventsToSkip = longOffsetValue(sourceOffset, EVENTS_TO_SKIP_OFFSET_KEY);
String binlogFilename = (String) sourceOffset.get(BINLOG_FILENAME_OFFSET_KEY);
if (binlogFilename == null) {
throw new ConnectException("Source offset '" + BINLOG_FILENAME_OFFSET_KEY + "' parameter is missing");
}
nextBinlogPosition = longOffsetValue(sourceOffset, BINLOG_POSITION_OFFSET_KEY);
nextEventRowNumber = (int) longOffsetValue(sourceOffset, BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY);
lastBinlogPosition = nextBinlogPosition;
lastEventRowNumber = nextEventRowNumber;
long binlogPosition = longOffsetValue(sourceOffset, BINLOG_POSITION_OFFSET_KEY);
setBinlogStartPoint(binlogFilename, binlogPosition);
this.restartRowsToSkip = (int) longOffsetValue(sourceOffset, BINLOG_ROW_IN_EVENT_OFFSET_KEY);
nextSnapshot = booleanOffsetValue(sourceOffset, SNAPSHOT_KEY);
lastSnapshot = nextSnapshot;
}
@@ -366,7 +439,7 @@ public void setOffset(Map<String, ?> sourceOffset) {
private long longOffsetValue(Map<String, ?> values, String key) {
Object obj = values.get(key);
if (obj == null) return 0;
if (obj == null) return 0L;
if (obj instanceof Number) return ((Number) obj).longValue();
try {
return Long.parseLong(obj.toString());
@@ -388,16 +461,16 @@ private boolean booleanOffsetValue(Map<String, ?> values, String key) {
* @return the string representation of the binlog GTID ranges; may be null
*/
public String gtidSet() {
return this.gtidSet != null ? this.gtidSet : null;
return this.currentGtidSet != null ? this.currentGtidSet : null;
}
/**
* Get the name of the MySQL binary log file that has been processed.
* Get the name of the MySQL binary log file that has last been processed.
*
* @return the name of the binary log file; null if it has not been {@link #setBinlogStartPoint(String, long) set}
*/
public String binlogFilename() {
return binlogFilename;
return restartBinlogFilename;
}
/**
@@ -405,8 +478,8 @@ public String binlogFilename() {
*
* @return the position within the binary log file; null if it has not been {@link #setBinlogStartPoint(String, long) set}
*/
public long nextBinlogPosition() {
return nextBinlogPosition;
public long binlogPosition() {
return restartBinlogPosition;
}
/**
@@ -414,30 +487,18 @@ public long nextBinlogPosition() {
*
* @return the position within the binary log file; null if it has not been {@link #setBinlogStartPoint(String, long) set}
*/
public long lastBinlogPosition() {
return lastBinlogPosition;
protected long restartBinlogPosition() {
return restartBinlogPosition;
}
/**
* Get the next row within the event at the {@link #nextBinlogPosition() position} within the {@link #binlogFilename() binary
* log file}
* .
* Get the number of rows beyond the {@link #eventsToSkipUponRestart() last completely processed event} to be skipped
* upon restart.
*
* @return the 0-based row number
* @return the number of rows to be skipped
*/
public int nextEventRowNumber() {
return nextEventRowNumber;
}
/**
* Get the previous row within the event at the {@link #lastBinlogPosition() position} within the {@link #binlogFilename()
* binary log file}
* .
*
* @return the 0-based row number
*/
public int lastEventRowNumber() {
return lastEventRowNumber;
public int rowsToSkipUponRestart() {
return restartRowsToSkip;
}
/**
@@ -452,22 +513,26 @@ public String serverName() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (gtidSet != null) {
if (currentGtidSet != null) {
sb.append("GTIDs ");
sb.append(gtidSet);
sb.append(" and binlog file '").append(binlogFilename).append("'");
sb.append(", pos=").append(nextBinlogPosition());
sb.append(", row=").append(nextEventRowNumber());
sb.append(currentGtidSet);
sb.append(" and binlog file '").append(restartBinlogFilename).append("'");
sb.append(", pos=").append(restartBinlogPosition);
sb.append(", skipping ").append(restartEventsToSkip);
sb.append(" events plus ").append(restartRowsToSkip);
sb.append(" rows");
} else {
if (binlogFilename == null) {
if (restartBinlogFilename == null) {
sb.append("<latest>");
} else {
if ("".equals(binlogFilename)) {
if ("".equals(restartBinlogFilename)) {
sb.append("earliest binlog file and position");
} else {
sb.append("binlog file '").append(binlogFilename).append("'");
sb.append(", pos=").append(nextBinlogPosition());
sb.append(", row=").append(nextEventRowNumber());
sb.append("binlog file '").append(restartBinlogFilename).append("'");
sb.append(", pos=").append(restartBinlogPosition);
sb.append(", skipping ").append(restartEventsToSkip);
sb.append(" events plus ").append(restartRowsToSkip);
sb.append(" rows");
}
}
}
@@ -505,7 +570,14 @@ public static boolean isPositionAtOrBefore(Document recorded, Document desired)
// the desired is in snapshot mode, but the recorded is not. So the recorded is *after* the desired ...
return false;
}
// In all other cases (even when recorded is in snapshot mode), recorded is before or at desired ...
// In all other cases (even when recorded is in snapshot mode), recorded is before or at desired GTID.
// Now we need to compare how many events in that transaction we've already completed ...
int recordedEventCount = recorded.getInteger(EVENTS_TO_SKIP_OFFSET_KEY, 0);
int desiredEventCount = desired.getInteger(EVENTS_TO_SKIP_OFFSET_KEY, 0);
int diff = recordedEventCount - desiredEventCount;
if (diff > 0) return false;
// Otherwise the recorded is definitely before or at the desired ...
return true;
}
// The GTIDs are not an exact match, so figure out if recorded is a subset of the desired ...
@@ -543,16 +615,25 @@ public static boolean isPositionAtOrBefore(Document recorded, Document desired)
assert recordedFilename != null;
int diff = recordedFilename.compareToIgnoreCase(desiredFilename);
if (diff > 0) return false;
if (diff < 0) return true;
// The filenames are the same, so compare the positions ...
int recordedPosition = recorded.getInteger(BINLOG_POSITION_OFFSET_KEY, -1);
int desiredPosition = desired.getInteger(BINLOG_POSITION_OFFSET_KEY, -1);
diff = recordedPosition - desiredPosition;
if (diff > 0) return false;
if (diff < 0) return true;
// The positions are the same, so compare the row number ...
int recordedRow = recorded.getInteger(BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, -1);
int desiredRow = desired.getInteger(BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, -1);
// The positions are the same, so compare the completed events in the transaction ...
int recordedEventCount = recorded.getInteger(EVENTS_TO_SKIP_OFFSET_KEY, 0);
int desiredEventCount = desired.getInteger(EVENTS_TO_SKIP_OFFSET_KEY, 0);
diff = recordedEventCount - desiredEventCount;
if (diff > 0) return false;
if (diff < 0) return true;
// The completed events are the same, so compare the row number ...
int recordedRow = recorded.getInteger(BINLOG_ROW_IN_EVENT_OFFSET_KEY, -1);
int desiredRow = desired.getInteger(BINLOG_ROW_IN_EVENT_OFFSET_KEY, -1);
diff = recordedRow - desiredRow;
if (diff > 0) return false;

View File

@@ -8,8 +8,10 @@
import static org.junit.Assert.fail;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.Config;
@@ -28,6 +30,7 @@
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine.CompletionResult;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
@@ -274,11 +277,12 @@ public void shouldValidateAcceptableConfiguration() {
public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
String masterPort = System.getProperty("database.port");
String replicaPort = System.getProperty("database.replica.port");
if ( !masterPort.equals(replicaPort)) {
boolean replicaIsMaster = masterPort.equals(replicaPort);
if (!replicaIsMaster) {
// Give time for the replica to catch up to the master ...
Thread.sleep(5000L);
}
// Use the DB configuration to define the connector's configuration to use the "replica"
// which may be the same as the "master" ...
config = Configuration.create()
@@ -352,7 +356,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
}
}
//Testing.Print.enable();
// Testing.Print.enable();
// Restart the connector and read the insert record ...
Testing.print("*** Restarting connector after inserts were made");
@@ -388,7 +392,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
inserts = records.recordsForTopic("myServer.connector_test.products");
assertInsert(inserts.get(0), "id", 1001);
Testing.print("*** Done with simple insert");
// Testing.print("*** Done with simple insert");
// ---------------------------------------------------------------------------------------------------------------
// Changing the primary key of a row should result in 3 events: INSERT, DELETE, and TOMBSTONE
@@ -433,13 +437,15 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
Testing.print("*** Done with simple update");
//Testing.Print.enable();
// ---------------------------------------------------------------------------------------------------------------
// Change our schema with a fully-qualified name; we should still see this event
// ---------------------------------------------------------------------------------------------------------------
// Add a column with default to the 'products' table and explicitly update one record ...
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("ALTER TABLE connector_test.products ADD COLUMN volume FLOAT NOT NULL, ADD COLUMN alias VARCHAR(30) NOT NULL AFTER description");
connection.execute("ALTER TABLE connector_test.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL AFTER description");
connection.execute("UPDATE products SET volume=13.5 WHERE id=2001");
connection.query("SELECT * FROM products", rs -> {
if (Testing.Print.isEnabled()) connection.print(rs);
@@ -508,6 +514,173 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// Stop the connector ...
// ---------------------------------------------------------------------------------------------------------------
stopConnector();
// ---------------------------------------------------------------------------------------------------------------
// Restart the connector to read only part of a transaction ...
// ---------------------------------------------------------------------------------------------------------------
Testing.print("*** Restarting connector");
CompletionResult completion = new CompletionResult();
start(MySqlConnector.class, config, completion, (record) -> {
// We want to stop before processing record 3003 ...
Struct key = (Struct) record.key();
Number id = (Number) key.get("id");
if (id.intValue() == 3003) {
return true;
}
return false;
});
BinlogPosition positionBeforeInserts = new BinlogPosition();
BinlogPosition positionAfterInserts = new BinlogPosition();
BinlogPosition positionAfterUpdate = new BinlogPosition();
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.query("SHOW MASTER STATUS", positionBeforeInserts::readFromDatabase);
connection.execute("INSERT INTO products(id,name,description,weight,volume,alias) VALUES "
+ "(3001,'ashley','super robot',34.56,0.00,'ashbot'), "
+ "(3002,'arthur','motorcycle',87.65,0.00,'arcycle'), "
+ "(3003,'oak','tree',987.65,0.00,'oak');");
connection.query("SELECT * FROM products", rs -> {
if (Testing.Print.isEnabled()) connection.print(rs);
});
connection.query("SHOW MASTER STATUS", positionAfterInserts::readFromDatabase);
// Change something else that is unrelated ...
connection.execute("UPDATE products_on_hand SET quantity=40 WHERE product_id=109");
connection.query("SELECT * FROM products_on_hand", rs -> {
if (Testing.Print.isEnabled()) connection.print(rs);
});
connection.query("SHOW MASTER STATUS", positionAfterUpdate::readFromDatabase);
}
}
//Testing.Print.enable();
// And consume the first two inserted rows (the connector stops before record 3003) ...
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(2);
assertThat(records.topics().size()).isEqualTo(1);
inserts = records.recordsForTopic("myServer.connector_test.products");
assertInsert(inserts.get(0), "id", 3001);
assertInsert(inserts.get(1), "id", 3002);
// Verify that the connector has stopped ...
completion.await(10, TimeUnit.SECONDS);
assertThat(completion.hasCompleted()).isTrue();
assertThat(completion.hasError()).isTrue();
assertThat(completion.success()).isFalse();
assertNoRecordsToConsume();
assertConnectorNotRunning();
// ---------------------------------------------------------------------------------------------------------------
// Stop the connector ...
// ---------------------------------------------------------------------------------------------------------------
stopConnector();
// Read the last committed offsets, and verify the binlog coordinates ...
SourceInfo persistedOffsetSource = new SourceInfo();
persistedOffsetSource.setServerName(config.getString(MySqlConnectorConfig.SERVER_NAME));
Map<String, ?> lastCommittedOffset = readLastCommittedOffset(config, persistedOffsetSource.partition());
persistedOffsetSource.setOffset(lastCommittedOffset);
Testing.print("Position before inserts: " + positionBeforeInserts);
Testing.print("Position after inserts: " + positionAfterInserts);
Testing.print("Offset: " + lastCommittedOffset);
Testing.print("Position after update: " + positionAfterUpdate);
if (replicaIsMaster) {
// Same binlog filename ...
assertThat(persistedOffsetSource.binlogFilename()).isEqualTo(positionBeforeInserts.binlogFilename());
assertThat(persistedOffsetSource.binlogFilename()).isEqualTo(positionAfterInserts.binlogFilename());
// Binlog position in offset should be more than before the inserts, but less than the position after the inserts ...
assertThat(persistedOffsetSource.binlogPosition()).isGreaterThan(positionBeforeInserts.binlogPosition());
assertThat(persistedOffsetSource.binlogPosition()).isLessThan(positionAfterInserts.binlogPosition());
} else {
// the replica is not the same server as the master, so it will have a different binlog filename and position ...
}
// Event number is 2 ...
assertThat(persistedOffsetSource.eventsToSkipUponRestart()).isEqualTo(2);
// GTID set should match the before-inserts GTID set ...
// assertThat(persistedOffsetSource.gtidSet()).isEqualTo(positionBeforeInserts.gtidSet());
Testing.print("*** Restarting connector, and should begin with inserting 3003 (not 109!)");
start(MySqlConnector.class, config);
// And consume the insert for 3003 ...
records = consumeRecordsByTopic(1);
assertThat(records.topics().size()).isEqualTo(1);
inserts = records.recordsForTopic("myServer.connector_test.products");
if (inserts == null) {
updates = records.recordsForTopic("myServer.connector_test.products_on_hand");
if (updates != null) {
fail("Restarted connector and missed the insert of product id=3003!");
}
}
// Read the first record produced since we've restarted
SourceRecord prod3003 = inserts.get(0);
assertInsert(prod3003, "id", 3003);
// Check that the offset has the correct/expected values ...
assertOffset(prod3003,"file",lastCommittedOffset.get("file"));
assertOffset(prod3003,"pos",lastCommittedOffset.get("pos"));
assertOffset(prod3003,"row",3);
assertOffset(prod3003,"event",lastCommittedOffset.get("event"));
// Check that the record has all of the column values ...
assertValueField(prod3003,"after/id",3003);
assertValueField(prod3003,"after/name","oak");
assertValueField(prod3003,"after/description","tree");
assertValueField(prod3003,"after/weight",987.65d);
assertValueField(prod3003,"after/volume",0.0d);
assertValueField(prod3003,"after/alias","oak");
// And make sure we consume that one extra update ...
records = consumeRecordsByTopic(1);
assertThat(records.topics().size()).isEqualTo(1);
updates = records.recordsForTopic("myServer.connector_test.products_on_hand");
assertThat(updates.size()).isEqualTo(1);
assertUpdate(updates.get(0), "product_id", 109);
updates.forEach(this::validate);
// Start the connector again, and we should see the next two
Testing.print("*** Done with simple insert");
}
protected static class BinlogPosition {
private String binlogFilename;
private long binlogPosition;
private String gtidSet;
public void readFromDatabase(ResultSet rs) throws SQLException {
if (rs.next()) {
binlogFilename = rs.getString(1);
binlogPosition = rs.getLong(2);
if (rs.getMetaData().getColumnCount() > 4) {
// This column exists only in MySQL 5.6.5 or later ...
gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
}
}
}
public String binlogFilename() {
return binlogFilename;
}
public long binlogPosition() {
return binlogPosition;
}
public String gtidSet() {
return gtidSet;
}
public boolean hasGtids() {
return gtidSet != null;
}
@Override
public String toString() {
return "file=" + binlogFilename + ", pos=" + binlogPosition + ", gtids=" + (gtidSet != null ? gtidSet : "");
}
}
@Test

View File

@@ -215,7 +215,7 @@ public void shouldFilterAndMergeGtidSet() throws Exception {
.build();
context = new MySqlTaskContext(config);
context.start();
context.source().setGtidSet(gtidStr);
context.source().setCompletedGtidSet(gtidStr);
GtidSet mergedGtidSet = context.filterGtidSet(new GtidSet(availableServerGtidStr));
assertThat(mergedGtidSet).isNotNull();

View File

@@ -31,20 +31,25 @@ public class SourceInfoTest {
private static final String SERVER_NAME = "my-server"; // can technically be any string
private SourceInfo source;
private boolean inTxn = false;
private long positionOfBeginEvent = 0L;
private int eventNumberInTxn = 0;
@Before
public void beforeEach() {
source = new SourceInfo();
inTxn = false;
positionOfBeginEvent = 0L;
eventNumberInTxn = 0;
}
@Test
public void shouldStartSourceInfoFromZeroBinlogCoordinates() {
source.setBinlogStartPoint(FILENAME, 0);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.eventsToSkipUponRestart()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -52,10 +57,8 @@ public void shouldStartSourceInfoFromZeroBinlogCoordinates() {
public void shouldStartSourceInfoFromNonZeroBinlogCoordinates() {
source.setBinlogStartPoint(FILENAME, 100);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -68,10 +71,8 @@ public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinates() {
sourceWith(offset(0, 0));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -80,10 +81,8 @@ public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinates() {
sourceWith(offset(100, 0));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -92,10 +91,8 @@ public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndNonZero
sourceWith(offset(0, 5));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(5);
assertThat(source.lastEventRowNumber()).isEqualTo(5);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -104,10 +101,8 @@ public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZ
sourceWith(offset(100, 5));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(5);
assertThat(source.lastEventRowNumber()).isEqualTo(5);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -116,10 +111,8 @@ public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndSnapsho
sourceWith(offset(0, 0, true));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@@ -128,10 +121,8 @@ public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndSnap
sourceWith(offset(100, 0, true));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@@ -140,10 +131,8 @@ public void shouldRecoverSourceInfoFromOffsetWithZeroBinlogCoordinatesAndNonZero
sourceWith(offset(0, 5, true));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(5);
assertThat(source.lastEventRowNumber()).isEqualTo(5);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@@ -152,10 +141,8 @@ public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZ
sourceWith(offset(100, 5, true));
assertThat(source.gtidSet()).isNull();
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(5);
assertThat(source.lastEventRowNumber()).isEqualTo(5);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@@ -164,10 +151,8 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoor
sourceWith(offset(GTID_SET, 0, 0, false));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -176,10 +161,8 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoor
sourceWith(offset(GTID_SET, 0, 5, false));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(5);
assertThat(source.lastEventRowNumber()).isEqualTo(5);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -188,10 +171,8 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogC
sourceWith(offset(GTID_SET, 100, 0, false));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -200,10 +181,8 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogC
sourceWith(offset(GTID_SET, 100, 5, false));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(5);
assertThat(source.lastEventRowNumber()).isEqualTo(5);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isFalse();
}
@@ -212,10 +191,8 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoor
sourceWith(offset(GTID_SET, 0, 0, true));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@@ -224,10 +201,8 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoor
sourceWith(offset(GTID_SET, 0, 5, true));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(0);
assertThat(source.lastBinlogPosition()).isEqualTo(0);
assertThat(source.nextEventRowNumber()).isEqualTo(5);
assertThat(source.lastEventRowNumber()).isEqualTo(5);
assertThat(source.binlogPosition()).isEqualTo(0);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@@ -236,10 +211,8 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogC
sourceWith(offset(GTID_SET, 100, 0, true));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(0);
assertThat(source.lastEventRowNumber()).isEqualTo(0);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@@ -248,10 +221,8 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogC
sourceWith(offset(GTID_SET, 100, 5, true));
assertThat(source.gtidSet()).isEqualTo(GTID_SET);
assertThat(source.binlogFilename()).isEqualTo(FILENAME);
assertThat(source.nextBinlogPosition()).isEqualTo(100);
assertThat(source.lastBinlogPosition()).isEqualTo(100);
assertThat(source.nextEventRowNumber()).isEqualTo(5);
assertThat(source.lastEventRowNumber()).isEqualTo(5);
assertThat(source.binlogPosition()).isEqualTo(100);
assertThat(source.rowsToSkipUponRestart()).isEqualTo(5);
assertThat(source.isSnapshotInEffect()).isTrue();
}
@@ -262,19 +233,89 @@ public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndNonZeroBinlogC
@Test
public void shouldAdvanceSourceInfoFromNonZeroPositionAndRowZeroForEventsWithOneRow() {
sourceWith(offset(100, 0));
// Try a transaction with just one event ...
handleTransactionBegin(150, 2);
handleNextEvent(200, 10, withRowCount(1));
handleTransactionCommit(210, 2);
handleTransactionBegin(210, 2);
handleNextEvent(220, 10, withRowCount(1));
handleTransactionCommit(230, 3);
handleTransactionBegin(240, 2);
handleNextEvent(250, 50, withRowCount(1));
handleTransactionCommit(300, 4);
// Try transactions with multiple events ...
handleTransactionBegin(340, 2);
handleNextEvent(350, 20, withRowCount(1));
handleNextEvent(370, 30, withRowCount(1));
handleNextEvent(400, 40, withRowCount(1));
handleTransactionCommit(440, 4);
handleTransactionBegin(500, 2);
handleNextEvent(510, 20, withRowCount(1));
handleNextEvent(540, 15, withRowCount(1));
handleNextEvent(560, 10, withRowCount(1));
handleTransactionCommit(580, 4);
// Try another single event transaction ...
handleTransactionBegin(600, 2);
handleNextEvent(610, 50, withRowCount(1));
handleTransactionCommit(660, 4);
// Try an event outside of a transaction ...
handleNextEvent(670, 10, withRowCount(1));
// Try another single event transaction ...
handleTransactionBegin(700, 2);
handleNextEvent(710, 50, withRowCount(1));
handleTransactionCommit(760, 4);
}
@Test
public void shouldAdvanceSourceInfoFromNonZeroPositionAndRowZeroForEventsWithMultipleRow() {
sourceWith(offset(100, 0));
// Try transactions with just one event ...
handleTransactionBegin(150, 2);
handleNextEvent(200, 10, withRowCount(3));
handleTransactionCommit(210, 2);
handleTransactionBegin(210, 2);
handleNextEvent(220, 10, withRowCount(4));
handleNextEvent(250, 50, withRowCount(6));
handleNextEvent(300, 20, withRowCount(1));
handleNextEvent(350, 20, withRowCount(3));
handleTransactionCommit(230, 3);
handleTransactionBegin(240, 2);
handleNextEvent(250, 50, withRowCount(5));
handleTransactionCommit(300, 4);
// Try transactions with multiple events ...
handleTransactionBegin(340, 2);
handleNextEvent(350, 20, withRowCount(6));
handleNextEvent(370, 30, withRowCount(1));
handleNextEvent(400, 40, withRowCount(3));
handleTransactionCommit(440, 4);
handleTransactionBegin(500, 2);
handleNextEvent(510, 20, withRowCount(8));
handleNextEvent(540, 15, withRowCount(9));
handleNextEvent(560, 10, withRowCount(1));
handleTransactionCommit(580, 4);
// Try another single event transaction ...
handleTransactionBegin(600, 2);
handleNextEvent(610, 50, withRowCount(1));
handleTransactionCommit(660, 4);
// Try an event outside of a transaction ...
handleNextEvent(670, 10, withRowCount(5));
// Try another single event transaction ...
handleTransactionBegin(700, 2);
handleNextEvent(710, 50, withRowCount(3));
handleTransactionCommit(760, 4);
}
// -------------------------------------------------------------------------------------
@ -285,33 +326,78 @@ protected int withRowCount(int rowCount) {
return rowCount;
}
protected void handleNextEvent(long positionOfEvent, long eventSize, int rowCount) {
protected void handleTransactionBegin(long positionOfEvent, int eventSize) {
source.setEventPosition(positionOfEvent, eventSize);
for (int i = 0; i != rowCount; ++i) {
positionOfBeginEvent = positionOfEvent;
source.startNextTransaction();
inTxn = true;
assertThat(source.rowsToSkipUponRestart()).isEqualTo(0);
}
protected void handleTransactionCommit(long positionOfEvent, int eventSize) {
source.setEventPosition(positionOfEvent, eventSize);
source.commitTransaction();
eventNumberInTxn = 0;
inTxn = false;
// Verify the offset ...
Map<String, ?> offset = source.offset();
// The offset position should be the position of the next event
long position = (Long) offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY);
assertThat(position).isEqualTo(positionOfEvent + eventSize);
Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY);
if (rowsToSkip == null) rowsToSkip = 0L;
assertThat(rowsToSkip).isEqualTo(0);
assertThat(offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY)).isNull();
if (source.gtidSet() != null) {
assertThat(offset.get(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet());
}
}
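For readers tracing the assertions above: a minimal sketch, not part of this commit, of the offset those checks imply once handleTransactionCommit(580, 4) has run. The binlog filename is an assumed illustration; the keys are the SourceInfo constants used throughout these tests.

import java.util.HashMap;
import java.util.Map;

import io.debezium.connector.mysql.SourceInfo;

// Hypothetical sketch: the offset recorded after the COMMIT at position 580
// (event size 4). The position points just past the commit, and there are no
// events-to-skip or row-in-event entries, so a restart skips nothing.
class CommittedOffsetSketch {
    static Map<String, Object> expectedOffsetAfterCommit() {
        Map<String, Object> offset = new HashMap<>();
        offset.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, "mysql-bin.000003"); // assumed filename
        offset.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, 584L);               // 580 + 4
        return offset;
    }
}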
protected void handleNextEvent(long positionOfEvent, long eventSize, int rowCount) {
if (inTxn) ++eventNumberInTxn;
source.setEventPosition(positionOfEvent, eventSize);
for (int row = 0; row != rowCount; ++row) {
// Get the offset for this row (always first!) ...
Map<String, ?> offset = source.offsetForRow(i, rowCount);
if ((i + 1) < rowCount) {
// This is not the last row, so the next binlog position should be for next row in this event ...
assertThat(offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY)).isEqualTo(positionOfEvent);
assertThat(offset.get(SourceInfo.BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY)).isEqualTo(i+1);
} else {
// This is the last row, so the next binlog position should be for first row in next event ...
assertThat(offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY)).isEqualTo(positionOfEvent + eventSize);
assertThat(offset.get(SourceInfo.BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY)).isEqualTo(0);
}
Map<String, ?> offset = source.offsetForRow(row, rowCount);
assertThat(offset.get(SourceInfo.BINLOG_FILENAME_OFFSET_KEY)).isEqualTo(FILENAME);
if ( source.gtidSet() != null ) {
if (source.gtidSet() != null) {
assertThat(offset.get(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet());
}
long position = (Long) offset.get(SourceInfo.BINLOG_POSITION_OFFSET_KEY);
if (inTxn) {
// regardless of the row count, the position is always the txn begin position ...
assertThat(position).isEqualTo(positionOfBeginEvent);
// and the number of the last completed event (the previous one) ...
Long eventsToSkip = (Long) offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY);
if (eventsToSkip == null) eventsToSkip = 0L;
assertThat(eventsToSkip).isEqualTo(eventNumberInTxn - 1);
} else {
// Matches the next event ...
assertThat(position).isEqualTo(positionOfEvent + eventSize);
assertThat(offset.get(SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY)).isNull();
}
Long rowsToSkip = (Long) offset.get(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY);
if (rowsToSkip == null) rowsToSkip = 0L;
if ((row + 1) == rowCount) {
// This is the last row, so the rows to skip upon restart should be the total number of rows in the event ...
assertThat(rowsToSkip).isEqualTo(rowCount);
} else {
// This is not the last row, so the rows to skip upon restart should be the next row number ...
assertThat(rowsToSkip).isEqualTo(row + 1);
}
// Get the source struct for this row (always second), which should always reflect this row in this event ...
Struct recordSource = source.struct();
assertThat(recordSource.getInt64(SourceInfo.BINLOG_POSITION_OFFSET_KEY)).isEqualTo(positionOfEvent);
assertThat(recordSource.getInt32(SourceInfo.BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY)).isEqualTo(i);
assertThat(recordSource.getInt32(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY)).isEqualTo(row);
assertThat(recordSource.getString(SourceInfo.BINLOG_FILENAME_OFFSET_KEY)).isEqualTo(FILENAME);
if ( source.gtidSet() != null ) {
if (source.gtidSet() != null) {
assertThat(recordSource.getString(SourceInfo.GTID_SET_KEY)).isEqualTo(source.gtidSet());
}
}
source.completeEvent();
}
protected Map<String, String> offset(long position, int row) {
@ -326,7 +412,7 @@ protected Map<String, String> offset(String gtidSet, long position, int row, boo
Map<String, String> offset = new HashMap<>();
offset.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, FILENAME);
offset.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, Long.toString(position));
offset.put(SourceInfo.BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, Integer.toString(row));
offset.put(SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, Integer.toString(row));
if (gtidSet != null) offset.put(SourceInfo.GTID_SET_KEY, gtidSet);
if (snapshot) offset.put(SourceInfo.SNAPSHOT_KEY, Boolean.TRUE.toString());
return offset;
@ -393,12 +479,40 @@ public void shouldOrderPositionsWithSameServerButLowerUpperLimitAsBeforePosition
@Test
public void shouldOrderPositionWithoutGtidAsBeforePositionWithGtid() {
assertPositionWithoutGtids("filename.01", Integer.MAX_VALUE, 0).isBefore(positionWithGtids("IdA:1-5"));
assertPositionWithoutGtids("filename.01", Integer.MAX_VALUE, 0, 0).isBefore(positionWithGtids("IdA:1-5"));
}
@Test
public void shouldOrderPositionWithGtidAsAfterPositionWithoutGtid() {
assertPositionWithGtids("IdA:1-5").isAfter(positionWithoutGtids("filename.01", 0, 0));
assertPositionWithGtids("IdA:1-5").isAfter(positionWithoutGtids("filename.01", 0, 0, 0));
}
@Test
public void shouldComparePositionsWithoutGtids() {
// Same position ...
assertPositionWithoutGtids("fn.01", 1, 0, 0).isAt(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAt(positionWithoutGtids("fn.01", 1, 0, 1));
assertPositionWithoutGtids("fn.03", 1, 0, 1).isAt(positionWithoutGtids("fn.03", 1, 0, 1));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isAt(positionWithoutGtids("fn.01", 1, 1, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAt(positionWithoutGtids("fn.01", 1, 1, 1));
assertPositionWithoutGtids("fn.03", 1, 1, 1).isAt(positionWithoutGtids("fn.03", 1, 1, 1));
// Before position ...
assertPositionWithoutGtids("fn.01", 1, 0, 0).isBefore(positionWithoutGtids("fn.01", 1, 0, 1));
assertPositionWithoutGtids("fn.01", 1, 0, 0).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isBefore(positionWithoutGtids("fn.01", 1, 0, 2));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isBefore(positionWithoutGtids("fn.01", 1, 1, 1));
assertPositionWithoutGtids("fn.01", 1, 1, 0).isBefore(positionWithoutGtids("fn.01", 1, 2, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isBefore(positionWithoutGtids("fn.01", 1, 2, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isBefore(positionWithoutGtids("fn.01", 2, 0, 0));
// After position ...
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAfter(positionWithoutGtids("fn.01", 0, 0, 99));
assertPositionWithoutGtids("fn.01", 1, 0, 1).isAfter(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 0, 0, 99));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 0, 0));
assertPositionWithoutGtids("fn.01", 1, 1, 1).isAfter(positionWithoutGtids("fn.01", 1, 1, 0));
}
@FixFor("DBZ-107")
@ -410,21 +524,21 @@ public void shouldRemoveNewlinesFromGtidSet() {
String gtidCleaned = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2," +
"7145bf69-d1ca-11e5-a588-0242ac110004:1-3149," +
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-39";
source.setGtidSet(gtidExecuted);
source.setCompletedGtidSet(gtidExecuted);
assertThat(source.gtidSet()).isEqualTo(gtidCleaned);
}
@FixFor("DBZ-107")
@Test
public void shouldNotSetBlankGtidSet() {
source.setGtidSet("");
source.setCompletedGtidSet("");
assertThat(source.gtidSet()).isNull();
}
@FixFor("DBZ-107")
@Test
public void shouldNotSetNullGtidSet() {
source.setGtidSet(null);
source.setCompletedGtidSet(null);
assertThat(source.gtidSet()).isNull();
}
@ -439,20 +553,22 @@ protected Document positionWithGtids(String gtids, boolean snapshot) {
return Document.create(SourceInfo.GTID_SET_KEY, gtids);
}
protected Document positionWithoutGtids(String filename, int position, int row) {
return positionWithoutGtids(filename, position, row, false);
protected Document positionWithoutGtids(String filename, int position, int event, int row) {
return positionWithoutGtids(filename, position, event, row, false);
}
protected Document positionWithoutGtids(String filename, int position, int row, boolean snapshot) {
protected Document positionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) {
if (snapshot) {
return Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename,
SourceInfo.BINLOG_POSITION_OFFSET_KEY, position,
SourceInfo.BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, row,
SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row,
SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event,
SourceInfo.SNAPSHOT_KEY, true);
}
return Document.create(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, filename,
SourceInfo.BINLOG_POSITION_OFFSET_KEY, position,
SourceInfo.BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY, row);
SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row,
SourceInfo.EVENTS_TO_SKIP_OFFSET_KEY, event);
}
protected PositionAssert assertThatDocument(Document position) {
@ -467,12 +583,12 @@ protected PositionAssert assertPositionWithGtids(String gtids, boolean snapshot)
return assertThatDocument(positionWithGtids(gtids, snapshot));
}
protected PositionAssert assertPositionWithoutGtids(String filename, int position, int row) {
return assertPositionWithoutGtids(filename, position, row, false);
protected PositionAssert assertPositionWithoutGtids(String filename, int position, int event, int row) {
return assertPositionWithoutGtids(filename, position, event, row, false);
}
protected PositionAssert assertPositionWithoutGtids(String filename, int position, int row, boolean snapshot) {
return assertThatDocument(positionWithoutGtids(filename, position, row, snapshot));
protected PositionAssert assertPositionWithoutGtids(String filename, int position, int event, int row, boolean snapshot) {
return assertThatDocument(positionWithoutGtids(filename, position, event, row, snapshot));
}
protected static class PositionAssert extends GenericAssert<PositionAssert, Document> {

View File

@ -11,4 +11,5 @@ log4j.rootLogger=INFO, stdout
log4j.logger.io.debezium=INFO
log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
#log4j.logger.io.debezium.connector.mysql.BinlogReader=DEBUG
#log4j.logger.io.debezium.connector.mysql.SnapshotReader=DEBUG
#log4j.logger.io.debezium.connector.mysql.SnapshotReader=DEBUG
#log4j.logger.io.debezium.relational.history=DEBUG

View File

@ -92,6 +92,19 @@ static Document create(CharSequence fieldName1, Object value1, CharSequence fiel
return new BasicDocument().set(fieldName1, value1).set(fieldName2, value2).set(fieldName3, value3).set(fieldName4, value4);
}
static Document create(CharSequence fieldName1, Object value1, CharSequence fieldName2, Object value2, CharSequence fieldName3,
Object value3, CharSequence fieldName4, Object value4, CharSequence fieldName5, Object value5) {
return new BasicDocument().set(fieldName1, value1).set(fieldName2, value2).set(fieldName3, value3).set(fieldName4, value4)
.set(fieldName5, value5);
}
static Document create(CharSequence fieldName1, Object value1, CharSequence fieldName2, Object value2, CharSequence fieldName3,
Object value3, CharSequence fieldName4, Object value4, CharSequence fieldName5, Object value5,
CharSequence fieldName6, Object value6) {
return new BasicDocument().set(fieldName1, value1).set(fieldName2, value2).set(fieldName3, value3).set(fieldName4, value4)
.set(fieldName5, value5).set(fieldName6, value6);
}
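These wider overloads exist so the position documents built by the positionWithoutGtids(...) test helper earlier in this diff can carry the new events-to-skip field. A minimal usage sketch, assuming string keys that stand in for the SourceInfo constants used elsewhere in this commit:

import io.debezium.document.Document;

// Hypothetical: a five-field position document of the shape built by
// positionWithoutGtids(filename, position, event, row, snapshot) above.
class PositionDocumentSketch {
    static Document snapshotPosition() {
        return Document.create("file", "mysql-bin.000003", // assumed key and filename
                               "pos", 100,
                               "row", 0,
                               "event", 2,
                               "snapshot", true);
    }
}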
/**
* Return the number of name-value fields in this object.
*
@ -159,7 +172,7 @@ default Optional<Value> set(Path path, boolean addIntermediaries, Value value, C
parent = find(parentPath, (missingPath, missingIndex) -> {
invalid.accept(missingPath); // invoke the invalid handler
return Optional.empty();
} , invalid);
}, invalid);
} else {
// Create any missing intermediaries using the segment after the missing segment to determine which
// type of intermediate value to add ...
@ -170,7 +183,7 @@ default Optional<Value> set(Path path, boolean addIntermediaries, Value value, C
} else {
return Optional.of(Value.create(Document.create()));
}
} , invalid);
}, invalid);
}
if (!parent.isPresent()) return Optional.empty();
String lastSegment = path.lastSegment().get();
@ -202,8 +215,7 @@ default Optional<Value> set(Path path, boolean addIntermediaries, Value value, C
* valid
*/
default Optional<Value> find(Path path) {
return find(path, (missingPath, missingIndex) -> Optional.empty(), (invalidPath) -> {
});
return find(path, (missingPath, missingIndex) -> Optional.empty(), (invalidPath) -> {});
}
/**
@ -719,7 +731,7 @@ default Value remove(Optional<? extends CharSequence> name) {
* @return This document, to allow for chaining methods
*/
Document removeAll();
/**
* Sets on this object all name/value pairs from the supplied object. If the supplied object is null, this method does
* nothing.

View File

@ -1169,8 +1169,9 @@ protected Object convertBoolean(Column column, Field fieldDefn, Object data) {
*/
protected Object handleUnknownData(Column column, Field fieldDefn, Object data) {
if (column.isOptional() || fieldDefn.schema().isOptional()) {
Class<?> dataClass = data.getClass();
logger.warn("Unexpected value for JDBC type {} and column {}: class={}", column.jdbcType(), column,
data.getClass()); // don't include value in case its sensitive
dataClass.isArray() ? dataClass.getSimpleName() : dataClass.getName()); // don't include value in case it's sensitive
return null;
}
throw new IllegalArgumentException("Unexpected value for JDBC type " + column.jdbcType() + " and column " + column +

View File

@ -46,6 +46,7 @@ public final void record(Map<String, ?> source, Map<String, ?> position, String
@Override
public final void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
logger.debug("Recovering DDL history for source partition {} and offset {}",source,position);
HistoryRecord stopPoint = new HistoryRecord(source, position, null, null);
recoverRecords(schema,ddlParser,recovered->{
if (comparator.isAtOrBefore(recovered,stopPoint)) {
@ -53,7 +54,10 @@ public final void recover(Map<String, ?> source, Map<String, ?> position, Tables
if (ddl != null) {
ddlParser.setCurrentSchema(recovered.databaseName()); // may be null
ddlParser.parse(ddl, schema);
logger.debug("Applying: {}", ddl);
}
} else {
logger.debug("Skipping: {}", recovered.ddl());
}
});
}

View File

@ -281,7 +281,7 @@ public static void debug(SourceRecord record) {
* @param record the record to validate; may not be null
*/
public static void isValid(SourceRecord record) {
print(record);
//print(record);
JsonNode keyJson = null;
JsonNode valueJson = null;

View File

@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@ -35,7 +36,6 @@
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StringConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -134,7 +134,7 @@ public final class EmbeddedEngine implements Runnable {
protected static final Field INTERNAL_KEY_CONVERTER_CLASS = Field.create("internal.key.converter")
.withDescription("The Converter class that should be used to serialize and deserialize key data for offsets.")
.withDefault(StringConverter.class.getName());
.withDefault(JsonConverter.class.getName());
protected static final Field INTERNAL_VALUE_CONVERTER_CLASS = Field.create("internal.value.converter")
.withDescription("The Converter class that should be used to serialize and deserialize value data for offsets.")
@ -159,14 +159,108 @@ public static interface CompletionCallback {
/**
* Handle the completion of the embedded connector engine.
*
* @param success true if the connector completed normally, or {@code false} if the connector produced an error that
* prevented startup or premature termination.
* @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error
* that prevented startup or caused premature termination.
* @param message the completion message; never null
* @param error the error, or null if there was no exception
*/
void handle(boolean success, String message, Throwable error);
}
/**
* A callback function to be notified when the connector completes.
*/
public static class CompletionResult implements CompletionCallback {
private final CountDownLatch completed = new CountDownLatch(1);
private boolean success;
private String message;
private Throwable error;
@Override
public void handle(boolean success, String message, Throwable error) {
this.success = success;
this.message = message;
this.error = error;
this.completed.countDown();
}
/**
* Causes the current thread to wait until the {@link #handle(boolean, String, Throwable) completion occurs}
* or until the thread is {@linkplain Thread#interrupt interrupted}.
* <p>
* This method returns immediately if the connector has completed already.
*
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void await() throws InterruptedException {
this.completed.await();
}
/**
* Causes the current thread to wait until the {@link #handle(boolean, String, Throwable) completion occurs},
* unless the thread is {@linkplain Thread#interrupt interrupted}, or the specified waiting time elapses.
* <p>
* This method returns immediately if the connector has completed already.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if the completion was received, or {@code false} if the waiting time elapsed before the completion
* was received.
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return this.completed.await(timeout, unit);
}
/**
* Determine if the connector has completed.
*
* @return {@code true} if the connector has completed, or {@code false} if the connector is still running and this
* callback has not yet been {@link #handle(boolean, String, Throwable) notified}
*/
public boolean hasCompleted() {
return completed.getCount() == 0;
}
/**
* Get whether the connector completed normally.
*
* @return {@code true} if the connector completed normally, or {@code false} if the connector produced an error that
* caused premature termination (or the connector has not yet {@link #hasCompleted() completed})
*/
public boolean success() {
return success;
}
/**
* Get the completion message.
*
* @return the completion message, or null if the connector has not yet {@link #hasCompleted() completed}
*/
public String message() {
return message;
}
/**
* Get the completion error, if there is one.
*
* @return the completion error, or null if there is no error or connector has not yet {@link #hasCompleted() completed}
*/
public Throwable error() {
return error;
}
/**
* Determine if there is a completion error.
*
* @return {@code true} if there is a {@link #error completion error}, or {@code false} if there is no error or
* the connector has not yet {@link #hasCompleted() completed}
*/
public boolean hasError() {
return error != null;
}
}
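A minimal usage sketch for the new CompletionResult, relying on the start(...) test helper that appears later in this diff; the connector class, configuration, and stopConnector() call are placeholders, not part of the commit:

import java.util.concurrent.TimeUnit;

// Hypothetical test method; MyConnector, config, and stopConnector() are
// stand-ins for whatever the concrete test provides.
public void shouldReportCompletion() throws InterruptedException {
    EmbeddedEngine.CompletionResult result = new EmbeddedEngine.CompletionResult();
    start(MyConnector.class, config, result); // register the result as the completion callback
    // ... consume records ...
    stopConnector();
    if (result.await(30, TimeUnit.SECONDS) && result.hasError()) {
        throw new AssertionError(result.message(), result.error());
    }
}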
/**
* A builder to set up and create {@link EmbeddedEngine} instances.
*/
@ -295,7 +389,7 @@ public EmbeddedEngine build() {
private long timeSinceLastCommitMillis = 0;
private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer,
CompletionCallback completionCallback) {
CompletionCallback completionCallback) {
this.config = config;
this.consumer = consumer;
this.classLoader = classLoader;
@ -308,7 +402,7 @@ private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock cloc
assert this.classLoader != null;
assert this.clock != null;
keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), false);
keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);
valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
Configuration valueConverterConfig = config;
if (valueConverter instanceof JsonConverter) {
@ -456,8 +550,9 @@ public void raiseError(Exception e) {
}
recordsSinceLastCommit = 0;
Throwable handlerError = null;
timeSinceLastCommitMillis = clock.currentTimeInMillis();
while (runningThread.get() != null) {
while (runningThread.get() != null && handlerError == null) {
try {
logger.debug("Embedded engine is polling task for records");
List<SourceRecord> changeRecords = task.poll(); // blocks until there are values ...
@ -469,17 +564,16 @@ public void raiseError(Exception e) {
try {
consumer.accept(record);
} catch (Throwable t) {
logger.error("Error in the application's handler method, but continuing anyway", t);
handlerError = t;
break;
}
// Record the offset for this record's partition
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
recordsSinceLastCommit += 1;
}
// Only then do we write out the last partition to offset storage ...
SourceRecord lastRecord = changeRecords.get(changeRecords.size() - 1);
lastRecord.sourceOffset();
offsetWriter.offset(lastRecord.sourcePartition(), lastRecord.sourceOffset());
// Flush the offsets to storage if necessary ...
recordsSinceLastCommit += changeRecords.size();
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
} else {
logger.debug("Received no records from the task");
@ -501,7 +595,14 @@ public void raiseError(Exception e) {
} finally {
// Always commit offsets that were captured from the source records we actually processed ...
commitOffsets(offsetWriter, commitTimeoutMs);
succeed("Connector '" + connectorClassName + "' completed normally.");
if (handlerError != null) {
// There was an error in the handler ...
fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(),
handlerError);
} else {
// We stopped normally ...
succeed("Connector '" + connectorClassName + "' completed normally.");
}
}
} catch (Throwable t) {
fail("Error while trying to run connector class '" + connectorClassName + "'", t);

View File

@ -7,8 +7,11 @@
import static org.junit.Assert.fail;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@ -22,6 +25,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
@ -29,11 +33,17 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.fest.assertions.Delta;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
@ -42,8 +52,10 @@
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.data.SchemaUtil;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine.CompletionCallback;
import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig;
import io.debezium.function.BooleanConsumer;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.util.LoggingContext;
@ -161,6 +173,22 @@ protected int getMaximumEnqueuedRecordCount() {
return 100;
}
/**
* Create a {@link CompletionCallback} that logs when the engine fails to start the connector or when the connector
* stops running after completing successfully or due to an error.
*
* @return the logging {@link CompletionCallback}
*/
protected CompletionCallback loggingCompletion() {
return (success, msg, error) -> {
if (success) {
logger.info(msg);
} else {
logger.error(msg, error);
}
};
}
/**
* Start the connector using the supplied connector configuration, where upon completion the status of the connector is
* logged.
@ -169,13 +197,21 @@ protected int getMaximumEnqueuedRecordCount() {
* @param connectorConfig the configuration for the connector; may not be null
*/
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig) {
start(connectorClass, connectorConfig, (success, msg, error) -> {
if (success) {
logger.info(msg);
} else {
logger.error(msg, error);
}
});
start(connectorClass, connectorConfig, loggingCompletion(), null);
}
/**
* Start the connector using the supplied connector configuration, where upon completion the status of the connector is
* logged. The connector will stop immediately when the supplied predicate returns true.
*
* @param connectorClass the connector class; may not be null
* @param connectorConfig the configuration for the connector; may not be null
* @param isStopRecord the function that will be called to determine if the connector should be stopped before processing
* this record; may be null if not needed
*/
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig,
Predicate<SourceRecord> isStopRecord) {
start(connectorClass, connectorConfig, loggingCompletion(), isStopRecord);
}
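For example, a sketch (the topic name and connector class are illustrative) that stops the connector once a record for a particular topic arrives:

// Hypothetical: stop before processing the first record published to "orders".
start(MyConnector.class, config, record -> "orders".equals(record.topic()));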
/**
@ -186,7 +222,23 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
* @param callback the function that will be called when the engine fails to start the connector or when the connector
* stops running after completing successfully or due to an error; may be null
*/
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig, CompletionCallback callback) {
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig,
CompletionCallback callback) {
start(connectorClass, connectorConfig, callback, null);
}
/**
* Start the connector using the supplied connector configuration.
*
* @param connectorClass the connector class; may not be null
* @param connectorConfig the configuration for the connector; may not be null
* @param isStopRecord the function that will be called to determine if the connector should be stopped before processing
* this record; may be null if not needed
* @param callback the function that will be called when the engine fails to start the connector or when the connector
* stops running after completing successfully or due to an error; may be null
*/
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig,
CompletionCallback callback, Predicate<SourceRecord> isStopRecord) {
Configuration config = Configuration.copy(connectorConfig)
.with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName())
@ -202,11 +254,14 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
}
Testing.debug("Stopped connector");
};
// Create the connector ...
engine = EmbeddedEngine.create()
.using(config)
.notifying((record) -> {
if (isStopRecord != null && isStopRecord.test(record)) {
logger.error("Stopping connector after record as requested");
throw new ConnectException("Stopping connector after record as requested");
}
try {
consumedLines.put(record);
} catch (InterruptedException e) {
@ -306,7 +361,6 @@ protected SourceRecords consumeRecordsByTopic(int numRecords) throws Interrupted
consumeRecords(numRecords, records::add);
return records;
}
protected class SourceRecords {
private final List<SourceRecord> records = new ArrayList<>();
@ -467,6 +521,55 @@ protected void assertTombstone(SourceRecord record, String pkField, int pk) {
protected void assertTombstone(SourceRecord record) {
VerifyRecord.isValidTombstone(record);
}
protected void assertOffset(SourceRecord record, Map<String,?> expectedOffset) {
Map<String,?> offset = record.sourceOffset();
assertThat(offset).isEqualTo(expectedOffset);
}
protected void assertOffset(SourceRecord record, String offsetField, Object expectedValue) {
Map<String,?> offset = record.sourceOffset();
Object value = offset.get(offsetField);
assertSameValue(value, expectedValue);
}
protected void assertValueField(SourceRecord record, String fieldPath, Object expectedValue) {
Object value = record.value();
String[] fieldNames = fieldPath.split("/");
String pathSoFar = null;
for (int i = 0; i != fieldNames.length; ++i) {
String fieldName = fieldNames[i];
if (value instanceof Struct) {
value = ((Struct) value).get(fieldName);
} else {
// We expected the value to be a struct ...
String path = pathSoFar == null ? "record value" : ("'" + pathSoFar + "'");
String msg = "Expected the " + path + " to be a Struct but was " + value.getClass().getSimpleName() + " in record: " + SchemaUtil.asString(record);
fail(msg);
}
pathSoFar = pathSoFar == null ? fieldName : pathSoFar + "/" + fieldName;
}
assertSameValue(value, expectedValue);
}
private void assertSameValue(Object actual, Object expected) {
if (expected instanceof Double || expected instanceof Float || expected instanceof BigDecimal) {
// Value should be within 1%
double expectedNumericValue = ((Number) expected).doubleValue();
double actualNumericValue = ((Number) actual).doubleValue();
assertThat(actualNumericValue).isEqualTo(expectedNumericValue, Delta.delta(0.01d * expectedNumericValue));
} else if (expected instanceof Integer || expected instanceof Long || expected instanceof Short) {
long expectedNumericValue = ((Number) expected).longValue();
long actualNumericValue = ((Number) actual).longValue();
assertThat(actualNumericValue).isEqualTo(expectedNumericValue);
} else if (expected instanceof Boolean) {
boolean expectedValue = ((Boolean) expected).booleanValue();
boolean actualValue = ((Boolean) actual).booleanValue(); // compare the actual value against the expected one
assertThat(actualValue).isEqualTo(expectedValue);
} else {
assertThat(actual).isEqualTo(expected);
}
}
/**
* Assert that the supplied {@link Struct} is {@link Struct#validate() valid} and its {@link Struct#schema() schema}
@ -512,7 +615,8 @@ protected void assertConfigurationErrors(Config config, io.debezium.config.Field
assertThat(value.errorMessages().size()).isEqualTo(numErrors);
}
protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int minErrorsInclusive, int maxErrorsInclusive) {
protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int minErrorsInclusive,
int maxErrorsInclusive) {
ConfigValue value = configValue(config, field.name());
assertThat(value.errorMessages().size()).isGreaterThanOrEqualTo(minErrorsInclusive);
assertThat(value.errorMessages().size()).isLessThanOrEqualTo(maxErrorsInclusive);
@ -526,8 +630,8 @@ protected void assertConfigurationErrors(Config config, io.debezium.config.Field
protected void assertNoConfigurationErrors(Config config, io.debezium.config.Field... fields) {
for (io.debezium.config.Field field : fields) {
ConfigValue value = configValue(config, field.name());
if ( value != null ) {
if ( !value.errorMessages().isEmpty() ) {
if (value != null) {
if (!value.errorMessages().isEmpty()) {
fail("Error messages on field '" + field.name() + "': " + value.errorMessages());
}
}
@ -538,4 +642,59 @@ protected ConfigValue configValue(Config config, String fieldName) {
return config.configValues().stream().filter(value -> value.name().equals(fieldName)).findFirst().orElse(null);
}
/**
* Utility to read the last committed offset for the specified partition.
*
* @param config the configuration of the engine used to persist the offsets
* @param partition the partition
* @return the last committed offset for the given partition; may be null if no offset has been committed
*/
protected <T> Map<String, Object> readLastCommittedOffset(Configuration config, Map<String, T> partition) {
return readLastCommittedOffsets(config, Arrays.asList(partition)).get(partition);
}
/**
* Utility to read the last committed offsets for the specified partitions.
*
* @param config the configuration of the engine used to persist the offsets
* @param partitions the partitions
* @return the map of partitions to offsets; never null but possibly empty
*/
protected <T> Map<Map<String, T>, Map<String, Object>> readLastCommittedOffsets(Configuration config,
Collection<Map<String, T>> partitions) {
config = config.edit().with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
.build();
final String engineName = config.getString(EmbeddedEngine.ENGINE_NAME);
Converter keyConverter = config.getInstance(EmbeddedEngine.INTERNAL_KEY_CONVERTER_CLASS, Converter.class);
keyConverter.configure(config.subset(EmbeddedEngine.INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);
Converter valueConverter = config.getInstance(EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS, Converter.class);
Configuration valueConverterConfig = config;
if (valueConverter instanceof JsonConverter) {
// Make sure that the JSON converter is configured to NOT enable schemas ...
valueConverterConfig = config.edit().with(EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();
}
valueConverter.configure(valueConverterConfig.subset(EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(),
false);
// Create the worker config, adding extra fields that are required for validation of a worker config
// but that are not used within the embedded engine (since the source records are never serialized) ...
Map<String, String> embeddedConfig = config.asMap(EmbeddedEngine.ALL_FIELDS);
embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
WorkerConfig workerConfig = new EmbeddedConfig(embeddedConfig);
FileOffsetBackingStore offsetStore = new FileOffsetBackingStore();
offsetStore.configure(workerConfig);
offsetStore.start();
try {
OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter);
return offsetReader.offsets(partitions);
} finally {
offsetStore.stop();
}
}
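A usage sketch for this utility; the partition keys and values are assumptions of the sketch (the MySQL connector keys its source partition by server name), not something this diff shows:

import java.util.Collections;
import java.util.Map;

// Hypothetical check: after the connector stops, verify that something
// actually reached offset storage for its source partition.
protected void assertOffsetsWereCommitted(Configuration config) {
    Map<String, String> partition = Collections.singletonMap("server", "my-server"); // assumed keys
    Map<String, Object> offset = readLastCommittedOffset(config, partition);
    assertThat(offset).isNotNull();
}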
}