Changed MySQL binlog reading integration test to eliminate concurrent issues.

This commit is contained in:
Randall Hauch 2016-01-25 18:12:49 -06:00
parent 4ddd4b33be
commit d7e52165be

View File

@ -15,19 +15,16 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,6 +34,7 @@
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@ -50,7 +48,6 @@
import io.debezium.jdbc.TestDatabase;
import io.debezium.mysql.MySQLConnection;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ReadBinLogIT {
protected static final Logger LOGGER = LoggerFactory.getLogger(ReadBinLogIT.class);
@ -63,7 +60,7 @@ private static final class AnyValue implements Serializable {
private static final Serializable ANY_OBJECT = new AnyValue();
private JdbcConfiguration config;
private EventCounters counters;
private EventQueue counters;
private BinaryLogClient client;
private MySQLConnection conn;
private List<Event> events = new ArrayList<>();
@ -79,11 +76,10 @@ public void beforeEach() throws TimeoutException, IOException, SQLException, Int
conn.connect();
// Connect the bin log client ...
counters = new EventCounters();
counters = new EventQueue(DEFAULT_TIMEOUT, this::logConsumedEvent, this::logIgnoredEvent);
client = new BinaryLogClient(config.getHostname(), config.getPort(), "replicator", "replpass");
client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances
client.setKeepAlive(false);
client.registerEventListener(this::logEvent);
client.registerEventListener(counters);
client.registerEventListener(this::recordEvent);
client.registerLifecycleListener(new TraceLifecycleListener());
@ -98,7 +94,7 @@ public void beforeEach() throws TimeoutException, IOException, SQLException, Int
" updatedAt DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" +
")");
counters.waitFor(2, EventType.QUERY, DEFAULT_TIMEOUT);
counters.consume(2, EventType.QUERY);
counters.reset();
}
@ -121,20 +117,20 @@ public void afterEach() throws IOException, SQLException {
public void shouldCaptureSingleWriteUpdateDeleteEvents() throws Exception {
// write/insert
conn.execute("INSERT INTO person(name,age) VALUES ('Georgia',30)");
counters.waitFor(1, WriteRowsEventData.class, DEFAULT_TIMEOUT);
counters.consume(1, WriteRowsEventData.class);
List<WriteRowsEventData> writeRowEvents = recordedEventData(WriteRowsEventData.class, 1);
assertRows(writeRowEvents.get(0), rows().insertedRow("Georgia", 30, any(), any()));
// update
conn.execute("UPDATE person SET name = 'Maggie' WHERE name = 'Georgia'");
counters.waitFor(1, UpdateRowsEventData.class, DEFAULT_TIMEOUT);
counters.consume(1, UpdateRowsEventData.class);
List<UpdateRowsEventData> updateRowEvents = recordedEventData(UpdateRowsEventData.class, 1);
assertRows(updateRowEvents.get(0),
rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any()));
// delete
conn.execute("DELETE FROM person WHERE name = 'Maggie'");
counters.waitFor(1, DeleteRowsEventData.class, DEFAULT_TIMEOUT);
counters.consume(1, DeleteRowsEventData.class);
List<DeleteRowsEventData> deleteRowEvents = recordedEventData(DeleteRowsEventData.class, 1);
assertRows(deleteRowEvents.get(0), rows().removedRow("Maggie", 30, any(), any()));
}
@ -144,10 +140,10 @@ public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception {
// write/insert as a single transaction
conn.execute("INSERT INTO person(name,age) VALUES ('Georgia',30)",
"INSERT INTO person(name,age) VALUES ('Janice',19)");
counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN
counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(2, WriteRowsEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT
counters.consume(1, QueryEventData.class); // BEGIN
counters.consume(1, TableMapEventData.class);
counters.consume(2, WriteRowsEventData.class);
counters.consume(1, XidEventData.class); // COMMIT
List<WriteRowsEventData> writeRowEvents = recordedEventData(WriteRowsEventData.class, 2);
assertRows(writeRowEvents.get(0), rows().insertedRow("Georgia", 30, any(), any()));
assertRows(writeRowEvents.get(1), rows().insertedRow("Janice", 19, any(), any()));
@ -156,10 +152,10 @@ public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception {
// update as a single transaction
conn.execute("UPDATE person SET name = 'Maggie' WHERE name = 'Georgia'",
"UPDATE person SET name = 'Jamie' WHERE name = 'Janice'");
counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN
counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(2, UpdateRowsEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT
counters.consume(1, QueryEventData.class); // BEGIN
counters.consume(1, TableMapEventData.class);
counters.consume(2, UpdateRowsEventData.class);
counters.consume(1, XidEventData.class); // COMMIT
List<UpdateRowsEventData> updateRowEvents = recordedEventData(UpdateRowsEventData.class, 2);
assertRows(updateRowEvents.get(0), rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any()));
assertRows(updateRowEvents.get(1), rows().changeRow("Janice", 19, any(), any()).to("Jamie", 19, any(), any()));
@ -168,10 +164,10 @@ public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception {
// delete as a single transaction
conn.execute("DELETE FROM person WHERE name = 'Maggie'",
"DELETE FROM person WHERE name = 'Jamie'");
counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN
counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(2, DeleteRowsEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT
counters.consume(1, QueryEventData.class); // BEGIN
counters.consume(1, TableMapEventData.class);
counters.consume(2, DeleteRowsEventData.class);
counters.consume(1, XidEventData.class); // COMMIT
List<DeleteRowsEventData> deleteRowEvents = recordedEventData(DeleteRowsEventData.class, 2);
assertRows(deleteRowEvents.get(0), rows().removedRow("Maggie", 30, any(), any()));
assertRows(deleteRowEvents.get(1), rows().removedRow("Jamie", 19, any(), any()));
@ -181,10 +177,10 @@ public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception {
public void shouldCaptureMultipleWriteUpdateDeletesInSingleEvents() throws Exception {
// write/insert as a single statement/transaction
conn.execute("INSERT INTO person(name,age) VALUES ('Georgia',30),('Janice',19)");
counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN
counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, WriteRowsEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT
counters.consume(1, QueryEventData.class); // BEGIN
counters.consume(1, TableMapEventData.class);
counters.consume(1, WriteRowsEventData.class);
counters.consume(1, XidEventData.class); // COMMIT
List<WriteRowsEventData> writeRowEvents = recordedEventData(WriteRowsEventData.class, 1);
assertRows(writeRowEvents.get(0), rows().insertedRow("Georgia", 30, any(), any())
.insertedRow("Janice", 19, any(), any()));
@ -196,10 +192,10 @@ public void shouldCaptureMultipleWriteUpdateDeletesInSingleEvents() throws Excep
" WHEN name = 'Janice' THEN 'Jamie' " +
" END " +
"WHERE name IN ('Georgia','Janice')");
counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN
counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, UpdateRowsEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT
counters.consume(1, QueryEventData.class); // BEGIN
counters.consume(1, TableMapEventData.class);
counters.consume(1, UpdateRowsEventData.class);
counters.consume(1, XidEventData.class); // COMMIT
List<UpdateRowsEventData> updateRowEvents = recordedEventData(UpdateRowsEventData.class, 1);
assertRows(updateRowEvents.get(0), rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any())
.changeRow("Janice", 19, any(), any()).to("Jamie", 19, any(), any()));
@ -207,10 +203,10 @@ public void shouldCaptureMultipleWriteUpdateDeletesInSingleEvents() throws Excep
// delete as a single statement/transaction
conn.execute("DELETE FROM person WHERE name IN ('Maggie','Jamie')");
counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN
counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, DeleteRowsEventData.class, DEFAULT_TIMEOUT);
counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT
counters.consume(1, QueryEventData.class); // BEGIN
counters.consume(1, TableMapEventData.class);
counters.consume(1, DeleteRowsEventData.class);
counters.consume(1, XidEventData.class); // COMMIT
List<DeleteRowsEventData> deleteRowEvents = recordedEventData(DeleteRowsEventData.class, 1);
assertRows(deleteRowEvents.get(0), rows().removedRow("Maggie", 30, any(), any())
.removedRow("Jamie", 19, any(), any()));
@ -233,8 +229,12 @@ public void shouldQueryInformationSchema() throws Exception {
}
protected void logEvent(Event event) {
LOGGER.info("Received event: " + event);
protected void logConsumedEvent(Event event) {
LOGGER.info("Consumed event: " + event);
}
protected void logIgnoredEvent(Event event) {
LOGGER.info("Ignored event: " + event);
}
protected void recordEvent(Event event) {
@ -402,89 +402,139 @@ protected void assertRows(DeleteRowsEventData eventData, RowBuilder rows) {
}
}
protected static class EventCounters implements EventListener {
/*
* VariableLatch instances count down when receiving an event, and thus are negative. When callers wait for a specified
* number of events to occur, the latch's count is incremented by the expected count. If the latch's resulting count is
* less than or equal to 0, then the caller does not wait.
*/
private final ConcurrentMap<EventType, AtomicInteger> counterByType = new ConcurrentHashMap<>();
private final ConcurrentMap<Class<? extends EventData>, AtomicInteger> counterByDataClass = new ConcurrentHashMap<>();
protected static class EventQueue implements EventListener {
private final ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
private final Consumer<Event> consumedEvents;
private final Consumer<Event> ignoredEvents;
private final long defaultTimeoutInMillis;
public EventQueue(long defaultTimeoutInMillis, Consumer<Event> consumedEvents, Consumer<Event> ignoredEvents) {
this.defaultTimeoutInMillis = defaultTimeoutInMillis;
this.consumedEvents = consumedEvents != null ? consumedEvents : this::defaultEventHandler;
this.ignoredEvents = ignoredEvents != null ? ignoredEvents : this::defaultEventHandler;
}
private void defaultEventHandler(Event event) {
}
@Override
public void onEvent(Event event) {
counterByType.compute(event.getHeader().getEventType(), this::increment);
EventData data = event.getData();
if (data != null) {
counterByDataClass.compute(data.getClass(), this::increment);
}
queue.add(event);
}
protected <K> AtomicInteger increment(K key, AtomicInteger counter) {
if (counter == null) return new AtomicInteger(1);
synchronized (counter) {
counter.incrementAndGet();
counter.notify();
}
return counter;
protected Event consume() {
return queue.poll();
}
/**
* Blocks until the listener has seen the specified number of events with the given type.
* Blocks until the listener has consume the specified number of matching events, blocking at most the default number of
* milliseconds. If this method has not reached the number of matching events and comes across events that do not satisfy
* the predicate, those events are consumed and ignored.
*
* @param eventCount the number of events
* @param condition the event-based predicate that signals a match; may not be null
* @throws TimeoutException if the waiting timed out before the expected number of events were received
*/
public void consume(int eventCount, Predicate<Event> condition) throws TimeoutException {
consume(eventCount, defaultTimeoutInMillis, condition);
}
/**
* Blocks until the listener has consume the specified number of matching events, blocking at most the specified number
* of milliseconds. If this method has not reached the number of matching events and comes across events that do not
* satisfy the predicate, those events are consumed and ignored.
*
* @param eventCount the number of events
* @param timeoutInMillis the maximum amount of time in milliseconds that this method should block
* @param condition the event-based predicate that signals a match; may not be null
* @throws TimeoutException if the waiting timed out before the expected number of events were received
*/
public void consume(int eventCount, long timeoutInMillis, Predicate<Event> condition)
throws TimeoutException {
if (eventCount < 0) throw new IllegalArgumentException("The eventCount may not be negative");
if (eventCount == 0) return;
int eventsRemaining = eventCount;
final long stopTime = System.currentTimeMillis() + timeoutInMillis;
while (eventsRemaining > 0 && System.currentTimeMillis() < stopTime) {
Event nextEvent = queue.poll();
if (nextEvent != null) {
if (condition.test(nextEvent)) {
--eventsRemaining;
consumedEvents.accept(nextEvent);
} else {
ignoredEvents.accept(nextEvent);
}
}
}
if (eventsRemaining > 0) {
throw new TimeoutException(
"Received " + (eventCount - eventsRemaining) + " of " + eventCount + " in " + timeoutInMillis + "ms");
}
}
/**
* Blocks until the listener has seen the specified number of events with the given type, or until the default timeout
* has passed.
*
* @param eventCount the number of events
* @param type the type of event
* @throws TimeoutException if the waiting timed out before the expected number of events were received
*/
public void consume(int eventCount, EventType type) throws TimeoutException {
consume(eventCount, type, defaultTimeoutInMillis);
}
/**
* Blocks until the listener has seen the specified number of events with the given type, or until the specified time
* has passed.
*
* @param eventCount the number of events
* @param type the type of event
* @param timeoutMillis the maximum amount of time in milliseconds that this method should block
* @throws InterruptedException if the thread was interrupted while waiting
* @throws TimeoutException if the waiting timed out before the expected number of events were received
*/
public void waitFor(int eventCount, EventType type, long timeoutMillis) throws InterruptedException, TimeoutException {
waitFor(type.name(), () -> counterByType.get(type), eventCount, timeoutMillis);
public void consume(int eventCount, EventType type, long timeoutMillis) throws TimeoutException {
consume(eventCount, defaultTimeoutInMillis, event -> {
EventHeader header = event.getHeader();
EventType eventType = header == null ? null : header.getEventType();
return type.equals(eventType);
});
}
/**
* Blocks until the listener has seen the specified number of events with the given type.
* Blocks until the listener has seen the specified number of events with the given type, or until the default timeout
* has passed.
*
* @param eventCount the number of events
* @param eventDataClass the EventData subclass
* @throws TimeoutException if the waiting timed out before the expected number of events were received
*/
public void consume(int eventCount, Class<? extends EventData> eventDataClass) throws TimeoutException {
consume(eventCount, eventDataClass, defaultTimeoutInMillis);
}
/**
* Blocks until the listener has seen the specified number of events with event data matching the specified class,
* or until the specified time has passed.
*
* @param eventCount the number of events
* @param eventDataClass the EventData subclass
* @param timeoutMillis the maximum amount of time in milliseconds that this method should block
* @throws InterruptedException if the thread was interrupted while waiting
* @throws TimeoutException if the waiting timed out before the expected number of events were received
*/
public void waitFor(int eventCount, Class<? extends EventData> eventDataClass, long timeoutMillis)
throws InterruptedException, TimeoutException {
waitFor(eventDataClass.getSimpleName(), () -> counterByDataClass.get(eventDataClass), eventCount, timeoutMillis);
}
private void waitFor(String eventTypeName, Supplier<AtomicInteger> counterGetter, int eventCount, long timeoutMillis)
throws InterruptedException, TimeoutException {
// Get the counter, and prepare for it to be null ...
AtomicInteger counter = null;
long stopTime = System.currentTimeMillis() + timeoutMillis;
do {
counter = counterGetter.get();
} while (counter == null && System.currentTimeMillis() <= stopTime);
if (counter == null) {
// Did not even find a counter in this timeframe ...
throw new TimeoutException("Timed out while waiting for " + eventCount + " " + eventTypeName + " events");
}
synchronized (counter) {
counter.addAndGet(-eventCount);
if (counter.get() != 0) {
counter.wait(timeoutMillis);
if (counter.get() != 0) {
throw new TimeoutException("Timed out while waiting for " + eventCount + " " + eventTypeName + " events");
}
}
}
public void consume(int eventCount, Class<? extends EventData> eventDataClass, long timeoutMillis) throws TimeoutException {
consume(eventCount, defaultTimeoutInMillis, event -> {
EventData data = event.getData();
return data != null && data.getClass().equals(eventDataClass);
});
}
/**
* Clear all counters.
* Clear the queue.
*/
public void reset() {
counterByDataClass.clear();
counterByType.clear();
queue.clear();
}
}