diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index e2afaae80..11d630aa8 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -64,6 +64,7 @@ Keith Barber Kevin Pullin Krizhan Mariampillai Leo Mei +Lev Zemlyanov Listman Gamboa Liu Hanlin Maciej BryƄski diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java index f98171401..6d6084497 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.mongodb; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -43,11 +44,18 @@ public class RecordMakers { private static final ObjectSerializer jsonSerializer = JSONSerializers.getStrict(); - private static final Map operationLiterals = new HashMap<>(); + + @ThreadSafe + private static final Map OPERATION_LITERALS; + static { - operationLiterals.put("i", Operation.CREATE); - operationLiterals.put("u", Operation.UPDATE); - operationLiterals.put("d", Operation.DELETE); + Map literals = new HashMap<>(); + + literals.put("i", Operation.CREATE); + literals.put("u", Operation.UPDATE); + literals.put("d", Operation.DELETE); + + OPERATION_LITERALS = Collections.unmodifiableMap(literals); } private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -95,7 +103,7 @@ public RecordsForCollection forCollection(CollectionId collectionId) { } public static boolean isValidOperation(String operation) { - return operationLiterals.containsKey(operation); + return OPERATION_LITERALS.containsKey(operation); } /** @@ -185,7 +193,7 @@ public int recordEvent(Document oplogEvent, long timestamp) throws InterruptedEx Object o2 = oplogEvent.get("o2"); String objId = o2 != null ? idObjToJson(o2) : idObjToJson(patchObj); assert objId != null; - Operation operation = operationLiterals.get(oplogEvent.getString("op")); + Operation operation = OPERATION_LITERALS.get(oplogEvent.getString("op")); return createRecords(sourceValue, offset, operation, objId, patchObj, timestamp); } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java index 9e5909e85..e1089c790 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java @@ -94,7 +94,7 @@ @ThreadSafe public class Replicator { - private final Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13"; @@ -166,7 +166,7 @@ public void run() { } } catch (Throwable t) { - logger.error("Replicator for replica set {} failed", rsName, t); + LOGGER.error("Replicator for replica set {} failed", rsName, t); onFailure.accept(t); } finally { @@ -184,7 +184,7 @@ public void run() { * @return {@code true} if a connection was established, or {@code false} otherwise */ protected boolean establishConnectionToPrimary() { - logger.info("Connecting to '{}'", replicaSet); + LOGGER.info("Connecting to '{}'", replicaSet); primaryClient = context.getConnectionContext().primaryFor( replicaSet, context.filters(), @@ -194,7 +194,7 @@ protected boolean establishConnectionToPrimary() { throw new ConnectException("Error while attempting to " + desc, error); } else { - logger.error("Error while attempting to {}: {}", desc, error.getMessage(), error); + LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error); } }); @@ -222,17 +222,17 @@ protected void recordCurrentOplogPosition() { protected boolean isInitialSyncExpected() { boolean performSnapshot = true; if (source.hasOffset(rsName)) { - if (logger.isInfoEnabled()) { - logger.info("Found existing offset for replica set '{}' at {}", rsName, source.lastOffset(rsName)); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Found existing offset for replica set '{}' at {}", rsName, source.lastOffset(rsName)); } performSnapshot = false; if (context.getConnectionContext().performSnapshotEvenIfNotNeeded()) { - logger.info("Configured to performing initial sync of replica set '{}'", rsName); + LOGGER.info("Configured to performing initial sync of replica set '{}'", rsName); performSnapshot = true; } else { if (source.isInitialSyncOngoing(rsName)) { // The last snapshot was not completed, so do it again ... - logger.info("The previous initial sync was incomplete for '{}', so initiating another initial sync", rsName); + LOGGER.info("The previous initial sync was incomplete for '{}', so initiating another initial sync", rsName); performSnapshot = true; } else { // There is no ongoing initial sync, so look to see if our last recorded offset still exists in the oplog. @@ -245,23 +245,23 @@ protected boolean isInitialSyncExpected() { }); if (firstAvailableTs == null) { - logger.info("The oplog contains no entries, so performing initial sync of replica set '{}'", rsName); + LOGGER.info("The oplog contains no entries, so performing initial sync of replica set '{}'", rsName); performSnapshot = true; } else if (lastRecordedTs.compareTo(firstAvailableTs) < 0) { // The last recorded timestamp is *before* the first existing oplog event, which means there is // almost certainly some history lost since we last processed the oplog ... - logger.info("Initial sync is required since the oplog for replica set '{}' starts at {}, which is later than the timestamp of the last offset {}", + LOGGER.info("Initial sync is required since the oplog for replica set '{}' starts at {}, which is later than the timestamp of the last offset {}", rsName, firstAvailableTs, lastRecordedTs); performSnapshot = true; } else { // Otherwise we'll not perform an initial sync - logger.info("The oplog contains the last entry previously read for '{}', so no initial sync will be performed", + LOGGER.info("The oplog contains the last entry previously read for '{}', so no initial sync will be performed", rsName); } } } } else { - logger.info("No existing offset found for replica set '{}', starting initial sync", rsName); + LOGGER.info("No existing offset found for replica set '{}', starting initial sync", rsName); performSnapshot = true; } return performSnapshot; @@ -277,12 +277,12 @@ protected boolean performInitialSync() { delaySnapshotIfNeeded(); } catch (InterruptedException e) { - logger.info("Interrupted while awaiting initial snapshot delay"); + LOGGER.info("Interrupted while awaiting initial snapshot delay"); return false; } - if (logger.isInfoEnabled()) { - logger.info("Beginning initial sync of '{}' at {}", rsName, source.lastOffset(rsName)); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Beginning initial sync of '{}' at {}", rsName, source.lastOffset(rsName)); } source.startInitialSync(replicaSet.replicaSetName()); @@ -291,7 +291,7 @@ protected boolean performInitialSync() { bufferedRecorder.startBuffering(); } catch (InterruptedException e) { // Do nothing so that this thread is terminated ... - logger.info("Interrupted while waiting to flush the buffer before starting an initial sync of '{}'", rsName); + LOGGER.info("Interrupted while waiting to flush the buffer before starting an initial sync of '{}'", rsName); return false; } @@ -309,8 +309,8 @@ protected boolean performInitialSync() { final AtomicLong numDocumentsCopied = new AtomicLong(); // And start threads to pull collection IDs from the queue and perform the copies ... - if (logger.isInfoEnabled()) { - logger.info("Preparing to use {} thread(s) to sync {} collection(s): {}", + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Preparing to use {} thread(s) to sync {} collection(s): {}", numThreads, collections.size(), Strings.join(", ", collections)); } for (int i = 0; i != numThreads; ++i) { @@ -321,13 +321,13 @@ protected boolean performInitialSync() { CollectionId id = null; while (!aborted.get() && (id = collectionsToCopy.poll()) != null) { long start = clock.currentTimeInMillis(); - logger.info("Starting initial sync of '{}'", id); + LOGGER.info("Starting initial sync of '{}'", id); long numDocs = copyCollection(id, syncStart); numCollectionsCopied.incrementAndGet(); numDocumentsCopied.addAndGet(numDocs); - if (logger.isInfoEnabled()) { + if (LOGGER.isInfoEnabled()) { long duration = clock.currentTimeInMillis() - start; - logger.info("Completing initial sync of {} documents from '{}' in {}", numDocs, id, Strings.duration(duration)); + LOGGER.info("Completing initial sync of {} documents from '{}' in {}", numDocs, id, Strings.duration(duration)); } } } catch (InterruptedException e) { @@ -352,9 +352,9 @@ protected boolean performInitialSync() { // Therefore, check the aborted state here ... long syncDuration = clock.currentTimeInMillis() - syncStart; if (aborted.get()) { - if (logger.isInfoEnabled()) { + if (LOGGER.isInfoEnabled()) { int remaining = collections.size() - numCollectionsCopied.get(); - logger.info("Initial sync aborted after {} with {} of {} collections incomplete", + LOGGER.info("Initial sync aborted after {} with {} of {} collections incomplete", Strings.duration(syncDuration), remaining, collections.size()); } return false; @@ -366,16 +366,16 @@ protected boolean performInitialSync() { // And immediately flush the last buffered source record with the updated offset ... bufferedRecorder.stopBuffering(source.lastOffset(rsName)); } catch (InterruptedException e) { - logger.info("Interrupted while waiting for last initial sync record from replica set '{}' to be recorded", rsName); + LOGGER.info("Interrupted while waiting for last initial sync record from replica set '{}' to be recorded", rsName); return false; } if (collections.isEmpty()) { - logger.warn("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration"); + LOGGER.warn("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration"); } - if (logger.isInfoEnabled()) { - logger.info("Initial sync of {} collections with a total of {} documents completed in {}", + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Initial sync of {} collections with a total of {} documents completed in {}", collections.size(), numDocumentsCopied.get(), Strings.duration(syncDuration)); } return true; @@ -395,7 +395,7 @@ private void delaySnapshotIfNeeded() throws InterruptedException { throw new InterruptedException("Interrupted while awaiting initial snapshot delay"); } - logger.info("The connector will wait for {}s before proceeding", timer.remaining().getSeconds()); + LOGGER.info("The connector will wait for {}s before proceeding", timer.remaining().getSeconds()); metronome.pause(); } } @@ -434,7 +434,7 @@ protected long copyCollection(MongoClient primary, CollectionId collectionId, lo try (MongoCursor cursor = docCollection.find().batchSize(batchSize).iterator()) { while (running.get() && cursor.hasNext()) { Document doc = cursor.next(); - logger.trace("Found existing doc in {}: {}", collectionId, doc); + LOGGER.trace("Found existing doc in {}: {}", collectionId, doc); counter += factory.recordObject(collectionId, doc, timestamp); } } @@ -459,7 +459,7 @@ protected void readOplog() { protected void readOplog(MongoClient primary) { BsonTimestamp oplogStart = source.lastOffsetTimestamp(replicaSet.replicaSetName()); ServerAddress primaryAddress = primary.getAddress(); - logger.info("Reading oplog for '{}' primary {} starting at {}", replicaSet, primaryAddress, oplogStart); + LOGGER.info("Reading oplog for '{}' primary {} starting at {}", replicaSet, primaryAddress, oplogStart); // Include none of the cluster-internal operations and only those events since the previous timestamp ... MongoCollection oplog = primary.getDatabase("local").getCollection("oplog.rs"); @@ -488,7 +488,7 @@ protected void readOplog(MongoClient primary) { bufferedRecorder); } catch (InterruptedException e) { - logger.info("Replicator thread is interrupted"); + LOGGER.info("Replicator thread is interrupted"); Thread.currentThread().interrupt(); return; } @@ -505,12 +505,12 @@ protected void readOplog(MongoClient primary) { * processing events */ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event) { - logger.debug("Found event: {}", event); + LOGGER.debug("Found event: {}", event); String ns = event.getString("ns"); Document object = event.get("o", Document.class); if (object == null) { - if (logger.isWarnEnabled()) { - logger.warn("Missing 'o' field in event, so skipping {}", event.toJson()); + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Missing 'o' field in event, so skipping {}", event.toJson()); } return true; } @@ -525,30 +525,33 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event) address.set(currentPrimary); }); } catch (InterruptedException e) { - logger.error("Get current primary executeBlocking", e); + LOGGER.error("Get current primary executeBlocking", e); } ServerAddress serverAddress = address.get(); //primary switch will be handled automatically by MongoDB driver if (serverAddress != null && !serverAddress.equals(primaryAddress)) { - logger.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}", + LOGGER.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}", primaryAddress, serverAddress); } else { - logger.info("Found new primary event in oplog, current {} is new primary. " + + LOGGER.info("Found new primary event in oplog, current {} is new primary. " + "Continue to process oplog event.", primaryAddress); } } // Otherwise, ignore this event ... - if (logger.isDebugEnabled()) { - logger.debug("Skipping event with no namespace: {}", event.toJson()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Skipping event with no namespace: {}", event.toJson()); } return true; } - if (!RecordMakers.isValidOperation(event.getString("op"))) { - // the op is not insert/update/delete - logger.debug("Skipping event with \"op={}\"", event.getString("op")); + + String operation = event.getString("op"); + // the op is not insert/update/delete + if (!RecordMakers.isValidOperation(operation)) { + LOGGER.debug("Skipping event with \"op={}\"", operation); return true; } + int delimIndex = ns.indexOf('.'); if (delimIndex > 0) { assert (delimIndex + 1) < ns.length(); @@ -557,12 +560,12 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event) if ("$cmd".equals(collectionName)) { // This is a command on a database ... // TODO: Probably want to handle some of these when we track creation/removal of collections - logger.debug("Skipping database command event: {}", event.toJson()); + LOGGER.debug("Skipping database command event: {}", event.toJson()); return true; } // Otherwise, it is an event on a document in a collection ... if (!context.filters().databaseFilter().test(dbName)) { - logger.debug("Skipping the event for database {} based on database.whitelist", dbName); + LOGGER.debug("Skipping the event for database {} based on database.whitelist", dbName); return true; } CollectionId collectionId = new CollectionId(rsName, dbName, collectionName);