diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/AsyncEmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/AsyncEmbeddedEngine.java index dda88aeca..8c10215b5 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/AsyncEmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/AsyncEmbeddedEngine.java @@ -363,10 +363,7 @@ private void startSourceTasks(final List tasks) throws Excepti LOGGER.debug("Started task #{} out of {} tasks.", i + 1, nTasks); } catch (Exception e) { - LOGGER.debug("Task #{} (out of {} tasks) failed to start. Failed with {}", i + 1, nTasks, e.getMessage()); - if (LOGGER.isDebugEnabled()) { - e.printStackTrace(); - } + LOGGER.debug("Task #{} (out of {} tasks) failed to start. Failed with", i + 1, nTasks, e); // Store only the first error. if (error == null) { @@ -522,9 +519,6 @@ private void stopSourceTasks(final List tasks) { } catch (Exception e) { LOGGER.warn("Failure during stopping tasks, stopping them immediately. Failed with ", e); - if (LOGGER.isDebugEnabled()) { - e.printStackTrace(); - } } finally { // Make sure task service is shut down and no other tasks can be run. @@ -562,9 +556,6 @@ private void callCompletionHandler(final Throwable error) { } else { final Throwable realError = error instanceof ExecutionException ? error.getCause() : error; - if (LOGGER.isDebugEnabled()) { - error.printStackTrace(); - } completionCallback.handle(false, error.getMessage(), realError); } } @@ -673,7 +664,7 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin throws InterruptedException, TimeoutException { final long timeout = clock.currentTimeInMillis() + commitTimeout; if (!offsetWriter.beginFlush(commitTimeout, TimeUnit.MICROSECONDS)) { - LOGGER.debug("No offset to be committed."); + LOGGER.trace("No offset to be committed."); return false; } @@ -1026,14 +1017,14 @@ private static class PollRecords extends RetryingCallable { @Override public Void doCall() throws Exception { while (engineState.get() == State.POLLING_TASKS) { - LOGGER.debug("Thread {} running task {} starts polling for records.", Thread.currentThread().getName(), task.connectTask()); + LOGGER.trace("Thread {} running task {} starts polling for records.", Thread.currentThread().getName(), task.connectTask()); final List changeRecords = task.connectTask().poll(); // blocks until there are values ... - LOGGER.debug("Thread {} polled {} records.", Thread.currentThread().getName(), changeRecords == null ? "no" : changeRecords.size()); + LOGGER.trace("Thread {} polled {} records.", Thread.currentThread().getName(), changeRecords == null ? "no" : changeRecords.size()); if (changeRecords != null && !changeRecords.isEmpty()) { processor.processRecords(changeRecords); } else { - LOGGER.debug("No records."); + LOGGER.trace("No records."); } } return null; @@ -1155,7 +1146,7 @@ public void processRecords(final List records) throws Exception { final List> recordFutures = new ArrayList<>(records.size()); records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformRecord(r, transformations)))); - LOGGER.debug("Thread {} is getting source records.", Thread.currentThread().getName()); + LOGGER.trace("Thread {} is getting source records.", Thread.currentThread().getName()); final List transformedRecords = new ArrayList<>(recordFutures.size()); for (Future f : recordFutures) { SourceRecord record = f.get(); // we need the whole batch, eventually wait forever @@ -1164,7 +1155,7 @@ public void processRecords(final List records) throws Exception { } } - LOGGER.debug("Calling user handler."); + LOGGER.trace("Calling user handler."); userHandler.handleBatch(transformedRecords, committer); } } @@ -1184,11 +1175,11 @@ private class ParallelSmtAndConvertBatchProcessor extends AbstractRecordProcesso @Override public void processRecords(final List records) throws Exception { - LOGGER.debug("Submitting {} records for processing.", records.size()); + LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size()); final List> recordFutures = new ArrayList<>(records.size()); records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformAndConvertRecord(r, transformations, convertor)))); - LOGGER.debug("Getting source records."); + LOGGER.trace("Getting source records."); final List convertedRecords = new ArrayList<>(recordFutures.size()); for (Future f : recordFutures) { R record = f.get(); // we need the whole batch, eventually wait forever @@ -1197,7 +1188,7 @@ public void processRecords(final List records) throws Exception { } } - LOGGER.debug("Calling user handler."); + LOGGER.trace("Calling user handler."); userHandler.handleBatch(convertedRecords, committer); } } @@ -1216,23 +1207,23 @@ private class ParallelSmtConsumerProcessor extends AbstractRecordProcessor { @Override public void processRecords(final List records) throws Exception { - LOGGER.debug("Submitting {} records for processing.", records.size()); + LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size()); final List> recordFutures = new ArrayList<>(records.size()); records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformRecord(r, transformations)))); - LOGGER.debug("Waiting for the batch to finish processing."); + LOGGER.trace("Waiting for the batch to finish processing."); final List transformedRecords = new ArrayList<>(recordFutures.size()); for (Future f : recordFutures) { transformedRecords.add(f.get()); // we need the whole batch, eventually wait forever } - LOGGER.debug("Calling user consumer."); + LOGGER.trace("Calling user consumer."); for (int i = 0; i < records.size(); i++) { consumer.accept(transformedRecords.get(i)); committer.markProcessed(records.get(i)); } - LOGGER.debug("Marking batch as finished."); + LOGGER.trace("Marking batch as finished."); committer.markBatchFinished(); } } @@ -1253,23 +1244,23 @@ private class ParallelSmtAndConvertConsumerProcessor extends AbstractRecordProce @Override public void processRecords(final List records) throws Exception { - LOGGER.debug("Submitting {} records for processing.", records.size()); + LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size()); final List> recordFutures = new ArrayList<>(records.size()); records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformAndConvertRecord(r, transformations, convertor)))); - LOGGER.debug("Waiting for the batch to finish processing."); + LOGGER.trace("Waiting for the batch to finish processing."); final List convertedRecords = new ArrayList<>(recordFutures.size()); for (Future f : recordFutures) { convertedRecords.add(f.get()); // we need the whole batch, eventually wait forever } - LOGGER.debug("Calling user consumer."); + LOGGER.trace("Calling user consumer."); for (int i = 0; i < records.size(); i++) { consumer.accept(convertedRecords.get(i)); committer.markProcessed(records.get(i)); } - LOGGER.debug("Marking batch as finished."); + LOGGER.trace("Marking batch as finished."); committer.markBatchFinished(); } } @@ -1288,17 +1279,17 @@ private class ParallelSmtAsyncConsumerProcessor extends AbstractRecordProcessor @Override public void processRecords(final List records) throws Exception { - LOGGER.debug("Submitting {} records for processing.", records.size()); + LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size()); final List> recordFutures = new ArrayList<>(records.size()); records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformAndConsumeRecord(r, transformations, consumer)))); - LOGGER.debug("Waiting for the batch to finish processing."); + LOGGER.trace("Waiting for the batch to finish processing."); for (int i = 0; i < records.size(); i++) { recordFutures.get(i); committer.markProcessed(records.get(i)); } - LOGGER.debug("Marking batch as finished."); + LOGGER.trace("Marking batch as finished."); committer.markBatchFinished(); } } @@ -1319,17 +1310,17 @@ private class ParallelSmtAndConvertAsyncConsumerProcessor extends AbstractRecord @Override public void processRecords(final List records) throws Exception { - LOGGER.debug("Submitting {} records for processing.", records.size()); + LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size()); final List> recordFutures = new ArrayList<>(records.size()); records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformConvertConsumeRecord<>(r, transformations, convertor, consumer)))); - LOGGER.debug("Waiting for the batch to finish processing."); + LOGGER.trace("Waiting for the batch to finish processing."); for (int i = 0; i < records.size(); i++) { recordFutures.get(i); committer.markProcessed(records.get(i)); } - LOGGER.debug("Marking batch as finished."); + LOGGER.trace("Marking batch as finished."); committer.markBatchFinished(); } }