DBZ-129 Fix for GTID updates

Workaround for https://github.com/shyiko/mysql-binlog-connector-java/issues/122.
This commit is contained in:
Randall Hauch 2016-10-14 14:42:46 -05:00
parent 7387654bfa
commit 2f5772712a
2 changed files with 12 additions and 5 deletions

View File

@ -72,6 +72,7 @@ public class BinlogReader extends AbstractReader {
private long previousOutputMillis = 0L; private long previousOutputMillis = 0L;
private final AtomicLong totalRecordCounter = new AtomicLong(); private final AtomicLong totalRecordCounter = new AtomicLong();
private volatile Map<String, ?> lastOffset = null; private volatile Map<String, ?> lastOffset = null;
private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
/** /**
* Create a binlog reader. * Create a binlog reader.
@ -142,7 +143,6 @@ protected void doStart() {
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent); eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata); eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
eventHandlers.put(EventType.QUERY, this::handleQueryEvent); eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
eventHandlers.put(EventType.GTID, this::handleGtidEvent);
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert); eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate); eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete); eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
@ -154,6 +154,9 @@ protected void doStart() {
// set set the client to start from that point ... // set set the client to start from that point ...
String gtidSetStr = source.gtidSet(); String gtidSetStr = source.gtidSet();
if (gtidSetStr != null) { if (gtidSetStr != null) {
// Register the event handler ...
eventHandlers.put(EventType.GTID, this::handleGtidEvent);
logger.info("GTID set from previous recorded offset: {}", gtidSetStr); logger.info("GTID set from previous recorded offset: {}", gtidSetStr);
// Remove any of the GTID sources that are not required/acceptable ... // Remove any of the GTID sources that are not required/acceptable ...
Predicate<String> gtidSourceFilter = context.gtidSourceFilter(); Predicate<String> gtidSourceFilter = context.gtidSourceFilter();
@ -164,9 +167,11 @@ protected void doStart() {
source.setGtidSet(gtidSetStr); source.setGtidSet(gtidSetStr);
} }
client.setGtidSet(gtidSetStr); client.setGtidSet(gtidSetStr);
gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(gtidSetStr);
} else { } else {
client.setBinlogFilename(source.binlogFilename()); client.setBinlogFilename(source.binlogFilename());
client.setBinlogPosition(source.nextBinlogPosition()); client.setBinlogPosition(source.nextBinlogPosition());
gtidSet = null;
} }
// Set the starting row number, which is the next row number to be read ... // Set the starting row number, which is the next row number to be read ...
@ -357,8 +362,10 @@ protected void handleRotateLogsEvent(Event event) {
protected void handleGtidEvent(Event event) { protected void handleGtidEvent(Event event) {
logger.debug("GTID transaction: {}", event); logger.debug("GTID transaction: {}", event);
GtidEventData gtidEvent = unwrapData(event); GtidEventData gtidEvent = unwrapData(event);
source.setGtid(gtidEvent.getGtid()); String gtid = gtidEvent.getGtid();
source.setGtidSet(client.getGtidSet()); gtidSet.add(gtid);
source.setGtid(gtid);
source.setGtidSet(gtidSet.toString()); // rather than use the client's GTID set
} }
/** /**

View File

@ -262,7 +262,7 @@ public void setGtid(String gtid) {
*/ */
public void setGtidSet(String gtidSet) { public void setGtidSet(String gtidSet) {
if (gtidSet != null && !gtidSet.trim().isEmpty()) { if (gtidSet != null && !gtidSet.trim().isEmpty()) {
this.gtidSet = gtidSet.replaceAll("\n", ""); // remove all of the newline chars if they exist this.gtidSet = gtidSet.replaceAll("\n", "").replaceAll("\r", ""); // remove all of the newline chars if they exist
} }
} }
@ -385,7 +385,7 @@ private boolean booleanOffsetValue(Map<String, ?> values, String key) {
* @return the string representation of the binlog GTID ranges; may be null * @return the string representation of the binlog GTID ranges; may be null
*/ */
public String gtidSet() { public String gtidSet() {
return this.gtidSet != null ? this.gtidSet.toString() : null; return this.gtidSet != null ? this.gtidSet : null;
} }
/** /**