DBZ-1911 - improve error messaging for binlog configuration validation

This commit is contained in:
Nayana Hettiarachchi 2020-03-29 17:20:37 +07:00 committed by Gunnar Morling
parent 0b7967a456
commit 2c6e33ac76

View File

@ -94,7 +94,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
throw new ConnectException(msg);
}
logger.info("The db-history topic is missing but we are in {} snapshot mode. " +
"Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.",
"Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.",
SnapshotMode.SCHEMA_ONLY_RECOVERY);
}
else {
@ -178,7 +178,9 @@ else if (!earliestBinlogFilename.endsWith("00001")) {
}
// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();
final boolean binlogFormatRow = isBinlogFormatRow();
final boolean binlogRowImageFull = isBinlogRowImageFull();
final boolean rowBinlogEnabled = binlogFormatRow && binlogRowImageFull;
ChainedReader.Builder chainedReaderBuilder = new ChainedReader.Builder();
@ -205,9 +207,16 @@ else if (!earliestBinlogFilename.endsWith("00001")) {
}
else {
if (!rowBinlogEnabled) {
throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
+ "required for this connector to work properly. Change the MySQL configuration to use a "
+ "row-level binlog and restart the connector.");
if (!binlogFormatRow) {
throw new ConnectException("The MySQL server is not configured to use a ROW binlog_format, which is "
+ "required for this connector to work properly. Change the MySQL configuration to use a "
+ "binlog_format=ROW and restart the connector.");
}
else {
throw new ConnectException("The MySQL server is not configured to use a FULL binlog_row_image, which is "
+ "required for this connector to work properly. Change the MySQL configuration to use a "
+ "binlog_row_image=FULL and restart the connector.");
}
}
BinlogReader binlogReader = new BinlogReader("binlog", taskContext, null);
chainedReaderBuilder.addReader(binlogReader);
@ -591,25 +600,11 @@ protected String earliestBinlogFilename() {
}
/**
* Determine whether the MySQL server has the row-level binlog enabled.
* Determine whether the MySQL server has the binlog_row_image set to 'FULL'.
*
* @return {@code true} if the server's {@code binlog_format} is set to {@code ROW}, or {@code false} otherwise
* @return {@code true} if the server's {@code binlog_row_image} is set to {@code FULL}, or {@code false} otherwise
*/
protected boolean isRowBinlogEnabled() {
AtomicReference<String> mode = new AtomicReference<String>("");
try {
connectionContext.jdbc().query("SHOW GLOBAL VARIABLES LIKE 'binlog_format'", rs -> {
if (rs.next()) {
mode.set(rs.getString(2));
}
});
}
catch (SQLException e) {
throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG mode: ", e);
}
logger.debug("binlog_format={}", mode.get());
protected boolean isBinlogRowImageFull() {
AtomicReference<String> rowImage = new AtomicReference<String>("");
try {
connectionContext.jdbc().query("SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'", rs -> {
@ -619,10 +614,34 @@ protected boolean isRowBinlogEnabled() {
});
}
catch (SQLException e) {
throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG row image mode: ", e);
throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG_ROW_IMAGE mode: ", e);
}
logger.debug("binlog_row_image={}", rowImage.get());
return "ROW".equalsIgnoreCase(mode.get()) && "FULL".equalsIgnoreCase(rowImage.get());
return "FULL".equalsIgnoreCase(rowImage.get());
}
/**
* Determine whether the MySQL server has the row-level binlog enabled.
*
* @return {@code true} if the server's {@code binlog_format} is set to {@code ROW}, or {@code false} otherwise
*/
protected boolean isBinlogFormatRow() {
AtomicReference<String> mode = new AtomicReference<String>("");
try {
connectionContext.jdbc().query("SHOW GLOBAL VARIABLES LIKE 'binlog_format'", rs -> {
if (rs.next()) {
mode.set(rs.getString(2));
}
});
}
catch (SQLException e) {
throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode: ", e);
}
logger.debug("binlog_format={}", mode.get());
return "ROW".equalsIgnoreCase(mode.get());
}
}