From d7e52165beaa347d87a9095aaa0a10be2e8b2c7c Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Mon, 25 Jan 2016 18:12:49 -0600 Subject: [PATCH] Changed MySQL binlog reading integration test to eliminate concurrent issues. --- .../debezium/mysql/ingest/ReadBinLogIT.java | 240 +++++++++++------- 1 file changed, 145 insertions(+), 95 deletions(-) diff --git a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/ReadBinLogIT.java b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/ReadBinLogIT.java index 97714a809..36f7d45ca 100644 --- a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/ReadBinLogIT.java +++ b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/ReadBinLogIT.java @@ -15,19 +15,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; -import org.junit.FixMethodOrder; import org.junit.Test; -import org.junit.runners.MethodSorters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +34,7 @@ import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventData; +import com.github.shyiko.mysql.binlog.event.EventHeader; import com.github.shyiko.mysql.binlog.event.EventType; import com.github.shyiko.mysql.binlog.event.QueryEventData; import com.github.shyiko.mysql.binlog.event.TableMapEventData; @@ -50,7 +48,6 @@ import io.debezium.jdbc.TestDatabase; import io.debezium.mysql.MySQLConnection; -@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ReadBinLogIT { protected static final Logger LOGGER = LoggerFactory.getLogger(ReadBinLogIT.class); @@ -63,7 +60,7 @@ private static final class AnyValue implements Serializable { private static final Serializable ANY_OBJECT = new AnyValue(); private JdbcConfiguration config; - private EventCounters counters; + private EventQueue counters; private BinaryLogClient client; private MySQLConnection conn; private List events = new ArrayList<>(); @@ -79,11 +76,10 @@ public void beforeEach() throws TimeoutException, IOException, SQLException, Int conn.connect(); // Connect the bin log client ... - counters = new EventCounters(); + counters = new EventQueue(DEFAULT_TIMEOUT, this::logConsumedEvent, this::logIgnoredEvent); client = new BinaryLogClient(config.getHostname(), config.getPort(), "replicator", "replpass"); client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances client.setKeepAlive(false); - client.registerEventListener(this::logEvent); client.registerEventListener(counters); client.registerEventListener(this::recordEvent); client.registerLifecycleListener(new TraceLifecycleListener()); @@ -98,7 +94,7 @@ public void beforeEach() throws TimeoutException, IOException, SQLException, Int " updatedAt DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" + ")"); - counters.waitFor(2, EventType.QUERY, DEFAULT_TIMEOUT); + counters.consume(2, EventType.QUERY); counters.reset(); } @@ -121,20 +117,20 @@ public void afterEach() throws IOException, SQLException { public void shouldCaptureSingleWriteUpdateDeleteEvents() throws Exception { // write/insert conn.execute("INSERT INTO person(name,age) VALUES ('Georgia',30)"); - counters.waitFor(1, WriteRowsEventData.class, DEFAULT_TIMEOUT); + counters.consume(1, WriteRowsEventData.class); List writeRowEvents = recordedEventData(WriteRowsEventData.class, 1); assertRows(writeRowEvents.get(0), rows().insertedRow("Georgia", 30, any(), any())); // update conn.execute("UPDATE person SET name = 'Maggie' WHERE name = 'Georgia'"); - counters.waitFor(1, UpdateRowsEventData.class, DEFAULT_TIMEOUT); + counters.consume(1, UpdateRowsEventData.class); List updateRowEvents = recordedEventData(UpdateRowsEventData.class, 1); assertRows(updateRowEvents.get(0), rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any())); // delete conn.execute("DELETE FROM person WHERE name = 'Maggie'"); - counters.waitFor(1, DeleteRowsEventData.class, DEFAULT_TIMEOUT); + counters.consume(1, DeleteRowsEventData.class); List deleteRowEvents = recordedEventData(DeleteRowsEventData.class, 1); assertRows(deleteRowEvents.get(0), rows().removedRow("Maggie", 30, any(), any())); } @@ -144,10 +140,10 @@ public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception { // write/insert as a single transaction conn.execute("INSERT INTO person(name,age) VALUES ('Georgia',30)", "INSERT INTO person(name,age) VALUES ('Janice',19)"); - counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN - counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(2, WriteRowsEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT + counters.consume(1, QueryEventData.class); // BEGIN + counters.consume(1, TableMapEventData.class); + counters.consume(2, WriteRowsEventData.class); + counters.consume(1, XidEventData.class); // COMMIT List writeRowEvents = recordedEventData(WriteRowsEventData.class, 2); assertRows(writeRowEvents.get(0), rows().insertedRow("Georgia", 30, any(), any())); assertRows(writeRowEvents.get(1), rows().insertedRow("Janice", 19, any(), any())); @@ -156,10 +152,10 @@ public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception { // update as a single transaction conn.execute("UPDATE person SET name = 'Maggie' WHERE name = 'Georgia'", "UPDATE person SET name = 'Jamie' WHERE name = 'Janice'"); - counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN - counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(2, UpdateRowsEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT + counters.consume(1, QueryEventData.class); // BEGIN + counters.consume(1, TableMapEventData.class); + counters.consume(2, UpdateRowsEventData.class); + counters.consume(1, XidEventData.class); // COMMIT List updateRowEvents = recordedEventData(UpdateRowsEventData.class, 2); assertRows(updateRowEvents.get(0), rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any())); assertRows(updateRowEvents.get(1), rows().changeRow("Janice", 19, any(), any()).to("Jamie", 19, any(), any())); @@ -168,10 +164,10 @@ public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception { // delete as a single transaction conn.execute("DELETE FROM person WHERE name = 'Maggie'", "DELETE FROM person WHERE name = 'Jamie'"); - counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN - counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(2, DeleteRowsEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT + counters.consume(1, QueryEventData.class); // BEGIN + counters.consume(1, TableMapEventData.class); + counters.consume(2, DeleteRowsEventData.class); + counters.consume(1, XidEventData.class); // COMMIT List deleteRowEvents = recordedEventData(DeleteRowsEventData.class, 2); assertRows(deleteRowEvents.get(0), rows().removedRow("Maggie", 30, any(), any())); assertRows(deleteRowEvents.get(1), rows().removedRow("Jamie", 19, any(), any())); @@ -181,10 +177,10 @@ public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception { public void shouldCaptureMultipleWriteUpdateDeletesInSingleEvents() throws Exception { // write/insert as a single statement/transaction conn.execute("INSERT INTO person(name,age) VALUES ('Georgia',30),('Janice',19)"); - counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN - counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, WriteRowsEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT + counters.consume(1, QueryEventData.class); // BEGIN + counters.consume(1, TableMapEventData.class); + counters.consume(1, WriteRowsEventData.class); + counters.consume(1, XidEventData.class); // COMMIT List writeRowEvents = recordedEventData(WriteRowsEventData.class, 1); assertRows(writeRowEvents.get(0), rows().insertedRow("Georgia", 30, any(), any()) .insertedRow("Janice", 19, any(), any())); @@ -196,10 +192,10 @@ public void shouldCaptureMultipleWriteUpdateDeletesInSingleEvents() throws Excep " WHEN name = 'Janice' THEN 'Jamie' " + " END " + "WHERE name IN ('Georgia','Janice')"); - counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN - counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, UpdateRowsEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT + counters.consume(1, QueryEventData.class); // BEGIN + counters.consume(1, TableMapEventData.class); + counters.consume(1, UpdateRowsEventData.class); + counters.consume(1, XidEventData.class); // COMMIT List updateRowEvents = recordedEventData(UpdateRowsEventData.class, 1); assertRows(updateRowEvents.get(0), rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any()) .changeRow("Janice", 19, any(), any()).to("Jamie", 19, any(), any())); @@ -207,10 +203,10 @@ public void shouldCaptureMultipleWriteUpdateDeletesInSingleEvents() throws Excep // delete as a single statement/transaction conn.execute("DELETE FROM person WHERE name IN ('Maggie','Jamie')"); - counters.waitFor(1, QueryEventData.class, DEFAULT_TIMEOUT); // BEGIN - counters.waitFor(1, TableMapEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, DeleteRowsEventData.class, DEFAULT_TIMEOUT); - counters.waitFor(1, XidEventData.class, DEFAULT_TIMEOUT); // COMMIT + counters.consume(1, QueryEventData.class); // BEGIN + counters.consume(1, TableMapEventData.class); + counters.consume(1, DeleteRowsEventData.class); + counters.consume(1, XidEventData.class); // COMMIT List deleteRowEvents = recordedEventData(DeleteRowsEventData.class, 1); assertRows(deleteRowEvents.get(0), rows().removedRow("Maggie", 30, any(), any()) .removedRow("Jamie", 19, any(), any())); @@ -233,8 +229,12 @@ public void shouldQueryInformationSchema() throws Exception { } - protected void logEvent(Event event) { - LOGGER.info("Received event: " + event); + protected void logConsumedEvent(Event event) { + LOGGER.info("Consumed event: " + event); + } + + protected void logIgnoredEvent(Event event) { + LOGGER.info("Ignored event: " + event); } protected void recordEvent(Event event) { @@ -402,89 +402,139 @@ protected void assertRows(DeleteRowsEventData eventData, RowBuilder rows) { } } - protected static class EventCounters implements EventListener { - /* - * VariableLatch instances count down when receiving an event, and thus are negative. When callers wait for a specified - * number of events to occur, the latch's count is incremented by the expected count. If the latch's resulting count is - * less than or equal to 0, then the caller does not wait. - */ - private final ConcurrentMap counterByType = new ConcurrentHashMap<>(); - private final ConcurrentMap, AtomicInteger> counterByDataClass = new ConcurrentHashMap<>(); + protected static class EventQueue implements EventListener { + + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final Consumer consumedEvents; + private final Consumer ignoredEvents; + private final long defaultTimeoutInMillis; + + public EventQueue(long defaultTimeoutInMillis, Consumer consumedEvents, Consumer ignoredEvents) { + this.defaultTimeoutInMillis = defaultTimeoutInMillis; + this.consumedEvents = consumedEvents != null ? consumedEvents : this::defaultEventHandler; + this.ignoredEvents = ignoredEvents != null ? ignoredEvents : this::defaultEventHandler; + } + + private void defaultEventHandler(Event event) { + } @Override public void onEvent(Event event) { - counterByType.compute(event.getHeader().getEventType(), this::increment); - EventData data = event.getData(); - if (data != null) { - counterByDataClass.compute(data.getClass(), this::increment); - } + queue.add(event); } - protected AtomicInteger increment(K key, AtomicInteger counter) { - if (counter == null) return new AtomicInteger(1); - synchronized (counter) { - counter.incrementAndGet(); - counter.notify(); - } - return counter; + protected Event consume() { + return queue.poll(); } /** - * Blocks until the listener has seen the specified number of events with the given type. + * Blocks until the listener has consume the specified number of matching events, blocking at most the default number of + * milliseconds. If this method has not reached the number of matching events and comes across events that do not satisfy + * the predicate, those events are consumed and ignored. + * + * @param eventCount the number of events + * @param condition the event-based predicate that signals a match; may not be null + * @throws TimeoutException if the waiting timed out before the expected number of events were received + */ + public void consume(int eventCount, Predicate condition) throws TimeoutException { + consume(eventCount, defaultTimeoutInMillis, condition); + } + + /** + * Blocks until the listener has consume the specified number of matching events, blocking at most the specified number + * of milliseconds. If this method has not reached the number of matching events and comes across events that do not + * satisfy the predicate, those events are consumed and ignored. + * + * @param eventCount the number of events + * @param timeoutInMillis the maximum amount of time in milliseconds that this method should block + * @param condition the event-based predicate that signals a match; may not be null + * @throws TimeoutException if the waiting timed out before the expected number of events were received + */ + public void consume(int eventCount, long timeoutInMillis, Predicate condition) + throws TimeoutException { + if (eventCount < 0) throw new IllegalArgumentException("The eventCount may not be negative"); + if (eventCount == 0) return; + int eventsRemaining = eventCount; + final long stopTime = System.currentTimeMillis() + timeoutInMillis; + while (eventsRemaining > 0 && System.currentTimeMillis() < stopTime) { + Event nextEvent = queue.poll(); + if (nextEvent != null) { + if (condition.test(nextEvent)) { + --eventsRemaining; + consumedEvents.accept(nextEvent); + } else { + ignoredEvents.accept(nextEvent); + } + } + } + if (eventsRemaining > 0) { + throw new TimeoutException( + "Received " + (eventCount - eventsRemaining) + " of " + eventCount + " in " + timeoutInMillis + "ms"); + } + } + + /** + * Blocks until the listener has seen the specified number of events with the given type, or until the default timeout + * has passed. + * + * @param eventCount the number of events + * @param type the type of event + * @throws TimeoutException if the waiting timed out before the expected number of events were received + */ + public void consume(int eventCount, EventType type) throws TimeoutException { + consume(eventCount, type, defaultTimeoutInMillis); + } + + /** + * Blocks until the listener has seen the specified number of events with the given type, or until the specified time + * has passed. * * @param eventCount the number of events * @param type the type of event * @param timeoutMillis the maximum amount of time in milliseconds that this method should block - * @throws InterruptedException if the thread was interrupted while waiting * @throws TimeoutException if the waiting timed out before the expected number of events were received */ - public void waitFor(int eventCount, EventType type, long timeoutMillis) throws InterruptedException, TimeoutException { - waitFor(type.name(), () -> counterByType.get(type), eventCount, timeoutMillis); + public void consume(int eventCount, EventType type, long timeoutMillis) throws TimeoutException { + consume(eventCount, defaultTimeoutInMillis, event -> { + EventHeader header = event.getHeader(); + EventType eventType = header == null ? null : header.getEventType(); + return type.equals(eventType); + }); } /** - * Blocks until the listener has seen the specified number of events with the given type. + * Blocks until the listener has seen the specified number of events with the given type, or until the default timeout + * has passed. + * + * @param eventCount the number of events + * @param eventDataClass the EventData subclass + * @throws TimeoutException if the waiting timed out before the expected number of events were received + */ + public void consume(int eventCount, Class eventDataClass) throws TimeoutException { + consume(eventCount, eventDataClass, defaultTimeoutInMillis); + } + + /** + * Blocks until the listener has seen the specified number of events with event data matching the specified class, + * or until the specified time has passed. * * @param eventCount the number of events * @param eventDataClass the EventData subclass * @param timeoutMillis the maximum amount of time in milliseconds that this method should block - * @throws InterruptedException if the thread was interrupted while waiting * @throws TimeoutException if the waiting timed out before the expected number of events were received */ - public void waitFor(int eventCount, Class eventDataClass, long timeoutMillis) - throws InterruptedException, TimeoutException { - waitFor(eventDataClass.getSimpleName(), () -> counterByDataClass.get(eventDataClass), eventCount, timeoutMillis); - } - - private void waitFor(String eventTypeName, Supplier counterGetter, int eventCount, long timeoutMillis) - throws InterruptedException, TimeoutException { - // Get the counter, and prepare for it to be null ... - AtomicInteger counter = null; - long stopTime = System.currentTimeMillis() + timeoutMillis; - do { - counter = counterGetter.get(); - } while (counter == null && System.currentTimeMillis() <= stopTime); - if (counter == null) { - // Did not even find a counter in this timeframe ... - throw new TimeoutException("Timed out while waiting for " + eventCount + " " + eventTypeName + " events"); - } - synchronized (counter) { - counter.addAndGet(-eventCount); - if (counter.get() != 0) { - counter.wait(timeoutMillis); - if (counter.get() != 0) { - throw new TimeoutException("Timed out while waiting for " + eventCount + " " + eventTypeName + " events"); - } - } - } + public void consume(int eventCount, Class eventDataClass, long timeoutMillis) throws TimeoutException { + consume(eventCount, defaultTimeoutInMillis, event -> { + EventData data = event.getData(); + return data != null && data.getClass().equals(eventDataClass); + }); } /** - * Clear all counters. + * Clear the queue. */ public void reset() { - counterByDataClass.clear(); - counterByType.clear(); + queue.clear(); } }