DBZ-5612 Support for formatted messages

This commit is contained in:
Jiri Pechanec 2022-10-04 13:17:23 +02:00
parent 58ab632ea3
commit 99dff302f1
6 changed files with 15 additions and 10 deletions

View File

@ -136,7 +136,7 @@ private void monitorSignals() {
return;
}
catch (final Exception e) {
Throwables.logErrorAndTraceRecord(LOGGER, "Skipped signal due to an error", e, record);
Throwables.logErrorAndTraceRecord(LOGGER, record, "Skipped signal due to an error", e);
}
}
}

View File

@ -205,7 +205,7 @@ public void disableBuffering() {
}
protected void doEnqueue(T record) throws InterruptedException {
if (LOGGER.isDebugEnabled()) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Enqueuing source record '{}'", record);
}

View File

@ -27,8 +27,13 @@ public static Throwable getRootCause(Throwable throwable) {
}
}
public static void logErrorAndTraceRecord(Logger logger, String message, Exception e, Object record) {
public static void logErrorAndTraceRecord(Logger logger, Object record, String message, Throwable e) {
logger.error(message, e);
LOGGER.trace("Source of error is {}", record);
LOGGER.trace("Source of error is record '{}'", record);
}
public static void logErrorAndTraceRecord(Logger logger, Object record, String message, Object... arguments) {
logger.error(message, arguments);
LOGGER.trace("Source of error is record '{}'", record);
}
}

View File

@ -14,7 +14,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import io.debezium.util.Throwables;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,6 +31,7 @@
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.Collect;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Throwables;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
@ -151,7 +151,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
line = writer.write(record.document());
}
catch (IOException e) {
Throwables.logErrorAndTraceRecord(LOGGER, "Failed to convert record to string", e, record);
Throwables.logErrorAndTraceRecord(LOGGER, record, "Failed to convert record to string", e);
throw new SchemaHistoryException("Unable to write database schema history record");
}

View File

@ -125,7 +125,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
historyWriter.newLine();
}
catch (IOException e) {
Throwables.logErrorAndTraceRecord(logger, "Failed to add record to history at " + path, e, record);
Throwables.logErrorAndTraceRecord(logger, record, "Failed to add record to history at {}", path, e);
return;
}
}
@ -134,7 +134,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
}
}
catch (IOException e) {
Throwables.logErrorAndTraceRecord(logger, "Failed to convert record to string", e, record);
Throwables.logErrorAndTraceRecord(logger, record, "Failed to convert record to string", e);
}
});
}

View File

@ -345,10 +345,10 @@ protected void recoverRecords(Consumer<HistoryRecord> records) {
}
}
catch (final IOException e) {
Throwables.logErrorAndTraceRecord(LOGGER, "Error while deserializing history record", e, record);
Throwables.logErrorAndTraceRecord(LOGGER, record, "Error while deserializing history record", e);
}
catch (final Exception e) {
Throwables.logErrorAndTraceRecord(LOGGER, "Unexpected exception while processing record", e, record);
Throwables.logErrorAndTraceRecord(LOGGER, record, "Unexpected exception while processing record", e);
throw e;
}
}