DBZ-1464 Misc. clean-up;

* logger -> LOGGER
* making op literals map unmodifiable
* Adding Lev Zemlyanov to COPYRIGHT.txt
Gunnar Morling 2019-09-06 11:01:08 +02:00
parent 8cbb8eb63b
commit 1e0859588f
3 changed files with 62 additions and 50 deletions

COPYRIGHT.txt

@@ -64,6 +64,7 @@ Keith Barber
Kevin Pullin
Krizhan Mariampillai
Leo Mei
Lev Zemlyanov
Listman Gamboa
Liu Hanlin
Maciej Bryński

io/debezium/connector/mongodb/RecordMakers.java

@@ -5,6 +5,7 @@
*/
package io.debezium.connector.mongodb;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@@ -43,11 +44,18 @@
public class RecordMakers {
private static final ObjectSerializer jsonSerializer = JSONSerializers.getStrict();
private static final Map<String, Operation> operationLiterals = new HashMap<>();
@ThreadSafe
private static final Map<String, Operation> OPERATION_LITERALS;
static {
operationLiterals.put("i", Operation.CREATE);
operationLiterals.put("u", Operation.UPDATE);
operationLiterals.put("d", Operation.DELETE);
Map<String, Operation> literals = new HashMap<>();
literals.put("i", Operation.CREATE);
literals.put("u", Operation.UPDATE);
literals.put("d", Operation.DELETE);
OPERATION_LITERALS = Collections.unmodifiableMap(literals);
}
private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -95,7 +103,7 @@ public RecordsForCollection forCollection(CollectionId collectionId) {
}
public static boolean isValidOperation(String operation) {
return operationLiterals.containsKey(operation);
return OPERATION_LITERALS.containsKey(operation);
}
/**
@@ -185,7 +193,7 @@ public int recordEvent(Document oplogEvent, long timestamp) throws InterruptedEx
Object o2 = oplogEvent.get("o2");
String objId = o2 != null ? idObjToJson(o2) : idObjToJson(patchObj);
assert objId != null;
Operation operation = operationLiterals.get(oplogEvent.getString("op"));
Operation operation = OPERATION_LITERALS.get(oplogEvent.getString("op"));
return createRecords(sourceValue, offset, operation, objId, patchObj, timestamp);
}
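The RecordMakers change above replaces a mutable static HashMap with a map that is filled once inside the static initializer and then wrapped via Collections.unmodifiableMap, so the oplog op-code lookup ("i"/"u"/"d" to CREATE/UPDATE/DELETE) can no longer be mutated after class initialization. A minimal self-contained sketch of the same idiom (class name and the String values are illustrative stand-ins for Debezium's Operation enum):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class OpLiteralsSketch {
    // Built exactly once during class initialization, then exposed only
    // through an unmodifiable view; safe for concurrent readers.
    private static final Map<String, String> OPERATION_LITERALS;

    static {
        Map<String, String> literals = new HashMap<>();
        literals.put("i", "CREATE");
        literals.put("u", "UPDATE");
        literals.put("d", "DELETE");
        OPERATION_LITERALS = Collections.unmodifiableMap(literals);
    }

    public static void main(String[] args) {
        System.out.println(OPERATION_LITERALS.get("i")); // prints CREATE
        OPERATION_LITERALS.put("x", "OTHER");             // throws UnsupportedOperationException
    }
}

On Java 9+ the same table could be written as Map.of("i", "CREATE", "u", "UPDATE", "d", "DELETE"), but Collections.unmodifiableMap keeps the code Java 8 compatible, which is presumably why the commit adds the java.util.Collections import instead.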

io/debezium/connector/mongodb/Replicator.java

@@ -94,7 +94,7 @@
@ThreadSafe
public class Replicator {
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13";
@@ -166,7 +166,7 @@ public void run() {
}
}
catch (Throwable t) {
logger.error("Replicator for replica set {} failed", rsName, t);
LOGGER.error("Replicator for replica set {} failed", rsName, t);
onFailure.accept(t);
}
finally {
@@ -184,7 +184,7 @@ public void run() {
* @return {@code true} if a connection was established, or {@code false} otherwise
*/
protected boolean establishConnectionToPrimary() {
logger.info("Connecting to '{}'", replicaSet);
LOGGER.info("Connecting to '{}'", replicaSet);
primaryClient = context.getConnectionContext().primaryFor(
replicaSet,
context.filters(),
@@ -194,7 +194,7 @@ protected boolean establishConnectionToPrimary() {
throw new ConnectException("Error while attempting to " + desc, error);
}
else {
logger.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
}
});
@@ -222,17 +222,17 @@ protected void recordCurrentOplogPosition() {
protected boolean isInitialSyncExpected() {
boolean performSnapshot = true;
if (source.hasOffset(rsName)) {
if (logger.isInfoEnabled()) {
logger.info("Found existing offset for replica set '{}' at {}", rsName, source.lastOffset(rsName));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Found existing offset for replica set '{}' at {}", rsName, source.lastOffset(rsName));
}
performSnapshot = false;
if (context.getConnectionContext().performSnapshotEvenIfNotNeeded()) {
logger.info("Configured to performing initial sync of replica set '{}'", rsName);
LOGGER.info("Configured to performing initial sync of replica set '{}'", rsName);
performSnapshot = true;
} else {
if (source.isInitialSyncOngoing(rsName)) {
// The last snapshot was not completed, so do it again ...
logger.info("The previous initial sync was incomplete for '{}', so initiating another initial sync", rsName);
LOGGER.info("The previous initial sync was incomplete for '{}', so initiating another initial sync", rsName);
performSnapshot = true;
} else {
// There is no ongoing initial sync, so look to see if our last recorded offset still exists in the oplog.
@@ -245,23 +245,23 @@ protected boolean isInitialSyncExpected() {
});
if (firstAvailableTs == null) {
logger.info("The oplog contains no entries, so performing initial sync of replica set '{}'", rsName);
LOGGER.info("The oplog contains no entries, so performing initial sync of replica set '{}'", rsName);
performSnapshot = true;
} else if (lastRecordedTs.compareTo(firstAvailableTs) < 0) {
// The last recorded timestamp is *before* the first existing oplog event, which means there is
// almost certainly some history lost since we last processed the oplog ...
logger.info("Initial sync is required since the oplog for replica set '{}' starts at {}, which is later than the timestamp of the last offset {}",
LOGGER.info("Initial sync is required since the oplog for replica set '{}' starts at {}, which is later than the timestamp of the last offset {}",
rsName, firstAvailableTs, lastRecordedTs);
performSnapshot = true;
} else {
// Otherwise we'll not perform an initial sync
logger.info("The oplog contains the last entry previously read for '{}', so no initial sync will be performed",
LOGGER.info("The oplog contains the last entry previously read for '{}', so no initial sync will be performed",
rsName);
}
}
}
} else {
logger.info("No existing offset found for replica set '{}', starting initial sync", rsName);
LOGGER.info("No existing offset found for replica set '{}', starting initial sync", rsName);
performSnapshot = true;
}
return performSnapshot;
@@ -277,12 +277,12 @@ protected boolean performInitialSync() {
delaySnapshotIfNeeded();
}
catch (InterruptedException e) {
logger.info("Interrupted while awaiting initial snapshot delay");
LOGGER.info("Interrupted while awaiting initial snapshot delay");
return false;
}
if (logger.isInfoEnabled()) {
logger.info("Beginning initial sync of '{}' at {}", rsName, source.lastOffset(rsName));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Beginning initial sync of '{}' at {}", rsName, source.lastOffset(rsName));
}
source.startInitialSync(replicaSet.replicaSetName());
@@ -291,7 +291,7 @@ protected boolean performInitialSync() {
bufferedRecorder.startBuffering();
} catch (InterruptedException e) {
// Do nothing so that this thread is terminated ...
logger.info("Interrupted while waiting to flush the buffer before starting an initial sync of '{}'", rsName);
LOGGER.info("Interrupted while waiting to flush the buffer before starting an initial sync of '{}'", rsName);
return false;
}
@@ -309,8 +309,8 @@ protected boolean performInitialSync() {
final AtomicLong numDocumentsCopied = new AtomicLong();
// And start threads to pull collection IDs from the queue and perform the copies ...
if (logger.isInfoEnabled()) {
logger.info("Preparing to use {} thread(s) to sync {} collection(s): {}",
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Preparing to use {} thread(s) to sync {} collection(s): {}",
numThreads, collections.size(), Strings.join(", ", collections));
}
for (int i = 0; i != numThreads; ++i) {
@@ -321,13 +321,13 @@ protected boolean performInitialSync() {
CollectionId id = null;
while (!aborted.get() && (id = collectionsToCopy.poll()) != null) {
long start = clock.currentTimeInMillis();
logger.info("Starting initial sync of '{}'", id);
LOGGER.info("Starting initial sync of '{}'", id);
long numDocs = copyCollection(id, syncStart);
numCollectionsCopied.incrementAndGet();
numDocumentsCopied.addAndGet(numDocs);
if (logger.isInfoEnabled()) {
if (LOGGER.isInfoEnabled()) {
long duration = clock.currentTimeInMillis() - start;
logger.info("Completing initial sync of {} documents from '{}' in {}", numDocs, id, Strings.duration(duration));
LOGGER.info("Completing initial sync of {} documents from '{}' in {}", numDocs, id, Strings.duration(duration));
}
}
} catch (InterruptedException e) {
@@ -352,9 +352,9 @@ protected boolean performInitialSync() {
// Therefore, check the aborted state here ...
long syncDuration = clock.currentTimeInMillis() - syncStart;
if (aborted.get()) {
if (logger.isInfoEnabled()) {
if (LOGGER.isInfoEnabled()) {
int remaining = collections.size() - numCollectionsCopied.get();
logger.info("Initial sync aborted after {} with {} of {} collections incomplete",
LOGGER.info("Initial sync aborted after {} with {} of {} collections incomplete",
Strings.duration(syncDuration), remaining, collections.size());
}
return false;
@@ -366,16 +366,16 @@ protected boolean performInitialSync() {
// And immediately flush the last buffered source record with the updated offset ...
bufferedRecorder.stopBuffering(source.lastOffset(rsName));
} catch (InterruptedException e) {
logger.info("Interrupted while waiting for last initial sync record from replica set '{}' to be recorded", rsName);
LOGGER.info("Interrupted while waiting for last initial sync record from replica set '{}' to be recorded", rsName);
return false;
}
if (collections.isEmpty()) {
logger.warn("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration");
LOGGER.warn("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration");
}
if (logger.isInfoEnabled()) {
logger.info("Initial sync of {} collections with a total of {} documents completed in {}",
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initial sync of {} collections with a total of {} documents completed in {}",
collections.size(), numDocumentsCopied.get(), Strings.duration(syncDuration));
}
return true;
@@ -395,7 +395,7 @@ private void delaySnapshotIfNeeded() throws InterruptedException {
throw new InterruptedException("Interrupted while awaiting initial snapshot delay");
}
logger.info("The connector will wait for {}s before proceeding", timer.remaining().getSeconds());
LOGGER.info("The connector will wait for {}s before proceeding", timer.remaining().getSeconds());
metronome.pause();
}
}
@@ -434,7 +434,7 @@ protected long copyCollection(MongoClient primary, CollectionId collectionId, lo
try (MongoCursor<Document> cursor = docCollection.find().batchSize(batchSize).iterator()) {
while (running.get() && cursor.hasNext()) {
Document doc = cursor.next();
logger.trace("Found existing doc in {}: {}", collectionId, doc);
LOGGER.trace("Found existing doc in {}: {}", collectionId, doc);
counter += factory.recordObject(collectionId, doc, timestamp);
}
}
@@ -459,7 +459,7 @@ protected void readOplog() {
protected void readOplog(MongoClient primary) {
BsonTimestamp oplogStart = source.lastOffsetTimestamp(replicaSet.replicaSetName());
ServerAddress primaryAddress = primary.getAddress();
logger.info("Reading oplog for '{}' primary {} starting at {}", replicaSet, primaryAddress, oplogStart);
LOGGER.info("Reading oplog for '{}' primary {} starting at {}", replicaSet, primaryAddress, oplogStart);
// Include none of the cluster-internal operations and only those events since the previous timestamp ...
MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs");
@@ -488,7 +488,7 @@ protected void readOplog(MongoClient primary) {
bufferedRecorder);
}
catch (InterruptedException e) {
logger.info("Replicator thread is interrupted");
LOGGER.info("Replicator thread is interrupted");
Thread.currentThread().interrupt();
return;
}
@@ -505,12 +505,12 @@ protected void readOplog(MongoClient primary) {
* processing events
*/
protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event) {
logger.debug("Found event: {}", event);
LOGGER.debug("Found event: {}", event);
String ns = event.getString("ns");
Document object = event.get("o", Document.class);
if (object == null) {
if (logger.isWarnEnabled()) {
logger.warn("Missing 'o' field in event, so skipping {}", event.toJson());
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Missing 'o' field in event, so skipping {}", event.toJson());
}
return true;
}
@@ -525,30 +525,33 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
address.set(currentPrimary);
});
} catch (InterruptedException e) {
logger.error("Get current primary executeBlocking", e);
LOGGER.error("Get current primary executeBlocking", e);
}
ServerAddress serverAddress = address.get();
//primary switch will be handled automatically by MongoDB driver
if (serverAddress != null && !serverAddress.equals(primaryAddress)) {
logger.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}",
LOGGER.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}",
primaryAddress, serverAddress);
} else {
logger.info("Found new primary event in oplog, current {} is new primary. " +
LOGGER.info("Found new primary event in oplog, current {} is new primary. " +
"Continue to process oplog event.", primaryAddress);
}
}
// Otherwise, ignore this event ...
if (logger.isDebugEnabled()) {
logger.debug("Skipping event with no namespace: {}", event.toJson());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Skipping event with no namespace: {}", event.toJson());
}
return true;
}
if (!RecordMakers.isValidOperation(event.getString("op"))) {
// the op is not insert/update/delete
logger.debug("Skipping event with \"op={}\"", event.getString("op"));
String operation = event.getString("op");
// the op is not insert/update/delete
if (!RecordMakers.isValidOperation(operation)) {
LOGGER.debug("Skipping event with \"op={}\"", operation);
return true;
}
int delimIndex = ns.indexOf('.');
if (delimIndex > 0) {
assert (delimIndex + 1) < ns.length();
@@ -557,12 +560,12 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
if ("$cmd".equals(collectionName)) {
// This is a command on a database ...
// TODO: Probably want to handle some of these when we track creation/removal of collections
logger.debug("Skipping database command event: {}", event.toJson());
LOGGER.debug("Skipping database command event: {}", event.toJson());
return true;
}
// Otherwise, it is an event on a document in a collection ...
if (!context.filters().databaseFilter().test(dbName)) {
logger.debug("Skipping the event for database {} based on database.whitelist", dbName);
LOGGER.debug("Skipping the event for database {} based on database.whitelist", dbName);
return true;
}
CollectionId collectionId = new CollectionId(rsName, dbName, collectionName);
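The recurring change in Replicator above is the standard SLF4J convention: one static final LOGGER per class rather than a per-instance field, since LoggerFactory caches loggers by name and there is no reason to resolve one per object. A minimal sketch of the convention, including the isInfoEnabled() guard the diff keeps around messages whose arguments are costly to compute (class and method names here are illustrative, not Debezium's):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerConventionSketch {
    // Resolved once at class-load time; naming the class explicitly (rather
    // than calling getClass()) keeps the logger name stable under subclassing.
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerConventionSketch.class);

    void connect(String replicaSet) {
        // Parameterized messages defer string building until the level is enabled ...
        LOGGER.info("Connecting to '{}'", replicaSet);
    }

    void reportOffset(String rsName) {
        // ... but a guard still pays off when computing an argument is itself
        // expensive, as with the source.lastOffset(rsName) calls in the diff.
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Found existing offset for replica set '{}' at {}",
                    rsName, expensiveOffsetLookup(rsName));
        }
    }

    private Object expensiveOffsetLookup(String rsName) {
        return "offset-for-" + rsName; // stand-in for a real offset lookup
    }
}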