DBZ-7024 Improve log level and log messges

This commit is contained in:
Vojtech Juranek 2024-02-08 00:17:09 +01:00 committed by Jiri Pechanec
parent a04dc84b3e
commit cdf5e0255a

View File

@ -363,10 +363,7 @@ private void startSourceTasks(final List<EngineSourceTask> tasks) throws Excepti
LOGGER.debug("Started task #{} out of {} tasks.", i + 1, nTasks);
}
catch (Exception e) {
LOGGER.debug("Task #{} (out of {} tasks) failed to start. Failed with {}", i + 1, nTasks, e.getMessage());
if (LOGGER.isDebugEnabled()) {
e.printStackTrace();
}
LOGGER.debug("Task #{} (out of {} tasks) failed to start. Failed with", i + 1, nTasks, e);
// Store only the first error.
if (error == null) {
@ -522,9 +519,6 @@ private void stopSourceTasks(final List<EngineSourceTask> tasks) {
}
catch (Exception e) {
LOGGER.warn("Failure during stopping tasks, stopping them immediately. Failed with ", e);
if (LOGGER.isDebugEnabled()) {
e.printStackTrace();
}
}
finally {
// Make sure task service is shut down and no other tasks can be run.
@ -562,9 +556,6 @@ private void callCompletionHandler(final Throwable error) {
}
else {
final Throwable realError = error instanceof ExecutionException ? error.getCause() : error;
if (LOGGER.isDebugEnabled()) {
error.printStackTrace();
}
completionCallback.handle(false, error.getMessage(), realError);
}
}
@ -673,7 +664,7 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin
throws InterruptedException, TimeoutException {
final long timeout = clock.currentTimeInMillis() + commitTimeout;
if (!offsetWriter.beginFlush(commitTimeout, TimeUnit.MICROSECONDS)) {
LOGGER.debug("No offset to be committed.");
LOGGER.trace("No offset to be committed.");
return false;
}
@ -1026,14 +1017,14 @@ private static class PollRecords extends RetryingCallable<Void> {
@Override
public Void doCall() throws Exception {
while (engineState.get() == State.POLLING_TASKS) {
LOGGER.debug("Thread {} running task {} starts polling for records.", Thread.currentThread().getName(), task.connectTask());
LOGGER.trace("Thread {} running task {} starts polling for records.", Thread.currentThread().getName(), task.connectTask());
final List<SourceRecord> changeRecords = task.connectTask().poll(); // blocks until there are values ...
LOGGER.debug("Thread {} polled {} records.", Thread.currentThread().getName(), changeRecords == null ? "no" : changeRecords.size());
LOGGER.trace("Thread {} polled {} records.", Thread.currentThread().getName(), changeRecords == null ? "no" : changeRecords.size());
if (changeRecords != null && !changeRecords.isEmpty()) {
processor.processRecords(changeRecords);
}
else {
LOGGER.debug("No records.");
LOGGER.trace("No records.");
}
}
return null;
@ -1155,7 +1146,7 @@ public void processRecords(final List<SourceRecord> records) throws Exception {
final List<Future<SourceRecord>> recordFutures = new ArrayList<>(records.size());
records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformRecord(r, transformations))));
LOGGER.debug("Thread {} is getting source records.", Thread.currentThread().getName());
LOGGER.trace("Thread {} is getting source records.", Thread.currentThread().getName());
final List<SourceRecord> transformedRecords = new ArrayList<>(recordFutures.size());
for (Future<SourceRecord> f : recordFutures) {
SourceRecord record = f.get(); // we need the whole batch, eventually wait forever
@ -1164,7 +1155,7 @@ public void processRecords(final List<SourceRecord> records) throws Exception {
}
}
LOGGER.debug("Calling user handler.");
LOGGER.trace("Calling user handler.");
userHandler.handleBatch(transformedRecords, committer);
}
}
@ -1184,11 +1175,11 @@ private class ParallelSmtAndConvertBatchProcessor extends AbstractRecordProcesso
@Override
public void processRecords(final List<SourceRecord> records) throws Exception {
LOGGER.debug("Submitting {} records for processing.", records.size());
LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size());
final List<Future<R>> recordFutures = new ArrayList<>(records.size());
records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformAndConvertRecord<R>(r, transformations, convertor))));
LOGGER.debug("Getting source records.");
LOGGER.trace("Getting source records.");
final List<R> convertedRecords = new ArrayList<>(recordFutures.size());
for (Future<R> f : recordFutures) {
R record = f.get(); // we need the whole batch, eventually wait forever
@ -1197,7 +1188,7 @@ public void processRecords(final List<SourceRecord> records) throws Exception {
}
}
LOGGER.debug("Calling user handler.");
LOGGER.trace("Calling user handler.");
userHandler.handleBatch(convertedRecords, committer);
}
}
@ -1216,23 +1207,23 @@ private class ParallelSmtConsumerProcessor extends AbstractRecordProcessor {
@Override
public void processRecords(final List<SourceRecord> records) throws Exception {
LOGGER.debug("Submitting {} records for processing.", records.size());
LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size());
final List<Future<SourceRecord>> recordFutures = new ArrayList<>(records.size());
records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformRecord(r, transformations))));
LOGGER.debug("Waiting for the batch to finish processing.");
LOGGER.trace("Waiting for the batch to finish processing.");
final List<SourceRecord> transformedRecords = new ArrayList<>(recordFutures.size());
for (Future<SourceRecord> f : recordFutures) {
transformedRecords.add(f.get()); // we need the whole batch, eventually wait forever
}
LOGGER.debug("Calling user consumer.");
LOGGER.trace("Calling user consumer.");
for (int i = 0; i < records.size(); i++) {
consumer.accept(transformedRecords.get(i));
committer.markProcessed(records.get(i));
}
LOGGER.debug("Marking batch as finished.");
LOGGER.trace("Marking batch as finished.");
committer.markBatchFinished();
}
}
@ -1253,23 +1244,23 @@ private class ParallelSmtAndConvertConsumerProcessor extends AbstractRecordProce
@Override
public void processRecords(final List<SourceRecord> records) throws Exception {
LOGGER.debug("Submitting {} records for processing.", records.size());
LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size());
final List<Future<R>> recordFutures = new ArrayList<>(records.size());
records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformAndConvertRecord(r, transformations, convertor))));
LOGGER.debug("Waiting for the batch to finish processing.");
LOGGER.trace("Waiting for the batch to finish processing.");
final List<R> convertedRecords = new ArrayList<>(recordFutures.size());
for (Future<R> f : recordFutures) {
convertedRecords.add(f.get()); // we need the whole batch, eventually wait forever
}
LOGGER.debug("Calling user consumer.");
LOGGER.trace("Calling user consumer.");
for (int i = 0; i < records.size(); i++) {
consumer.accept(convertedRecords.get(i));
committer.markProcessed(records.get(i));
}
LOGGER.debug("Marking batch as finished.");
LOGGER.trace("Marking batch as finished.");
committer.markBatchFinished();
}
}
@ -1288,17 +1279,17 @@ private class ParallelSmtAsyncConsumerProcessor extends AbstractRecordProcessor
@Override
public void processRecords(final List<SourceRecord> records) throws Exception {
LOGGER.debug("Submitting {} records for processing.", records.size());
LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size());
final List<Future<Void>> recordFutures = new ArrayList<>(records.size());
records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformAndConsumeRecord(r, transformations, consumer))));
LOGGER.debug("Waiting for the batch to finish processing.");
LOGGER.trace("Waiting for the batch to finish processing.");
for (int i = 0; i < records.size(); i++) {
recordFutures.get(i);
committer.markProcessed(records.get(i));
}
LOGGER.debug("Marking batch as finished.");
LOGGER.trace("Marking batch as finished.");
committer.markBatchFinished();
}
}
@ -1319,17 +1310,17 @@ private class ParallelSmtAndConvertAsyncConsumerProcessor extends AbstractRecord
@Override
public void processRecords(final List<SourceRecord> records) throws Exception {
LOGGER.debug("Submitting {} records for processing.", records.size());
LOGGER.debug("Thread {} is submitting {} records for processing.", Thread.currentThread().getName(), records.size());
final List<Future<Void>> recordFutures = new ArrayList<>(records.size());
records.stream().forEachOrdered(r -> recordFutures.add(recordService.submit(new TransformConvertConsumeRecord<>(r, transformations, convertor, consumer))));
LOGGER.debug("Waiting for the batch to finish processing.");
LOGGER.trace("Waiting for the batch to finish processing.");
for (int i = 0; i < records.size(); i++) {
recordFutures.get(i);
committer.markProcessed(records.get(i));
}
LOGGER.debug("Marking batch as finished.");
LOGGER.trace("Marking batch as finished.");
committer.markBatchFinished();
}
}