DBZ-26 Corrected the embedded connector framework to enable stopping. Also improved logging statements.

Randall Hauch 2016-03-03 15:27:11 -06:00
parent 26860121b7
commit 9034e26d1e
6 changed files with 74 additions and 9 deletions

View File

@@ -17,6 +17,7 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -67,6 +68,8 @@ public final class MySqlConnectorTask extends SourceTask {
"slave_relay_log_info", "slave_master_info",
"slave_worker_info", "gtid_executed",
"server_cost", "engine_cost");
private final Set<String> BUILT_IN_DB_NAMES = Collect.unmodifiableSet("mysql", "performance_schema");
private final Logger logger = LoggerFactory.getLogger(getClass());
private final TopicSelector topicSelector;
@@ -81,6 +84,7 @@ public final class MySqlConnectorTask extends SourceTask {
private int maxBatchSize;
private String serverName;
private Metronome metronome;
private final AtomicBoolean running = new AtomicBoolean(false);
// Used in the methods that process events ...
private final SourceInfo source = new SourceInfo();
@@ -122,6 +126,7 @@ public void start(Map<String, String> props) {
// prefix
this.dbHistory.configure(dbHistoryConfig); // validates
this.dbHistory.start();
this.running.set(true);
// Read the configuration ...
final String user = config.getString(MySqlConnectorConfig.USER);
@@ -145,7 +150,9 @@ public void start(Map<String, String> props) {
config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
Predicate<TableId> ignoreBuiltins = (id) -> !BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());
Predicate<TableId> ignoreBuiltins = (id) -> {
return !BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase()) && !BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase());
};
tableFilter = ignoreBuiltins.or(tableFilter);
}
@@ -197,7 +204,7 @@ public void start(Map<String, String> props) {
logger.info("Recovering MySQL connector '{}' database schemas from history stored in {}", serverName, dbHistory);
DdlParser ddlParser = new MySqlDdlParser();
dbHistory.recover(source.partition(), source.offset(), tables, ddlParser);
logger.debug("Recovered MySQL connector '{}' database schemas: {}", serverName, tables);
logger.debug("Recovered MySQL connector '{}' database schemas: {}", serverName, tables.subset(tableFilter));
} catch (Throwable t) {
throw new ConnectException("Failure while recovering database schemas", t);
}
@@ -229,7 +236,7 @@ public void start(Map<String, String> props) {
@Override
public List<SourceRecord> poll() throws InterruptedException {
logger.trace("Polling for events from MySQL server '{}'", serverName);
while (events.drainTo(batchEvents, maxBatchSize - batchEvents.size()) == 0 || batchEvents.isEmpty()) {
while (running.get() && (events.drainTo(batchEvents, maxBatchSize - batchEvents.size()) == 0 || batchEvents.isEmpty())) {
// No events to process, so sleep for a bit ...
metronome.pause();
}
@@ -263,6 +270,8 @@ public List<SourceRecord> poll() throws InterruptedException {
source.setRowInEvent(0);
}
}
if ( !running.get()) break;
// If there is a handler for this event, forward the event to it ...
EventHandler handler = eventHandlers.get(eventType);
@@ -272,6 +281,12 @@ public List<SourceRecord> poll() throws InterruptedException {
}
logger.trace("Completed processing {} events from MySQL server '{}'", serverName);
if (!this.running.get()) {
// We're supposed to stop, so return nothing that we might have already processed
// so that no records get persisted if DB history has already been stopped ...
return null;
}
// We've processed them all, so clear the batch and return the records ...
assert batchEvents.isEmpty();
return records;
@@ -280,6 +295,10 @@ public List<SourceRecord> poll() throws InterruptedException {
@Override
public void stop() {
try {
// Signal to the 'poll()' method that it should stop what it's doing ...
this.running.set(false);
// Flush and stop the database history ...
logger.debug("Stopping database history for MySQL server '{}'", serverName);
dbHistory.stop();
} catch (Throwable e) {
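
For reference, a minimal sketch of the stop-flag pattern this file adopts: an AtomicBoolean that stop() flips and that poll() consults both while waiting for events and before returning a batch. The class name, queue, batch size, and sleep interval below are illustrative stand-ins, not Debezium's actual MySqlConnectorTask.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/** Illustrative stop-flag pattern for a Kafka Connect source task (not Debezium's code). */
public class StoppableSourceTask extends SourceTask {

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final BlockingQueue<SourceRecord> events = new LinkedBlockingDeque<>(2048);
    private final int maxBatchSize = 1024;

    @Override
    public String version() {
        return "0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        running.set(true);
        // ... connect to the source and start a thread that enqueues SourceRecords into 'events' ...
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> batch = new ArrayList<>();
        // Wait for events, but re-check the flag so a concurrent stop() is honored promptly ...
        while (running.get() && events.drainTo(batch, maxBatchSize) == 0) {
            Thread.sleep(100); // stand-in for the Metronome pause used in the real task
        }
        // If we were asked to stop, return nothing so no partial batch gets committed ...
        if (!running.get()) return null;
        return batch;
    }

    @Override
    public void stop() {
        // Called from another thread; poll() will observe this on its next check ...
        running.set(false);
    }
}

Returning null from poll() when stopping is what keeps already-drained events from being handed to Kafka Connect after the database history has been shut down, which is the point made in the comment added to the real task above.
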

View File

@@ -95,7 +95,32 @@ public void shouldStartAndPollShouldReturnSourceRecordsFromDatabase() throws SQL
}
Testing.Print.enable();
assertThat(consumeAvailableRecords(this::print)).isGreaterThan(0); // expecting at least 1
int totalConsumed = consumeAvailableRecords(this::print); // expecting at least 1
stopConnector();
// Restart the connector and wait for a few seconds (at most) for records that will never arrive ...
start(MySqlConnector.class, config);
waitForAvailableRecords(2, TimeUnit.SECONDS);
totalConsumed += consumeAvailableRecords(this::print);
stopConnector();
// Create an additional few records ...
Testing.Print.disable();
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("INSERT INTO products VALUES (default,'roy','old robot',1234.56);");
connection.query("SELECT * FROM products", rs->{if (Testing.Print.isEnabled()) connection.print(rs);});
}
}
// Restart the connector and wait for a few seconds (at most) for the new record ...
Testing.Print.enable();
start(MySqlConnector.class, config);
waitForAvailableRecords(5, TimeUnit.SECONDS);
totalConsumed += consumeAvailableRecords(this::print);
stopConnector();
// We should have seen a total of 30 events, though when they appear may vary ...
assertThat(totalConsumed).isEqualTo(30);
}
}

View File

@@ -12,6 +12,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Schema;
@@ -314,6 +315,19 @@ public boolean equals(Object obj) {
return false;
}
public Tables subset(Predicate<TableId> filter) {
if (filter == null) return this;
return lock.read(() -> {
Tables result = new Tables();
tablesByTableId.forEach((tableId, table) -> {
if (filter.test(tableId)) {
result.overwriteTable(table);
}
});
return result;
});
}
@Override
public String toString() {
return lock.read(() -> {
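
Below is a hedged usage sketch of the new subset(Predicate<TableId>) helper, mirroring how the MySqlConnectorTask hunk earlier in this commit now logs only the tables that pass the connector's filter. The wrapper class, logger parameter, and import paths (io.debezium.relational is assumed) are illustrative.

import java.util.function.Predicate;

import org.slf4j.Logger;

import io.debezium.relational.TableId;
import io.debezium.relational.Tables;

public class SchemaLogging {
    // Log only the tables the connector will actually capture; 'tables' is assumed
    // to have been recovered from the database history beforehand ...
    static void logRecoveredSchemas(Logger logger, Tables tables, Predicate<TableId> tableFilter) {
        Tables visible = tables.subset(tableFilter);
        logger.debug("Recovered MySQL connector database schemas: {}", visible);
    }
}

Because subset(null) returns the same Tables instance, callers can pass their filter through unconditionally.
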

View File

@@ -47,7 +47,7 @@ public static void enable() {
}
public static void disable() {
enabled = true;
enabled = false;
}
public static boolean isEnabled() {

View File

@@ -469,7 +469,7 @@ protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy p
long started = clock.currentTimeInMillis();
long timeout = started + commitTimeoutMs;
offsetWriter.beginFlush();
if ( !offsetWriter.beginFlush() ) return;
Future<Void> flush = offsetWriter.doFlush(this::completedFlush);
if (flush == null) return; // no offsets to commit ...
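
The change above stops calling doFlush() unconditionally: beginFlush() reports whether any offsets are actually staged, so the method can bail out early. A sketch of the resulting sequence, assuming the OffsetStorageWriter API of the Kafka Connect runtime this commit was written against (the signatures have changed in later Kafka versions); the wrapper method and timeout handling here are hypothetical.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.connect.storage.OffsetStorageWriter;

public class OffsetFlushSketch {
    // Guarded flush: skip the whole sequence when nothing is staged, and bound
    // how long we wait for the flush to complete ...
    static void maybeFlush(OffsetStorageWriter offsetWriter, long commitTimeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (!offsetWriter.beginFlush()) return;                 // nothing to flush
        Future<Void> flush = offsetWriter.doFlush((error, result) -> {
            // completion callback; the real engine records the outcome here
        });
        if (flush == null) return;                              // no offsets to commit
        flush.get(commitTimeoutMs, TimeUnit.MILLISECONDS);
    }
}
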

View File

@@ -83,9 +83,7 @@ public void stopConnector(BooleanConsumer callback) {
if (engine != null && engine.isRunning()) {
engine.stop();
try {
while (!engine.await(5, TimeUnit.SECONDS)) {
// Wait for connector to stop completely ...
}
engine.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.interrupted();
}
@@ -101,6 +99,15 @@ public void stopConnector(BooleanConsumer callback) {
Thread.interrupted();
}
}
if (engine != null && engine.isRunning()) {
try {
while (!engine.await(5, TimeUnit.SECONDS)) {
// Wait for connector to stop completely ...
}
} catch (InterruptedException e) {
Thread.interrupted();
}
}
if (callback != null) callback.accept(engine != null ? engine.isRunning() : false);
} finally {
engine = null;
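
Taken together, the two hunks in this file amount to the shutdown pattern sketched below: ask the engine to stop, give it one bounded wait, and only keep waiting afterwards if it is still running. Engine here is a hypothetical stand-in exposing just the three calls the diff uses (isRunning, stop, await); as a design note, the sketch restores the interrupt status rather than clearing it with Thread.interrupted().

import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    // Hypothetical stand-in for the embedded engine's lifecycle API ...
    interface Engine {
        boolean isRunning();
        void stop();
        boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    }

    // Returns whether the engine is still running at the end, mirroring the boolean
    // handed to the callback in the test support code above ...
    static boolean stopAndWait(Engine engine) {
        if (engine == null) return false;
        if (engine.isRunning()) {
            engine.stop();
            try {
                engine.await(5, TimeUnit.SECONDS);      // bounded first wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Still running? Keep waiting in bounded increments, as the second hunk does ...
        while (engine.isRunning()) {
            try {
                if (engine.await(5, TimeUnit.SECONDS)) break;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return engine.isRunning();
    }
}
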