Merge pull request #53 from rhauch/dbz-58
DBZ-58 Added MDC logging contexts to connector
This commit is contained in:
commit
a25d380214
@ -18,7 +18,6 @@
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
|
||||
import com.github.shyiko.mysql.binlog.BinaryLogClient;
|
||||
import com.github.shyiko.mysql.binlog.BinaryLogClient.AbstractLifecycleListener;
|
||||
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
|
||||
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
|
||||
import com.github.shyiko.mysql.binlog.event.Event;
|
||||
@ -68,17 +67,7 @@ public BinlogReader(MySqlTaskContext context) {
|
||||
client.setServerId(context.serverId());
|
||||
client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
|
||||
client.registerEventListener(this::handleEvent);
|
||||
client.registerLifecycleListener(new AbstractLifecycleListener(){
|
||||
@Override
|
||||
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
|
||||
failed(ex,"Stopped reading binlog due to error: " + ex.getMessage());
|
||||
}
|
||||
@Override
|
||||
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
|
||||
failed(ex,"Stopped reading binlog due to error: " + ex.getMessage());
|
||||
}
|
||||
});
|
||||
client.registerLifecycleListener(new TraceLifecycleListener());
|
||||
client.registerLifecycleListener(new ReaderThreadLifecycleListener());
|
||||
if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent);
|
||||
|
||||
// Set up the event deserializer with additional type(s) ...
|
||||
@ -104,24 +93,22 @@ protected void doStart() {
|
||||
client.setBinlogFilename(source.binlogFilename());
|
||||
client.setBinlogPosition(source.binlogPosition());
|
||||
// The event row number will be used when processing the first event ...
|
||||
logger.info("Reading from MySQL {} starting at {}",context.serverName(), source);
|
||||
|
||||
// Start the log reader, which starts background threads ...
|
||||
long timeoutInMilliseconds = context.timeoutInMilliseconds();
|
||||
try {
|
||||
logger.debug("Binlog reader connecting to MySQL server '{}'", context.serverName());
|
||||
logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeoutInMilliseconds);
|
||||
client.connect(context.timeoutInMilliseconds());
|
||||
logger.info("Successfully started reading MySQL binlog");
|
||||
} catch (TimeoutException e) {
|
||||
double seconds = TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds);
|
||||
throw new ConnectException("Timed out after " + seconds + " seconds while waiting to connect to the MySQL database at " +
|
||||
context.username() + ":" + context.port() + " with user '" + context.username() + "'", e);
|
||||
throw new ConnectException("Timed out after " + seconds + " seconds while waiting to connect to MySQL at " +
|
||||
context.hostname() + ":" + context.port() + " with user '" + context.username() + "'", e);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new ConnectException("Failed to authenticate to the MySQL database at " + context.hostname() + ":" +
|
||||
context.port() + " with user '" + context.username() + "'", e);
|
||||
throw new ConnectException("Failed to authenticate to the MySQL database at " +
|
||||
context.hostname() + ":" + context.port() + " with user '" + context.username() + "'", e);
|
||||
} catch (Throwable e) {
|
||||
throw new ConnectException("Unable to connect to the MySQL database at " + context.hostname() + ":" + context.port() +
|
||||
" with user '" + context.username() + "': " + e.getMessage(), e);
|
||||
throw new ConnectException("Unable to connect to the MySQL database at " +
|
||||
context.hostname() + ":" + context.port() + " with user '" + context.username() + "': " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
}
|
||||
@ -129,9 +116,8 @@ protected void doStart() {
|
||||
@Override
|
||||
protected void doStop() {
|
||||
try {
|
||||
logger.debug("Binlog reader disconnecting from MySQL server '{}'", context.serverName());
|
||||
logger.debug("Stopping binlog reader");
|
||||
client.disconnect();
|
||||
logger.info("Stopped connector to MySQL server '{}'", context.serverName());
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected error when disconnecting from the MySQL binary log reader", e);
|
||||
}
|
||||
@ -142,7 +128,7 @@ protected void doCleanup() {
|
||||
}
|
||||
|
||||
protected void logEvent(Event event) {
|
||||
//logger.debug("Received event: {}", event);
|
||||
logger.trace("Received event: {}", event);
|
||||
}
|
||||
|
||||
protected void ignoreEvent(Event event) {
|
||||
@ -344,25 +330,31 @@ protected void handleDelete(Event event) throws InterruptedException {
|
||||
}
|
||||
}
|
||||
|
||||
protected final class TraceLifecycleListener implements LifecycleListener {
|
||||
protected final class ReaderThreadLifecycleListener implements LifecycleListener {
|
||||
@Override
|
||||
public void onDisconnect(BinaryLogClient client) {
|
||||
logger.debug("MySQL Connector disconnected");
|
||||
context.temporaryLoggingContext("binlog", () -> {
|
||||
logger.info("Stopped reading binlog and closed connection");
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnect(BinaryLogClient client) {
|
||||
logger.info("MySQL Connector connected");
|
||||
// Set up the MDC logging context for this thread ...
|
||||
context.configureLoggingContext("binlog");
|
||||
|
||||
// The event row number will be used when processing the first event ...
|
||||
logger.info("Connected to MySQL binlog at {}:{}, starting at {}", context.hostname(), context.port(), source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
|
||||
logger.error("MySQL Connector communication failure", ex);
|
||||
BinlogReader.this.failed(ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
|
||||
logger.error("MySQL Connector received event deserialization failure", ex);
|
||||
BinlogReader.this.failed(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ public void start(Map<String, String> props) {
|
||||
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
|
||||
}
|
||||
|
||||
// Create the task and set our running flag ...
|
||||
// Create and start the task context ...
|
||||
this.taskContext = new MySqlTaskContext(config);
|
||||
this.taskContext.start();
|
||||
|
||||
@ -133,13 +133,13 @@ public void start(Map<String, String> props) {
|
||||
|
||||
@Override
|
||||
public List<SourceRecord> poll() throws InterruptedException {
|
||||
logger.trace("Polling for events from MySQL connector");
|
||||
logger.trace("Polling for events");
|
||||
return currentReader.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
logger.info("Stopping MySQL Connector");
|
||||
logger.info("Stopping MySQL connector task");
|
||||
// We need to explicitly stop both readers, in this order. If we were to instead call 'currentReader.stop()', there
|
||||
// is a chance without synchronization that we'd miss the transition and stop only the snapshot reader. And stopping both
|
||||
// is far simpler and more efficient than synchronizing ...
|
||||
@ -155,7 +155,7 @@ public void stop() {
|
||||
} catch (Throwable e) {
|
||||
logger.error("Unexpected error shutting down the database history and/or closing JDBC connections", e);
|
||||
} finally {
|
||||
logger.info("Stopped connector to MySQL server '{}'", taskContext.serverName());
|
||||
logger.info("Connector task successfully stopped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,8 @@
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.jdbc.JdbcConnection.ConnectionFactory;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.LoggingContext;
|
||||
import io.debezium.util.LoggingContext.PreviousContext;
|
||||
|
||||
/**
|
||||
* A Kafka Connect source task reads the MySQL binary log and generate the corresponding data change events.
|
||||
@ -96,7 +98,7 @@ public void loadHistory(SourceInfo startingPoint) {
|
||||
dbSchema.loadHistory(startingPoint);
|
||||
recordProcessor.regenerate();
|
||||
}
|
||||
|
||||
|
||||
public Clock clock() {
|
||||
return clock;
|
||||
}
|
||||
@ -157,21 +159,23 @@ protected SnapshotMode snapshotMode() {
|
||||
String value = config.getString(MySqlConnectorConfig.SNAPSHOT_MODE);
|
||||
return SnapshotMode.parse(value, MySqlConnectorConfig.SNAPSHOT_MODE.defaultValue());
|
||||
}
|
||||
|
||||
|
||||
public boolean useMinimalSnapshotLocking() {
|
||||
return config.getBoolean(MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
// Start the MySQL database history, which simply starts up resources but does not recover the history to a specific
|
||||
// point.
|
||||
// First, configure the logging context for the thread that created this context object ...
|
||||
this.configureLoggingContext("task");
|
||||
|
||||
// Start the MySQL database history, which simply starts up resources but does not recover the history to a specific point
|
||||
dbSchema().start();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
try {
|
||||
// Flush and stop the database history ...
|
||||
logger.debug("Stopping database history for MySQL server '{}'", serverName());
|
||||
logger.debug("Stopping database history");
|
||||
dbSchema.shutdown();
|
||||
} catch (Throwable e) {
|
||||
logger.error("Unexpected error shutting down the database history", e);
|
||||
@ -184,4 +188,26 @@ public void shutdown() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the logger's Mapped Diagnostic Context (MDC) properties for the thread making this call.
|
||||
* @param contextName the name of the context; may not be null
|
||||
* @return the previous MDC context; never null
|
||||
* @throws IllegalArgumentException if {@code contextName} is null
|
||||
*/
|
||||
public PreviousContext configureLoggingContext(String contextName) {
|
||||
return LoggingContext.forConnector("MySQL", serverName(), contextName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the supplied function in the temporary connector MDC context, and when complete always return the MDC context to its
|
||||
* state before this method was called.
|
||||
*
|
||||
* @param contextName the name of the context; may not be null
|
||||
* @param operation the function to run in the new MDC context; may not be null
|
||||
* @throws IllegalArgumentException if any of the parameters are null
|
||||
*/
|
||||
public void temporaryLoggingContext(String contextName, Runnable operation) {
|
||||
LoggingContext.temporarilyForConnector("MySQL", serverName(), contextName, operation);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -19,6 +19,8 @@
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
* A component that performs a snapshot of a MySQL server, and records the schema changes in {@link MySqlSchema}.
|
||||
@ -127,13 +129,15 @@ protected void doCleanup() {
|
||||
* Perform the snapshot using the same logic as the "mysqldump" utility.
|
||||
*/
|
||||
protected void execute() {
|
||||
logger.info("Starting snapshot for MySQL server {}", context.serverName());
|
||||
context.configureLoggingContext("snapshot");
|
||||
logger.info("Starting snapshot");
|
||||
final AtomicReference<String> sql = new AtomicReference<>();
|
||||
final JdbcConnection mysql = context.jdbc();
|
||||
final MySqlSchema schema = context.dbSchema();
|
||||
final Filters filters = schema.filters();
|
||||
final SourceInfo source = context.source();
|
||||
final long ts = context.clock().currentTimeInMillis();
|
||||
final Clock clock = context.clock();
|
||||
final long ts = clock.currentTimeInMillis();
|
||||
try {
|
||||
// ------
|
||||
// STEP 0
|
||||
@ -148,6 +152,7 @@ protected void execute() {
|
||||
// See: https://dev.mysql.com/doc/refman/5.7/en/set-transaction.html
|
||||
// See: https://dev.mysql.com/doc/refman/5.7/en/innodb-transaction-isolation-levels.html
|
||||
// See: https://dev.mysql.com/doc/refman/5.7/en/innodb-consistent-read.html
|
||||
logger.info("Step 0: disabling autocommit and enabling repeatable read transactions");
|
||||
mysql.setAutoCommit(false);
|
||||
sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
|
||||
mysql.execute(sql.get());
|
||||
@ -157,6 +162,7 @@ protected void execute() {
|
||||
// ------
|
||||
// First, start a transaction and request that a consistent MVCC snapshot is obtained immediately.
|
||||
// See http://dev.mysql.com/doc/refman/5.7/en/commit.html
|
||||
logger.info("Step 1: start transaction with consistent snapshot");
|
||||
sql.set("START TRANSACTION WITH CONSISTENT SNAPSHOT");
|
||||
mysql.execute(sql.get());
|
||||
|
||||
@ -166,6 +172,8 @@ protected void execute() {
|
||||
// Obtain read lock on all tables. This statement closes all open tables and locks all tables
|
||||
// for all databases with a global read lock, and it prevents ALL updates while we have this lock.
|
||||
// It also ensures that everything we do while we have this lock will be consistent.
|
||||
long lockAcquired = clock.currentTimeInMillis();
|
||||
logger.info("Step 2: flush and obtain global read lock (preventing writes to database)");
|
||||
sql.set("FLUSH TABLES WITH READ LOCK");
|
||||
mysql.execute(sql.get());
|
||||
|
||||
@ -174,6 +182,7 @@ protected void execute() {
|
||||
// ------
|
||||
// Obtain the binlog position and update the SourceInfo in the context. This means that all source records generated
|
||||
// as part of the snapshot will contain the binlog position of the snapshot.
|
||||
logger.info("Step 3: read binlog position of MySQL master");
|
||||
sql.set("SHOW MASTER STATUS");
|
||||
mysql.query(sql.get(), rs -> {
|
||||
if (rs.next()) {
|
||||
@ -191,6 +200,7 @@ protected void execute() {
|
||||
// STEP 4
|
||||
// ------
|
||||
// Get the list of databases ...
|
||||
logger.info("Step 4: read list of available databases");
|
||||
final List<String> databaseNames = new ArrayList<>();
|
||||
sql.set("SHOW DATABASES");
|
||||
mysql.query(sql.get(), rs -> {
|
||||
@ -205,6 +215,7 @@ protected void execute() {
|
||||
// Get the list of table IDs for each database. We can't use a prepared statement with MySQL, so we have to
|
||||
// build the SQL statement each time. Although in other cases this might lead to SQL injection, in our case
|
||||
// we are reading the database names from the database and not taking them from the user ...
|
||||
logger.info("Step 5: read list of available tables in each database");
|
||||
final List<TableId> tableIds = new ArrayList<>();
|
||||
final Map<String,List<TableId>> tableIdsByDbName = new HashMap<>();
|
||||
for (String dbName : databaseNames) {
|
||||
@ -225,6 +236,7 @@ protected void execute() {
|
||||
// ------
|
||||
// Transform the current schema so that it reflects the *current* state of the MySQL server's contents.
|
||||
// First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ...
|
||||
logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas");
|
||||
final List<String> ddlStatements = new ArrayList<>();
|
||||
// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
|
||||
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
|
||||
@ -253,6 +265,7 @@ protected void execute() {
|
||||
}
|
||||
}
|
||||
// Finally, apply the DDL statements to the schema and then update the record maker...
|
||||
logger.debug("Step 6b: applying DROP and CREATE statements to connector's table model");
|
||||
String ddlStatementsStr = String.join(";" + System.lineSeparator(), ddlStatements);
|
||||
schema.applyDdl(source, null, ddlStatementsStr, this::enqueueSchemaChanges);
|
||||
context.makeRecord().regenerate();
|
||||
@ -266,17 +279,25 @@ protected void execute() {
|
||||
// should still use the MVCC snapshot obtained when we started our transaction (since we started it
|
||||
// "...with consistent snapshot"). So, since we're only doing very simple SELECT without WHERE predicates,
|
||||
// we can release the lock now ...
|
||||
logger.info("Step 7: releasing global read lock to enable MySQL writes");
|
||||
sql.set("UNLOCK TABLES");
|
||||
mysql.execute(sql.get());
|
||||
unlocked = true;
|
||||
long lockReleased = clock.currentTimeInMillis();
|
||||
logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased-lockAcquired));
|
||||
}
|
||||
|
||||
// ------
|
||||
// STEP 8
|
||||
// ------
|
||||
// Dump all of the tables and generate source records ...
|
||||
logger.info("Step 8: scanning contents of {} tables",tableIds.size());
|
||||
long startScan = clock.currentTimeInMillis();
|
||||
AtomicBoolean interrupted = new AtomicBoolean(false);
|
||||
int counter = 0;
|
||||
for (TableId tableId : tableIds) {
|
||||
long start = clock.currentTimeInMillis();
|
||||
logger.debug("Step 8.{}: scanning table '{}'; {} tables remain",++counter,tableId,tableIds.size()-counter);
|
||||
sql.set("SELECT * FROM " + tableId);
|
||||
mysql.query(sql.get(), rs -> {
|
||||
RecordsForTable recordMaker = context.makeRecord().forTable(tableId, null, super::enqueueRecord);
|
||||
@ -300,16 +321,23 @@ protected void execute() {
|
||||
}
|
||||
});
|
||||
if ( interrupted.get() ) break;
|
||||
long stop = clock.currentTimeInMillis();
|
||||
logger.info("Step 8.{}: scanned table '{}' in {}",counter,tableId,Strings.duration(stop-start));
|
||||
}
|
||||
long stop = clock.currentTimeInMillis();
|
||||
logger.info("Step 8: scanned contents of {} tables in {}",tableIds.size(),Strings.duration(stop-startScan));
|
||||
|
||||
// ------
|
||||
// STEP 9
|
||||
// ------
|
||||
// Release the read lock if we have not yet done so ...
|
||||
if (!unlocked) {
|
||||
logger.info("Step 9: releasing global read lock to enable MySQL writes");
|
||||
sql.set("UNLOCK TABLES");
|
||||
mysql.execute(sql.get());
|
||||
unlocked = true;
|
||||
long lockReleased = clock.currentTimeInMillis();
|
||||
logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased-lockAcquired));
|
||||
}
|
||||
|
||||
// -------
|
||||
@ -317,15 +345,18 @@ protected void execute() {
|
||||
// -------
|
||||
if (interrupted.get()) {
|
||||
// We were interrupted while reading the tables, so roll back the transaction and return immediately ...
|
||||
logger.info("Step 10: rolling back transaction after request to stop");
|
||||
sql.set("ROLLBACK");
|
||||
mysql.execute(sql.get());
|
||||
return;
|
||||
}
|
||||
// Otherwise, commit our transaction
|
||||
logger.info("Step 10: committing transaction");
|
||||
sql.set("COMMIT");
|
||||
mysql.execute(sql.get());
|
||||
|
||||
try {
|
||||
logger.info("Step 11: recording completion of snapshot");
|
||||
// Mark the source as having completed the snapshot. Because of this, **subsequent** source records
|
||||
// produced by the connector (to any topic) will have a normal (not snapshot) offset ...
|
||||
source.completeSnapshot();
|
||||
@ -338,7 +369,8 @@ protected void execute() {
|
||||
} finally {
|
||||
// Set the completion flag ...
|
||||
super.completeSuccessfully();
|
||||
logger.info("Completed snapshot for MySQL server {}", context.serverName());
|
||||
stop = clock.currentTimeInMillis();
|
||||
logger.info("Completed snapshot in {}", Strings.duration(stop-ts));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
failed(e, "Aborting snapshot after running '" + sql.get() + "': " + e.getMessage());
|
||||
|
@ -2,7 +2,7 @@
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.Target=System.out
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n
|
||||
|
||||
# Root logger option
|
||||
log4j.rootLogger=INFO, stdout
|
||||
|
100
debezium-core/src/main/java/io/debezium/util/LoggingContext.java
Normal file
100
debezium-core/src/main/java/io/debezium/util/LoggingContext.java
Normal file
@ -0,0 +1,100 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.util;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.slf4j.MDC;
|
||||
|
||||
/**
|
||||
* A utility that provides a consistent set of properties for the Mapped Diagnostic Context (MDC) properties used by Debezium
|
||||
* components.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
* @since 0.2
|
||||
*/
|
||||
public class LoggingContext {
|
||||
|
||||
/**
|
||||
* The key for the connector type MDC property.
|
||||
*/
|
||||
public static final String CONNECTOR_TYPE = "dbz.connectorType";
|
||||
/**
|
||||
* The key for the connector logical name MDC property.
|
||||
*/
|
||||
public static final String CONNECTOR_NAME = "dbz.connectorName";
|
||||
/**
|
||||
* The key for the connector context name MDC property.
|
||||
*/
|
||||
public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";
|
||||
|
||||
private LoggingContext() {
|
||||
}
|
||||
|
||||
/**
|
||||
* A snapshot of an MDC context that can be {@link #restore()}.
|
||||
*/
|
||||
public static final class PreviousContext {
|
||||
private final Map<String,String> context;
|
||||
@SuppressWarnings("unchecked")
|
||||
protected PreviousContext() {
|
||||
context = MDC.getCopyOfContextMap();
|
||||
}
|
||||
/**
|
||||
* Restore this logging context.
|
||||
*/
|
||||
public void restore() {
|
||||
for ( Map.Entry<String, String> entry : context.entrySet() ) {
|
||||
MDC.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure for a connector the logger's Mapped Diagnostic Context (MDC) properties for the thread making this call.
|
||||
*
|
||||
* @param connectorType the type of connector; may not be null
|
||||
* @param connectorName the name of the connector; may not be null
|
||||
* @param contextName the name of the context; may not be null
|
||||
* @return the previous MDC context; never null
|
||||
* @throws IllegalArgumentException if any of the parameters are null
|
||||
*/
|
||||
public static PreviousContext forConnector(String connectorType, String connectorName, String contextName) {
|
||||
if (connectorType == null) throw new IllegalArgumentException("The MDC value for the connector type may not be null");
|
||||
if (connectorName == null) throw new IllegalArgumentException("The MDC value for the connector name may not be null");
|
||||
if (contextName == null) throw new IllegalArgumentException("The MDC value for the connector context may not be null");
|
||||
PreviousContext previous = new PreviousContext();
|
||||
MDC.put(CONNECTOR_TYPE, connectorType);
|
||||
MDC.put(CONNECTOR_NAME, connectorName);
|
||||
MDC.put(CONNECTOR_CONTEXT, contextName);
|
||||
return previous;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the supplied function in the temporary connector MDC context, and when complete always return the MDC context to its
|
||||
* state before this method was called.
|
||||
*
|
||||
* @param connectorType the type of connector; may not be null
|
||||
* @param connectorName the logical name of the connector; may not be null
|
||||
* @param contextName the name of the context; may not be null
|
||||
* @param operation the function to run in the new MDC context; may not be null
|
||||
* @throws IllegalArgumentException if any of the parameters are null
|
||||
*/
|
||||
public static void temporarilyForConnector(String connectorType, String connectorName, String contextName, Runnable operation) {
|
||||
if (connectorType == null) throw new IllegalArgumentException("The MDC value for the connector type may not be null");
|
||||
if (connectorName == null) throw new IllegalArgumentException("The MDC value for the connector name may not be null");
|
||||
if (contextName == null) throw new IllegalArgumentException("The MDC value for the connector context may not be null");
|
||||
if (operation == null) throw new IllegalArgumentException("The operation may not be null");
|
||||
PreviousContext previous = new PreviousContext();
|
||||
try {
|
||||
forConnector(connectorType,connectorName,contextName);
|
||||
operation.run();
|
||||
} finally {
|
||||
previous.restore();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -7,6 +7,8 @@
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.math.BigDecimal;
|
||||
import java.text.DecimalFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
@ -36,11 +38,11 @@ public final class Strings {
|
||||
* @return the list of objects included in the list; never null
|
||||
*/
|
||||
public static <T> Set<T> listOf(String input, Function<String, String[]> splitter, Function<String, T> factory) {
|
||||
if ( input == null ) return Collections.emptySet();
|
||||
if (input == null) return Collections.emptySet();
|
||||
Set<T> matches = new HashSet<>();
|
||||
for (String item : splitter.apply(input)) {
|
||||
T obj = factory.apply(item);
|
||||
if ( obj != null ) matches.add(obj);
|
||||
if (obj != null) matches.add(obj);
|
||||
}
|
||||
return matches;
|
||||
}
|
||||
@ -54,7 +56,7 @@ public static <T> Set<T> listOf(String input, Function<String, String[]> splitte
|
||||
* @return the list of objects included in the list; never null
|
||||
*/
|
||||
public static <T> Set<T> listOf(String input, char delimiter, Function<String, T> factory) {
|
||||
return listOf(input,(str) -> str.split("[" + delimiter + "]"),factory);
|
||||
return listOf(input, (str) -> str.split("[" + delimiter + "]"), factory);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -65,7 +67,7 @@ public static <T> Set<T> listOf(String input, char delimiter, Function<String, T
|
||||
* @return the list of objects included in the list; never null
|
||||
*/
|
||||
public static <T> Set<T> listOf(String input, Function<String, T> factory) {
|
||||
return listOf(input,',',factory);
|
||||
return listOf(input, ',', factory);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -76,7 +78,7 @@ public static <T> Set<T> listOf(String input, Function<String, T> factory) {
|
||||
* @return the list of regular expression {@link Pattern}s included in the list; never null
|
||||
*/
|
||||
public static Set<Pattern> listOfRegex(String input) {
|
||||
return listOf(input,',',Pattern::compile);
|
||||
return listOf(input, ',', Pattern::compile);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -88,7 +90,7 @@ public static Set<Pattern> listOfRegex(String input) {
|
||||
* @return the list of regular expression {@link Pattern}s included in the list; never null
|
||||
*/
|
||||
public static Set<Pattern> listOfRegex(String input, int regexFlags) {
|
||||
return listOf(input,',',(str)->Pattern.compile(str,regexFlags));
|
||||
return listOf(input, ',', (str) -> Pattern.compile(str, regexFlags));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -111,7 +113,7 @@ public static interface CharacterPredicate {
|
||||
* @param content the string content that is to be split
|
||||
* @return the list of lines; never null but may be an empty (unmodifiable) list if the supplied content is null or empty
|
||||
*/
|
||||
public static List<String> splitLines( final String content ) {
|
||||
public static List<String> splitLines(final String content) {
|
||||
if (content == null || content.length() == 0) return Collections.emptyList();
|
||||
String[] lines = content.split("[\\r]?\\n");
|
||||
return Arrays.asList(lines);
|
||||
@ -410,8 +412,7 @@ public static int asInt(String value, int defaultValue) {
|
||||
if (value != null) {
|
||||
try {
|
||||
return Integer.parseInt(value);
|
||||
} catch (NumberFormatException e) {
|
||||
}
|
||||
} catch (NumberFormatException e) {}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
@ -427,8 +428,7 @@ public static long asLong(String value, long defaultValue) {
|
||||
if (value != null) {
|
||||
try {
|
||||
return Long.parseLong(value);
|
||||
} catch (NumberFormatException e) {
|
||||
}
|
||||
} catch (NumberFormatException e) {}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
@ -444,8 +444,7 @@ public static double asDouble(String value, double defaultValue) {
|
||||
if (value != null) {
|
||||
try {
|
||||
return Double.parseDouble(value);
|
||||
} catch (NumberFormatException e) {
|
||||
}
|
||||
} catch (NumberFormatException e) {}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
@ -461,12 +460,46 @@ public static boolean asBoolean(String value, boolean defaultValue) {
|
||||
if (value != null) {
|
||||
try {
|
||||
return Boolean.parseBoolean(value);
|
||||
} catch (NumberFormatException e) {
|
||||
}
|
||||
} catch (NumberFormatException e) {}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* For the given duration in milliseconds, obtain a readable representation of the form {@code HHH:MM:SS.mmm}, where
|
||||
* <dl>
|
||||
* <dt>HHH</dt>
|
||||
* <dd>is the number of hours written in at least 2 digits (e.g., "03")</dd>
|
||||
* <dt>MM</dt>
|
||||
* <dd>is the number of hours written in at least 2 digits (e.g., "05")</dd>
|
||||
* <dt>SS</dt>
|
||||
* <dd>is the number of hours written in at least 2 digits (e.g., "09")</dd>
|
||||
* <dt>mmm</dt>
|
||||
* <dd>is the fractional part of seconds, written with 1-3 digits (any trailing zeros are dropped)</dd>
|
||||
* </dl>
|
||||
*
|
||||
* @param durationInMillis the duration in milliseconds
|
||||
* @return the readable duration.
|
||||
*/
|
||||
public static String duration(long durationInMillis) {
|
||||
// Calculate how many seconds, and don't lose any information ...
|
||||
BigDecimal bigSeconds = new BigDecimal(Math.abs(durationInMillis)).divide(new BigDecimal(1000));
|
||||
// Calculate the minutes, and round to lose the seconds
|
||||
int minutes = bigSeconds.intValue() / 60;
|
||||
// Remove the minutes from the seconds, to just have the remainder of seconds
|
||||
double dMinutes = minutes;
|
||||
double seconds = bigSeconds.doubleValue() - dMinutes * 60;
|
||||
// Now compute the number of full hours, and change 'minutes' to hold the remaining minutes
|
||||
int hours = minutes / 60;
|
||||
minutes = minutes - (hours * 60);
|
||||
|
||||
// Format the string, and have at least 2 digits for the hours, minutes and whole seconds,
|
||||
// and between 3 and 6 digits for the fractional part of the seconds...
|
||||
String result = new DecimalFormat("######00").format(hours) + ':' + new DecimalFormat("00").format(minutes) + ':'
|
||||
+ new DecimalFormat("00.0##").format(seconds);
|
||||
return result;
|
||||
}
|
||||
|
||||
private Strings() {
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user