DBZ-7838 Support for MySQL 8.4

This commit is contained in:
Jiri Pechanec 2024-08-13 14:29:09 +02:00
parent 0056310130
commit 8ca8f451f8
12 changed files with 96 additions and 15 deletions

View File

@ -36,7 +36,6 @@ public abstract class BinlogReadOnlyIncrementalSnapshotChangeEventSource<P exten
extends AbstractIncrementalSnapshotChangeEventSource<P, TableId> {
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReadOnlyIncrementalSnapshotChangeEventSource.class);
private static final String SHOW_MASTER_STMT = "SHOW MASTER STATUS";
private final GtidSetFactory gtidSetFactory;

View File

@ -47,6 +47,7 @@ public abstract class BinlogConnectorConnection extends JdbcConnection {
private static final String SQL_SHOW_SYSTEM_VARIABLES_CHARACTER_SET = "SHOW VARIABLES WHERE Variable_name IN ('character_set_server','collation_server')";
private static final String SQL_SHOW_SESSION_VARIABLE_SSL_VERSION = "SHOW SESSION STATUS LIKE 'Ssl_version'";
private static final String QUOTED_CHARACTER = "`";
public static final String MASTER_STATUS_STATEMENT = "SHOW MASTER STATUS";
private final ConnectionConfiguration connectionConfig;
private final BinlogFieldReader fieldReader;
@ -422,6 +423,10 @@ public boolean validateLogPosition(Partition partition, OffsetContext offset, Co
return isBinlogPositionAvailable((BinlogConnectorConfig) config, gtidSet, binlogFilename);
}
public String binaryLogStatusStatement() {
return MASTER_STATUS_STATEMENT;
}
/**
* Determine whether the server has enabled GTID support.
*

View File

@ -580,7 +580,8 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl
BinlogPosition positionAfterUpdate = new BinlogPosition();
try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
connection.query("SHOW MASTER STATUS", positionBeforeInserts::readFromDatabase);
var statusStmt = db.binaryLogStatusStatement();
connection.query(statusStmt, positionBeforeInserts::readFromDatabase);
connection.execute("INSERT INTO products(id,name,description,weight,volume,alias) VALUES "
+ "(3001,'ashley','super robot',34.56,0.00,'ashbot'), "
+ "(3002,'arthur','motorcycle',87.65,0.00,'arcycle'), "
@ -590,7 +591,7 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl
connection.print(rs);
}
});
connection.query("SHOW MASTER STATUS", positionAfterInserts::readFromDatabase);
connection.query(statusStmt, positionAfterInserts::readFromDatabase);
// Change something else that is unrelated ...
connection.execute("UPDATE products_on_hand SET quantity=40 WHERE product_id=109");
connection.query("SELECT * FROM products_on_hand", rs -> {
@ -598,7 +599,7 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl
connection.print(rs);
}
});
connection.query("SHOW MASTER STATUS", positionAfterUpdate::readFromDatabase);
connection.query(statusStmt, positionAfterUpdate::readFromDatabase);
}
}

View File

@ -12,6 +12,7 @@
import java.sql.SQLException;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
@ -55,6 +56,18 @@ public String getMySqlVersionString() {
return versionString;
}
public String binaryLogStatusStatement() {
final var binaryLogStatus = "SHOW BINARY LOG STATUS";
try {
query(binaryLogStatus, rs -> {
});
return binaryLogStatus;
}
catch (SQLException e) {
return BinlogConnectorConnection.MASTER_STATUS_STATEMENT;
}
}
public abstract boolean isGtidEnabled();
/**

View File

@ -61,7 +61,7 @@ protected void setOffsetContextBinlogPositionAndGtidDetailsForSnapshot(MariaDbOf
SnapshotterService snapshotterService)
throws Exception {
LOGGER.info("Read binlog position of MariaDB primary server");
final String showMasterStmt = "SHOW MASTER STATUS";
final String showMasterStmt = connection.binaryLogStatusStatement();
connection.query(showMasterStmt, rs -> {
if (rs.next()) {
final String binlogFilename = rs.getString(1);

View File

@ -11,8 +11,8 @@
import io.debezium.DebeziumException;
import io.debezium.connector.binlog.BinlogReadOnlyIncrementalSnapshotChangeEventSource;
import io.debezium.connector.binlog.gtid.GtidSet;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.mysql.gtid.MySqlGtidSet;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
@ -71,10 +71,10 @@
*/
public class MySqlReadOnlyIncrementalSnapshotChangeEventSource extends BinlogReadOnlyIncrementalSnapshotChangeEventSource<MySqlPartition, MySqlOffsetContext> {
private static final String SHOW_MASTER_STMT = "SHOW MASTER STATUS";
private final BinlogConnectorConnection binlogConnectorConnection;
public MySqlReadOnlyIncrementalSnapshotChangeEventSource(MySqlConnectorConfig config,
JdbcConnection jdbcConnection,
BinlogConnectorConnection jdbcConnection,
EventDispatcher<MySqlPartition, TableId> dispatcher,
DatabaseSchema<?> databaseSchema,
Clock clock,
@ -82,12 +82,13 @@ public MySqlReadOnlyIncrementalSnapshotChangeEventSource(MySqlConnectorConfig co
DataChangeEventListener<MySqlPartition> dataChangeEventListener,
NotificationService<MySqlPartition, MySqlOffsetContext> notificationService) {
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService);
binlogConnectorConnection = jdbcConnection;
}
@Override
protected void getExecutedGtidSet(Consumer<GtidSet> watermark) {
try {
jdbcConnection.query(SHOW_MASTER_STMT, rs -> {
jdbcConnection.query(binlogConnectorConnection.binaryLogStatusStatement(), rs -> {
if (rs.next()) {
if (rs.getMetaData().getColumnCount() > 4) {
// This column exists only in MySQL 5.6.5 or later ...

View File

@ -56,7 +56,7 @@ protected void setOffsetContextBinlogPositionAndGtidDetailsForSnapshot(MySqlOffs
SnapshotterService snapshotterService)
throws Exception {
LOGGER.info("Read binlog position of MySQL primary server");
final String showMasterStmt = "SHOW MASTER STATUS";
final String showMasterStmt = connection.binaryLogStatusStatement();
connection.query(showMasterStmt, rs -> {
if (rs.next()) {
final String binlogFilename = rs.getString(1);

View File

@ -24,10 +24,30 @@
*/
public class MySqlConnection extends BinlogConnectorConnection {
public static final String BINARY_LOG_STATUS_STATEMENT = "SHOW BINARY LOG STATUS";
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlConnection.class);
private final String binaryLogStatusStatement;
public MySqlConnection(MySqlConnectionConfiguration connectionConfig, BinlogFieldReader fieldReader) {
super(connectionConfig, fieldReader);
try {
query(BINARY_LOG_STATUS_STATEMENT, rs -> {
});
}
catch (SQLException e) {
LOGGER.info("Using '{}' to get binary log status", MASTER_STATUS_STATEMENT);
binaryLogStatusStatement = MASTER_STATUS_STATEMENT;
return;
}
LOGGER.info("Using '{}' to get binary log status", BINARY_LOG_STATUS_STATEMENT);
binaryLogStatusStatement = BINARY_LOG_STATUS_STATEMENT;
}
public String binaryLogStatusStatement() {
return binaryLogStatusStatement;
}
@Override
@ -48,7 +68,7 @@ public boolean isGtidModeEnabled() {
@Override
public GtidSet knownGtidSet() {
try {
return queryAndMap("SHOW MASTER STATUS", rs -> {
return queryAndMap(binaryLogStatusStatement(), rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
return new MySqlGtidSet(rs.getString(5)); // GTID set, may be null, blank, or contain a GTID set
}

View File

@ -20,7 +20,7 @@
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
host-cache-size = 0
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
@ -106,3 +106,18 @@ enforce_gtid_consistency = on
default_authentication_plugin = caching_sha2_password
caching_sha2_password_auto_generate_rsa_keys = on
binlog_expire_logs_seconds = 259200
# --------------------------------------------------------------------------------------------
# This section specifies 8.4 specific configurations
# --------------------------------------------------------------------------------------------
[mysqld-8.4]
# ----------------------------------------------
# Enable GTIDs on this primary server
# ----------------------------------------------
gtid_mode = on
enforce_gtid_consistency = on
authentication_policy = caching_sha2_password
caching_sha2_password_auto_generate_rsa_keys = on
binlog_expire_logs_seconds = 259200

View File

@ -20,7 +20,7 @@
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
host-cache-size = 0
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
@ -86,3 +86,14 @@ enforce_gtid_consistency = on
default_authentication_plugin = caching_sha2_password
caching_sha2_password_auto_generate_rsa_keys = on
binlog_expire_logs_seconds = 259200
# --------------------------------------------------------------------------------------------
# This section specifies 8.4 specific configurations
# --------------------------------------------------------------------------------------------
[mysqld-8.4]
gtid_mode = on
enforce_gtid_consistency = on
authentication_policy = caching_sha2_password
caching_sha2_password_auto_generate_rsa_keys = on
binlog_expire_logs_seconds = 259200

View File

@ -20,7 +20,7 @@
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
host-cache-size = 0
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
@ -74,3 +74,11 @@ binlog_expire_logs_seconds = 86400
default_authentication_plugin = caching_sha2_password
caching_sha2_password_auto_generate_rsa_keys = on
binlog_expire_logs_seconds = 86400
# --------------------------------------------------------------------------------------------
# This section specifies 8.4 specific configurations
# --------------------------------------------------------------------------------------------
[mysqld-8.4]
authentication_policy = caching_sha2_password
caching_sha2_password_auto_generate_rsa_keys = on
binlog_expire_logs_seconds = 86400

View File

@ -20,7 +20,7 @@
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
host-cache-size = 0
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
@ -69,3 +69,11 @@ binlog_expire_logs_seconds = 86400
default_authentication_plugin = caching_sha2_password
caching_sha2_password_auto_generate_rsa_keys = on
binlog_expire_logs_seconds = 86400
# --------------------------------------------------------------------------------------------
# This section specifies 8.4 specific configurations
# --------------------------------------------------------------------------------------------
[mysqld-8.4]
authentication_policy = caching_sha2_password
caching_sha2_password_auto_generate_rsa_keys = on
binlog_expire_logs_seconds = 86400