Merge pull request #121 from rhauch/dbz-134

DBZ-134 Enabled JMX metrics for MySQL connector
This commit is contained in:
Randall Hauch 2016-10-19 16:58:24 -05:00 committed by GitHub
commit 2000765341
10 changed files with 404 additions and 51 deletions

View File

@ -68,13 +68,15 @@ public void start() {
} }
/** /**
* Stop the snapshot from running. * Stop the reader from running. This method is called when the connector is stopped.
* <p>
* This method does nothing if the snapshot is not {@link #isRunning() running}.
*/ */
public void stop() { public void stop() {
if (this.running.compareAndSet(true, false)) { try {
doStop(); if (this.running.compareAndSet(true, false)) {
doStop();
}
} finally {
doShutdown();
} }
} }
@ -85,7 +87,10 @@ public void stop() {
/** /**
* The reader has been requested to stop, so perform any work required to stop the reader's resources that were previously * The reader has been requested to stop, so perform any work required to stop the reader's resources that were previously
* {@link #start() started}.. * {@link #start() started}.
* <p>
* This method is called only if the reader is not already stopped.
* @see #doShutdown()
*/ */
protected abstract void doStop(); protected abstract void doStop();
@ -95,6 +100,14 @@ public void stop() {
*/ */
protected abstract void doCleanup(); protected abstract void doCleanup();
/**
* The reader has been stopped.
* <p>
* This method is always called when the connector is stopped.
* @see #doStop()
*/
protected abstract void doShutdown();
/** /**
* Call this method only when the reader has successfully completed all of its work, signaling that subsequent * Call this method only when the reader has successfully completed all of its work, signaling that subsequent
* calls to {@link #poll()} should forever return {@code null}. * calls to {@link #poll()} should forever return {@code null}.

View File

@ -64,7 +64,8 @@ public class BinlogReader extends AbstractReader {
private final RecordMakers recordMakers; private final RecordMakers recordMakers;
private final SourceInfo source; private final SourceInfo source;
private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class); private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
private BinaryLogClient client; private final BinaryLogClient client;
private final BinlogReaderMetrics metrics;
private int startingRowNumber = 0; private int startingRowNumber = 0;
private final Clock clock; private final Clock clock;
private final ElapsedTimeStrategy pollOutputDelay; private final ElapsedTimeStrategy pollOutputDelay;
@ -132,6 +133,10 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
new RowDeserializers.DeleteRowsDeserializer( new RowDeserializers.DeleteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true)); tableMapEventByTableId).setMayContainExtraInformation(true));
client.setEventDeserializer(eventDeserializer); client.setEventDeserializer(eventDeserializer);
// Set up for JMX ...
metrics = new BinlogReaderMetrics(client);
metrics.register(context, logger);
} }
@Override @Override
@ -160,7 +165,7 @@ protected void doStart() {
logger.info("GTID set from previous recorded offset: {}", gtidSetStr); logger.info("GTID set from previous recorded offset: {}", gtidSetStr);
// Remove any of the GTID sources that are not required/acceptable ... // Remove any of the GTID sources that are not required/acceptable ...
Predicate<String> gtidSourceFilter = context.gtidSourceFilter(); Predicate<String> gtidSourceFilter = context.gtidSourceFilter();
if ( gtidSourceFilter != null) { if (gtidSourceFilter != null) {
GtidSet gtidSet = new GtidSet(gtidSetStr).retainAll(gtidSourceFilter); GtidSet gtidSet = new GtidSet(gtidSetStr).retainAll(gtidSourceFilter);
gtidSetStr = gtidSet.toString(); gtidSetStr = gtidSet.toString();
logger.info("GTID set after applying GTID source includes/excludes: {}", gtidSetStr); logger.info("GTID set after applying GTID source includes/excludes: {}", gtidSetStr);
@ -219,6 +224,11 @@ protected void doStop() {
} }
} }
@Override
protected void doShutdown() {
metrics.unregister(logger);
}
@Override @Override
protected void doCleanup() { protected void doCleanup() {
} }

View File

@ -0,0 +1,80 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientStatistics;
/**
* @author Randall Hauch
*/
class BinlogReaderMetrics extends Metrics implements BinlogReaderMetricsMXBean {
private final BinaryLogClient client;
private final BinaryLogClientStatistics stats;
public BinlogReaderMetrics( BinaryLogClient client) {
super("binlog");
this.client = client;
this.stats = new BinaryLogClientStatistics(client);
}
@Override
public boolean isConnected() {
return this.client.isConnected();
}
@Override
public String getBinlogFilename() {
return this.client.getBinlogFilename();
}
@Override
public long getBinlogPosition() {
return this.client.getBinlogPosition();
}
@Override
public String getGtidSet() {
return this.client.getGtidSet();
}
@Override
public String getLastEvent() {
return this.stats.getLastEvent();
}
@Override
public long getSecondsSinceLastEvent() {
return this.stats.getSecondsSinceLastEvent();
}
@Override
public long getSecondsBehindMaster() {
return this.stats.getSecondsBehindMaster();
}
@Override
public long getTotalNumberOfEventsSeen() {
return this.stats.getTotalNumberOfEventsSeen();
}
@Override
public long getNumberOfSkippedEvents() {
return this.stats.getNumberOfSkippedEvents();
}
@Override
public long getNumberOfDisconnects() {
return this.stats.getNumberOfDisconnects();
}
@Override
public void reset() {
this.stats.reset();
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
/**
* @author Randall Hauch
*
*/
public interface BinlogReaderMetricsMXBean {
boolean isConnected();
String getBinlogFilename();
long getBinlogPosition();
String getGtidSet();
String getLastEvent();
long getSecondsSinceLastEvent();
long getSecondsBehindMaster();
long getTotalNumberOfEventsSeen();
long getNumberOfSkippedEvents();
long getNumberOfDisconnects();
void reset();
}

View File

@ -0,0 +1,51 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.slf4j.Logger;
/**
* @author Randall Hauch
*
*/
abstract class Metrics {
private final String contextName;
private ObjectName name;
protected Metrics(String contextName) {
this.contextName = contextName;
}
public void register(MySqlTaskContext context, Logger logger) {
try {
this.name = context.metricName(this.contextName);
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
mBeanServer.registerMBean(this, name);
} catch (JMException e) {
logger.warn("Error while register the MBean '{}': {}", name, e.getMessage());
}
}
public void unregister(Logger logger) {
if (this.name != null) {
try {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
mBeanServer.unregisterMBean(name);
} catch (JMException e) {
logger.error("Unable to unregister the MBean '{}'", name);
} finally {
this.name = null;
}
}
}
}

View File

@ -8,6 +8,9 @@
import java.util.Map; import java.util.Map;
import java.util.function.Predicate; import java.util.function.Predicate;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.function.Predicates; import io.debezium.function.Predicates;
@ -53,6 +56,10 @@ public MySqlTaskContext(Configuration config) {
: (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null); : (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null);
} }
public String connectorName() {
return config.getString("name");
}
public TopicSelector topicSelector() { public TopicSelector topicSelector() {
return topicSelector; return topicSelector;
} }
@ -223,5 +230,16 @@ public PreviousContext configureLoggingContext(String contextName) {
public void temporaryLoggingContext(String contextName, Runnable operation) { public void temporaryLoggingContext(String contextName, Runnable operation) {
LoggingContext.temporarilyForConnector("MySQL", serverName(), contextName, operation); LoggingContext.temporarilyForConnector("MySQL", serverName(), contextName, operation);
} }
/**
* Create a JMX metric name for the given metric.
* @param contextName the name of the context
* @return the JMX metric name
* @throws MalformedObjectNameException if the name is invalid
*/
public ObjectName metricName(String contextName) throws MalformedObjectNameException {
//return new ObjectName("debezium.mysql:type=connector-metrics,connector=" + serverName() + ",name=" + contextName);
return new ObjectName("debezium.mysql:type=connector-metrics,context=" + contextName + ",server=" + serverName());
}
} }

View File

@ -44,6 +44,7 @@ public class SnapshotReader extends AbstractReader {
private RecordRecorder recorder; private RecordRecorder recorder;
private volatile Thread thread; private volatile Thread thread;
private volatile Runnable onSuccessfulCompletion; private volatile Runnable onSuccessfulCompletion;
private final SnapshotReaderMetrics metrics;
/** /**
* Create a snapshot reader. * Create a snapshot reader.
@ -53,6 +54,8 @@ public class SnapshotReader extends AbstractReader {
public SnapshotReader(MySqlTaskContext context) { public SnapshotReader(MySqlTaskContext context) {
super(context); super(context);
recorder = this::recordRowAsRead; recorder = this::recordRowAsRead;
metrics = new SnapshotReaderMetrics(context.clock());
metrics.register(context, logger);
} }
/** /**
@ -115,14 +118,16 @@ protected void doStart() {
thread.start(); thread.start();
} }
/**
* Stop the snapshot from running.
*/
@Override @Override
protected void doStop() { protected void doStop() {
thread.interrupt(); thread.interrupt();
} }
@Override
protected void doShutdown() {
metrics.unregister(logger);
}
@Override @Override
protected void doCleanup() { protected void doCleanup() {
this.thread = null; this.thread = null;
@ -154,6 +159,8 @@ protected void execute() {
logRolesForCurrentUser(mysql); logRolesForCurrentUser(mysql);
logServerInformation(mysql); logServerInformation(mysql);
try { try {
metrics.startSnapshot();
// ------ // ------
// STEP 0 // STEP 0
// ------ // ------
@ -171,6 +178,7 @@ protected void execute() {
mysql.setAutoCommit(false); mysql.setAutoCommit(false);
sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"); sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
mysql.execute(sql.get()); mysql.execute(sql.get());
metrics.globalLockAcquired();
// Generate the DDL statements that set the charset-related system variables ... // Generate the DDL statements that set the charset-related system variables ...
Map<String, String> systemVariables = context.readMySqlCharsetSystemVariables(sql); Map<String, String> systemVariables = context.readMySqlCharsetSystemVariables(sql);
@ -317,6 +325,7 @@ protected void execute() {
mysql.execute(sql.get()); mysql.execute(sql.get());
unlocked = true; unlocked = true;
long lockReleased = clock.currentTimeInMillis(); long lockReleased = clock.currentTimeInMillis();
metrics.globalLockReleased();
logger.info("Step 7: blocked writes to MySQL for a total of {}", Strings.duration(lockReleased - lockAcquired)); logger.info("Step 7: blocked writes to MySQL for a total of {}", Strings.duration(lockReleased - lockAcquired));
} }
@ -329,6 +338,8 @@ protected void execute() {
// Dump all of the tables and generate source records ... // Dump all of the tables and generate source records ...
logger.info("Step 8: scanning contents of {} tables", tableIds.size()); logger.info("Step 8: scanning contents of {} tables", tableIds.size());
metrics.setTableCount(tableIds.size());
long startScan = clock.currentTimeInMillis(); long startScan = clock.currentTimeInMillis();
AtomicBoolean interrupted = new AtomicBoolean(false); AtomicBoolean interrupted = new AtomicBoolean(false);
AtomicLong totalRowCount = new AtomicLong(); AtomicLong totalRowCount = new AtomicLong();
@ -388,6 +399,7 @@ protected void execute() {
} }
}); });
metrics.completeTable();
if (interrupted.get()) break; if (interrupted.get()) break;
} }
++completedCounter; ++completedCounter;
@ -420,6 +432,7 @@ protected void execute() {
mysql.execute(sql.get()); mysql.execute(sql.get());
unlocked = true; unlocked = true;
long lockReleased = clock.currentTimeInMillis(); long lockReleased = clock.currentTimeInMillis();
metrics.globalLockReleased();
logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased - lockAcquired)); logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
} }
@ -431,12 +444,14 @@ protected void execute() {
logger.info("Step {}: rolling back transaction after abort", step++); logger.info("Step {}: rolling back transaction after abort", step++);
sql.set("ROLLBACK"); sql.set("ROLLBACK");
mysql.execute(sql.get()); mysql.execute(sql.get());
metrics.abortSnapshot();
return; return;
} }
// Otherwise, commit our transaction // Otherwise, commit our transaction
logger.info("Step {}: committing transaction", step++); logger.info("Step {}: committing transaction", step++);
sql.set("COMMIT"); sql.set("COMMIT");
mysql.execute(sql.get()); mysql.execute(sql.get());
metrics.completeSnapshot();
try { try {
// Mark the source as having completed the snapshot. This will ensure the `source` field on records // Mark the source as having completed the snapshot. This will ensure the `source` field on records

View File

@ -0,0 +1,114 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import io.debezium.util.Clock;
/**
* @author Randall Hauch
*
*/
class SnapshotReaderMetrics extends Metrics implements SnapshotReaderMetricsMXBean {
private final AtomicLong tableCount = new AtomicLong();
private final AtomicLong remainingTableCount = new AtomicLong();
private final AtomicBoolean holdingGlobalLock = new AtomicBoolean();
private final AtomicBoolean snapshotRunning = new AtomicBoolean();
private final AtomicBoolean snapshotCompleted = new AtomicBoolean();
private final AtomicBoolean snapshotAborted = new AtomicBoolean();
private final AtomicLong startTime = new AtomicLong();
private final AtomicLong stopTime = new AtomicLong();
private final Clock clock;
public SnapshotReaderMetrics(Clock clock) {
super("snapshot");
this.clock= clock;
}
@Override
public int getTotalTableCount() {
return this.tableCount.intValue();
}
@Override
public int getRemainingTableCount() {
return this.remainingTableCount.intValue();
}
@Override
public boolean getSnapshotRunning() {
return this.snapshotRunning.get();
}
@Override
public boolean getSnapshotCompleted() {
return this.snapshotCompleted.get();
}
@Override
public boolean getSnapshotAborted() {
return this.snapshotAborted.get();
}
@Override
public boolean getHoldingGlobalLock() {
return holdingGlobalLock.get();
}
@Override
public long getSnapshotDurationInSeconds() {
long startMillis = startTime.get();
if ( startMillis <= 0L) {
return 0;
}
long stopMillis = stopTime.get();
if ( stopMillis == 0L ) stopMillis = clock.currentTimeInMillis();
return (stopMillis - startMillis)/1000L;
}
public void globalLockAcquired() {
holdingGlobalLock.set(true);
}
public void globalLockReleased() {
holdingGlobalLock.set(false);
}
public void setTableCount(int tableCount) {
this.tableCount.set(tableCount);
this.remainingTableCount.set(tableCount);
}
public void completeTable() {
remainingTableCount.decrementAndGet();
}
public void startSnapshot() {
this.snapshotRunning.set(true);
this.snapshotCompleted.set(false);
this.snapshotAborted.set(false);
this.startTime.set(clock.currentTimeInMillis());
this.stopTime.set(0L);
}
public void completeSnapshot() {
this.snapshotCompleted.set(true);
this.snapshotAborted.set(false);
this.snapshotRunning.set(false);
this.stopTime.set(clock.currentTimeInMillis());
}
public void abortSnapshot() {
this.snapshotCompleted.set(false);
this.snapshotAborted.set(true);
this.snapshotRunning.set(false);
this.stopTime.set(clock.currentTimeInMillis());
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
/**
* @author Randall Hauch
*/
public interface SnapshotReaderMetricsMXBean {
int getTotalTableCount();
int getRemainingTableCount();
boolean getHoldingGlobalLock();
boolean getSnapshotRunning();
boolean getSnapshotAborted();
boolean getSnapshotCompleted();
long getSnapshotDurationInSeconds();
}

View File

@ -37,7 +37,7 @@ public class SnapshotReaderIT {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-snapshot.txt").toAbsolutePath(); private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-snapshot.txt").toAbsolutePath();
private static final String DB_NAME = "connector_test_ro"; private static final String DB_NAME = "connector_test_ro";
private static final String LOGICAL_NAME = "logical_server_name"; private static final String LOGICAL_NAME = "logical_server_name";
private Configuration config; private Configuration config;
private MySqlTaskContext context; private MySqlTaskContext context;
private SnapshotReader reader; private SnapshotReader reader;
@ -51,6 +51,13 @@ public void beforeEach() {
@After @After
public void afterEach() { public void afterEach() {
if (reader != null) {
try {
reader.stop();
} finally {
reader = null;
}
}
if (context != null) { if (context != null) {
try { try {
context.shutdown(); context.shutdown();
@ -90,17 +97,17 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
reader.onSuccessfulCompletion(completed::countDown); reader.onSuccessfulCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateInsertEvents();
reader.useMinimalBlocking(true); reader.useMinimalBlocking(true);
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
// Poll for records ... // Poll for records ...
//Testing.Print.enable(); // Testing.Print.enable();
List<SourceRecord> records = null; List<SourceRecord> records = null;
KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(LOGICAL_NAME + "."); KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(LOGICAL_NAME + ".");
SchemaChangeHistory schemaChanges = new SchemaChangeHistory(LOGICAL_NAME); SchemaChangeHistory schemaChanges = new SchemaChangeHistory(LOGICAL_NAME);
while ( (records = reader.poll()) != null ) { while ((records = reader.poll()) != null) {
records.forEach(record->{ records.forEach(record -> {
VerifyRecord.isValid(record); VerifyRecord.isValid(record);
store.add(record); store.add(record);
schemaChanges.add(record); schemaChanges.add(record);
@ -108,13 +115,13 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
} }
// The last poll should always return null ... // The last poll should always return null ...
assertThat(records).isNull(); assertThat(records).isNull();
// There should be no schema changes ... // There should be no schema changes ...
assertThat(schemaChanges.recordCount()).isEqualTo(0); assertThat(schemaChanges.recordCount()).isEqualTo(0);
// Check the records via the store ... // Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(4); assertThat(store.collectionCount()).isEqualTo(4);
Collection products = store.collection(DB_NAME,"products"); Collection products = store.collection(DB_NAME, "products");
assertThat(products.numberOfCreates()).isEqualTo(9); assertThat(products.numberOfCreates()).isEqualTo(9);
assertThat(products.numberOfUpdates()).isEqualTo(0); assertThat(products.numberOfUpdates()).isEqualTo(0);
assertThat(products.numberOfDeletes()).isEqualTo(0); assertThat(products.numberOfDeletes()).isEqualTo(0);
@ -123,7 +130,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1);
Collection products_on_hand = store.collection(DB_NAME,"products_on_hand"); Collection products_on_hand = store.collection(DB_NAME, "products_on_hand");
assertThat(products_on_hand.numberOfCreates()).isEqualTo(9); assertThat(products_on_hand.numberOfCreates()).isEqualTo(9);
assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0); assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0);
assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0); assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0);
@ -132,7 +139,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1);
Collection customers = store.collection(DB_NAME,"customers"); Collection customers = store.collection(DB_NAME, "customers");
assertThat(customers.numberOfCreates()).isEqualTo(4); assertThat(customers.numberOfCreates()).isEqualTo(4);
assertThat(customers.numberOfUpdates()).isEqualTo(0); assertThat(customers.numberOfUpdates()).isEqualTo(0);
assertThat(customers.numberOfDeletes()).isEqualTo(0); assertThat(customers.numberOfDeletes()).isEqualTo(0);
@ -141,7 +148,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1);
Collection orders = store.collection(DB_NAME,"orders"); Collection orders = store.collection(DB_NAME, "orders");
assertThat(orders.numberOfCreates()).isEqualTo(5); assertThat(orders.numberOfCreates()).isEqualTo(5);
assertThat(orders.numberOfUpdates()).isEqualTo(0); assertThat(orders.numberOfUpdates()).isEqualTo(0);
assertThat(orders.numberOfDeletes()).isEqualTo(0); assertThat(orders.numberOfDeletes()).isEqualTo(0);
@ -151,7 +158,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
// Make sure the snapshot completed ... // Make sure the snapshot completed ...
if ( completed.await(10, TimeUnit.SECONDS) ) { if (completed.await(10, TimeUnit.SECONDS)) {
// completed the snapshot ... // completed the snapshot ...
Testing.print("completed the snapshot"); Testing.print("completed the snapshot");
} else { } else {
@ -159,7 +166,6 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
} }
} }
@Test @Test
public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Exception { public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_(.*)").build(); config = simpleConfig().with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_(.*)").build();
@ -169,17 +175,17 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
reader.onSuccessfulCompletion(completed::countDown); reader.onSuccessfulCompletion(completed::countDown);
reader.generateReadEvents(); reader.generateReadEvents();
reader.useMinimalBlocking(true); reader.useMinimalBlocking(true);
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
// Poll for records ... // Poll for records ...
//Testing.Print.enable(); // Testing.Print.enable();
List<SourceRecord> records = null; List<SourceRecord> records = null;
KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(LOGICAL_NAME + "."); KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(LOGICAL_NAME + ".");
SchemaChangeHistory schemaChanges = new SchemaChangeHistory(LOGICAL_NAME); SchemaChangeHistory schemaChanges = new SchemaChangeHistory(LOGICAL_NAME);
while ( (records = reader.poll()) != null ) { while ((records = reader.poll()) != null) {
records.forEach(record->{ records.forEach(record -> {
VerifyRecord.isValid(record); VerifyRecord.isValid(record);
store.add(record); store.add(record);
schemaChanges.add(record); schemaChanges.add(record);
@ -187,15 +193,15 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
} }
// The last poll should always return null ... // The last poll should always return null ...
assertThat(records).isNull(); assertThat(records).isNull();
// There should be no schema changes ... // There should be no schema changes ...
assertThat(schemaChanges.recordCount()).isEqualTo(0); assertThat(schemaChanges.recordCount()).isEqualTo(0);
// Check the records via the store ... // Check the records via the store ...
assertThat(store.databases()).containsOnly(DB_NAME,"connector_test"); // 2 databases assertThat(store.databases()).containsOnly(DB_NAME, "connector_test"); // 2 databases
assertThat(store.collectionCount()).isEqualTo(8); // 2 databases assertThat(store.collectionCount()).isEqualTo(8); // 2 databases
Collection products = store.collection(DB_NAME,"products"); Collection products = store.collection(DB_NAME, "products");
assertThat(products.numberOfCreates()).isEqualTo(0); assertThat(products.numberOfCreates()).isEqualTo(0);
assertThat(products.numberOfUpdates()).isEqualTo(0); assertThat(products.numberOfUpdates()).isEqualTo(0);
assertThat(products.numberOfDeletes()).isEqualTo(0); assertThat(products.numberOfDeletes()).isEqualTo(0);
@ -204,7 +210,7 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1);
Collection products_on_hand = store.collection(DB_NAME,"products_on_hand"); Collection products_on_hand = store.collection(DB_NAME, "products_on_hand");
assertThat(products_on_hand.numberOfCreates()).isEqualTo(0); assertThat(products_on_hand.numberOfCreates()).isEqualTo(0);
assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0); assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0);
assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0); assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0);
@ -213,7 +219,7 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1);
Collection customers = store.collection(DB_NAME,"customers"); Collection customers = store.collection(DB_NAME, "customers");
assertThat(customers.numberOfCreates()).isEqualTo(0); assertThat(customers.numberOfCreates()).isEqualTo(0);
assertThat(customers.numberOfUpdates()).isEqualTo(0); assertThat(customers.numberOfUpdates()).isEqualTo(0);
assertThat(customers.numberOfDeletes()).isEqualTo(0); assertThat(customers.numberOfDeletes()).isEqualTo(0);
@ -222,7 +228,7 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1);
Collection orders = store.collection(DB_NAME,"orders"); Collection orders = store.collection(DB_NAME, "orders");
assertThat(orders.numberOfCreates()).isEqualTo(0); assertThat(orders.numberOfCreates()).isEqualTo(0);
assertThat(orders.numberOfUpdates()).isEqualTo(0); assertThat(orders.numberOfUpdates()).isEqualTo(0);
assertThat(orders.numberOfDeletes()).isEqualTo(0); assertThat(orders.numberOfDeletes()).isEqualTo(0);
@ -232,7 +238,7 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
// Make sure the snapshot completed ... // Make sure the snapshot completed ...
if ( completed.await(10, TimeUnit.SECONDS) ) { if (completed.await(10, TimeUnit.SECONDS)) {
// completed the snapshot ... // completed the snapshot ...
Testing.print("completed the snapshot"); Testing.print("completed the snapshot");
} else { } else {
@ -249,17 +255,17 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
reader.onSuccessfulCompletion(completed::countDown); reader.onSuccessfulCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateInsertEvents();
reader.useMinimalBlocking(true); reader.useMinimalBlocking(true);
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
// Poll for records ... // Poll for records ...
//Testing.Print.enable(); // Testing.Print.enable();
List<SourceRecord> records = null; List<SourceRecord> records = null;
KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(LOGICAL_NAME + "."); KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(LOGICAL_NAME + ".");
SchemaChangeHistory schemaChanges = new SchemaChangeHistory(LOGICAL_NAME); SchemaChangeHistory schemaChanges = new SchemaChangeHistory(LOGICAL_NAME);
while ( (records = reader.poll()) != null ) { while ((records = reader.poll()) != null) {
records.forEach(record->{ records.forEach(record -> {
VerifyRecord.isValid(record); VerifyRecord.isValid(record);
store.add(record); store.add(record);
schemaChanges.add(record); schemaChanges.add(record);
@ -267,15 +273,15 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
} }
// The last poll should always return null ... // The last poll should always return null ...
assertThat(records).isNull(); assertThat(records).isNull();
// There should be 11 schema changes plus 1 SET statement ... // There should be 11 schema changes plus 1 SET statement ...
assertThat(schemaChanges.recordCount()).isEqualTo(12); assertThat(schemaChanges.recordCount()).isEqualTo(12);
assertThat(schemaChanges.databaseCount()).isEqualTo(2); assertThat(schemaChanges.databaseCount()).isEqualTo(2);
assertThat(schemaChanges.databases()).containsOnly(DB_NAME,""); assertThat(schemaChanges.databases()).containsOnly(DB_NAME, "");
// Check the records via the store ... // Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(4); assertThat(store.collectionCount()).isEqualTo(4);
Collection products = store.collection(DB_NAME,"products"); Collection products = store.collection(DB_NAME, "products");
assertThat(products.numberOfCreates()).isEqualTo(9); assertThat(products.numberOfCreates()).isEqualTo(9);
assertThat(products.numberOfUpdates()).isEqualTo(0); assertThat(products.numberOfUpdates()).isEqualTo(0);
assertThat(products.numberOfDeletes()).isEqualTo(0); assertThat(products.numberOfDeletes()).isEqualTo(0);
@ -284,7 +290,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1);
Collection products_on_hand = store.collection(DB_NAME,"products_on_hand"); Collection products_on_hand = store.collection(DB_NAME, "products_on_hand");
assertThat(products_on_hand.numberOfCreates()).isEqualTo(9); assertThat(products_on_hand.numberOfCreates()).isEqualTo(9);
assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0); assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0);
assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0); assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0);
@ -293,7 +299,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1);
Collection customers = store.collection(DB_NAME,"customers"); Collection customers = store.collection(DB_NAME, "customers");
assertThat(customers.numberOfCreates()).isEqualTo(4); assertThat(customers.numberOfCreates()).isEqualTo(4);
assertThat(customers.numberOfUpdates()).isEqualTo(0); assertThat(customers.numberOfUpdates()).isEqualTo(0);
assertThat(customers.numberOfDeletes()).isEqualTo(0); assertThat(customers.numberOfDeletes()).isEqualTo(0);
@ -302,7 +308,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1);
Collection orders = store.collection(DB_NAME,"orders"); Collection orders = store.collection(DB_NAME, "orders");
assertThat(orders.numberOfCreates()).isEqualTo(5); assertThat(orders.numberOfCreates()).isEqualTo(5);
assertThat(orders.numberOfUpdates()).isEqualTo(0); assertThat(orders.numberOfUpdates()).isEqualTo(0);
assertThat(orders.numberOfDeletes()).isEqualTo(0); assertThat(orders.numberOfDeletes()).isEqualTo(0);
@ -312,7 +318,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
// Make sure the snapshot completed ... // Make sure the snapshot completed ...
if ( completed.await(10, TimeUnit.SECONDS) ) { if (completed.await(10, TimeUnit.SECONDS)) {
// completed the snapshot ... // completed the snapshot ...
Testing.print("completed the snapshot"); Testing.print("completed the snapshot");
} else { } else {