diff --git a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogReadOnlyIncrementalSnapshotChangeEventSource.java b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogReadOnlyIncrementalSnapshotChangeEventSource.java index e4de564ba..f64fcde71 100644 --- a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogReadOnlyIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogReadOnlyIncrementalSnapshotChangeEventSource.java @@ -36,7 +36,6 @@ public abstract class BinlogReadOnlyIncrementalSnapshotChangeEventSource

{ private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReadOnlyIncrementalSnapshotChangeEventSource.class); - private static final String SHOW_MASTER_STMT = "SHOW MASTER STATUS"; private final GtidSetFactory gtidSetFactory; diff --git a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/jdbc/BinlogConnectorConnection.java b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/jdbc/BinlogConnectorConnection.java index f8a6d2856..237f8e837 100644 --- a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/jdbc/BinlogConnectorConnection.java +++ b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/jdbc/BinlogConnectorConnection.java @@ -47,6 +47,7 @@ public abstract class BinlogConnectorConnection extends JdbcConnection { private static final String SQL_SHOW_SYSTEM_VARIABLES_CHARACTER_SET = "SHOW VARIABLES WHERE Variable_name IN ('character_set_server','collation_server')"; private static final String SQL_SHOW_SESSION_VARIABLE_SSL_VERSION = "SHOW SESSION STATUS LIKE 'Ssl_version'"; private static final String QUOTED_CHARACTER = "`"; + public static final String MASTER_STATUS_STATEMENT = "SHOW MASTER STATUS"; private final ConnectionConfiguration connectionConfig; private final BinlogFieldReader fieldReader; @@ -422,6 +423,10 @@ public boolean validateLogPosition(Partition partition, OffsetContext offset, Co return isBinlogPositionAvailable((BinlogConnectorConfig) config, gtidSet, binlogFilename); } + public String binaryLogStatusStatement() { + return MASTER_STATUS_STATEMENT; + } + /** * Determine whether the server has enabled GTID support. * diff --git a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorIT.java b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorIT.java index f385a8948..3669dc002 100644 --- a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorIT.java +++ b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorIT.java @@ -580,7 +580,8 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl BinlogPosition positionAfterUpdate = new BinlogPosition(); try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName());) { try (JdbcConnection connection = db.connect()) { - connection.query("SHOW MASTER STATUS", positionBeforeInserts::readFromDatabase); + var statusStmt = db.binaryLogStatusStatement(); + connection.query(statusStmt, positionBeforeInserts::readFromDatabase); connection.execute("INSERT INTO products(id,name,description,weight,volume,alias) VALUES " + "(3001,'ashley','super robot',34.56,0.00,'ashbot'), " + "(3002,'arthur','motorcycle',87.65,0.00,'arcycle'), " @@ -590,7 +591,7 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl connection.print(rs); } }); - connection.query("SHOW MASTER STATUS", positionAfterInserts::readFromDatabase); + connection.query(statusStmt, positionAfterInserts::readFromDatabase); // Change something else that is unrelated ... connection.execute("UPDATE products_on_hand SET quantity=40 WHERE product_id=109"); connection.query("SELECT * FROM products_on_hand", rs -> { @@ -598,7 +599,7 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl connection.print(rs); } }); - connection.query("SHOW MASTER STATUS", positionAfterUpdate::readFromDatabase); + connection.query(statusStmt, positionAfterUpdate::readFromDatabase); } } diff --git a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/util/BinlogTestConnection.java b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/util/BinlogTestConnection.java index e05ff7820..4e1cf4b31 100644 --- a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/util/BinlogTestConnection.java +++ b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/util/BinlogTestConnection.java @@ -12,6 +12,7 @@ import java.sql.SQLException; import io.debezium.config.Configuration; +import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; @@ -55,6 +56,18 @@ public String getMySqlVersionString() { return versionString; } + public String binaryLogStatusStatement() { + final var binaryLogStatus = "SHOW BINARY LOG STATUS"; + try { + query(binaryLogStatus, rs -> { + }); + return binaryLogStatus; + } + catch (SQLException e) { + return BinlogConnectorConnection.MASTER_STATUS_STATEMENT; + } + } + public abstract boolean isGtidEnabled(); /** diff --git a/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/MariaDbSnapshotChangeEventSource.java b/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/MariaDbSnapshotChangeEventSource.java index 9f315444d..be91be39a 100644 --- a/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/MariaDbSnapshotChangeEventSource.java +++ b/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/MariaDbSnapshotChangeEventSource.java @@ -61,7 +61,7 @@ protected void setOffsetContextBinlogPositionAndGtidDetailsForSnapshot(MariaDbOf SnapshotterService snapshotterService) throws Exception { LOGGER.info("Read binlog position of MariaDB primary server"); - final String showMasterStmt = "SHOW MASTER STATUS"; + final String showMasterStmt = connection.binaryLogStatusStatement(); connection.query(showMasterStmt, rs -> { if (rs.next()) { final String binlogFilename = rs.getString(1); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java index 59f32f71e..c414a1d94 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java @@ -11,8 +11,8 @@ import io.debezium.DebeziumException; import io.debezium.connector.binlog.BinlogReadOnlyIncrementalSnapshotChangeEventSource; import io.debezium.connector.binlog.gtid.GtidSet; +import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection; import io.debezium.connector.mysql.gtid.MySqlGtidSet; -import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; import io.debezium.pipeline.source.spi.DataChangeEventListener; @@ -71,10 +71,10 @@ */ public class MySqlReadOnlyIncrementalSnapshotChangeEventSource extends BinlogReadOnlyIncrementalSnapshotChangeEventSource { - private static final String SHOW_MASTER_STMT = "SHOW MASTER STATUS"; + private final BinlogConnectorConnection binlogConnectorConnection; public MySqlReadOnlyIncrementalSnapshotChangeEventSource(MySqlConnectorConfig config, - JdbcConnection jdbcConnection, + BinlogConnectorConnection jdbcConnection, EventDispatcher dispatcher, DatabaseSchema databaseSchema, Clock clock, @@ -82,12 +82,13 @@ public MySqlReadOnlyIncrementalSnapshotChangeEventSource(MySqlConnectorConfig co DataChangeEventListener dataChangeEventListener, NotificationService notificationService) { super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener, notificationService); + binlogConnectorConnection = jdbcConnection; } @Override protected void getExecutedGtidSet(Consumer watermark) { try { - jdbcConnection.query(SHOW_MASTER_STMT, rs -> { + jdbcConnection.query(binlogConnectorConnection.binaryLogStatusStatement(), rs -> { if (rs.next()) { if (rs.getMetaData().getColumnCount() > 4) { // This column exists only in MySQL 5.6.5 or later ... diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java index ad8a7ac14..39f00440d 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java @@ -56,7 +56,7 @@ protected void setOffsetContextBinlogPositionAndGtidDetailsForSnapshot(MySqlOffs SnapshotterService snapshotterService) throws Exception { LOGGER.info("Read binlog position of MySQL primary server"); - final String showMasterStmt = "SHOW MASTER STATUS"; + final String showMasterStmt = connection.binaryLogStatusStatement(); connection.query(showMasterStmt, rs -> { if (rs.next()) { final String binlogFilename = rs.getString(1); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/jdbc/MySqlConnection.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/jdbc/MySqlConnection.java index 04912c995..c1a991a59 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/jdbc/MySqlConnection.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/jdbc/MySqlConnection.java @@ -24,10 +24,30 @@ */ public class MySqlConnection extends BinlogConnectorConnection { + public static final String BINARY_LOG_STATUS_STATEMENT = "SHOW BINARY LOG STATUS"; + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlConnection.class); + private final String binaryLogStatusStatement; + public MySqlConnection(MySqlConnectionConfiguration connectionConfig, BinlogFieldReader fieldReader) { super(connectionConfig, fieldReader); + + try { + query(BINARY_LOG_STATUS_STATEMENT, rs -> { + }); + } + catch (SQLException e) { + LOGGER.info("Using '{}' to get binary log status", MASTER_STATUS_STATEMENT); + binaryLogStatusStatement = MASTER_STATUS_STATEMENT; + return; + } + LOGGER.info("Using '{}' to get binary log status", BINARY_LOG_STATUS_STATEMENT); + binaryLogStatusStatement = BINARY_LOG_STATUS_STATEMENT; + } + + public String binaryLogStatusStatement() { + return binaryLogStatusStatement; } @Override @@ -48,7 +68,7 @@ public boolean isGtidModeEnabled() { @Override public GtidSet knownGtidSet() { try { - return queryAndMap("SHOW MASTER STATUS", rs -> { + return queryAndMap(binaryLogStatusStatement(), rs -> { if (rs.next() && rs.getMetaData().getColumnCount() > 4) { return new MySqlGtidSet(rs.getString(5)); // GTID set, may be null, blank, or contain a GTID set } diff --git a/debezium-connector-mysql/src/test/docker/server-gtids/my.cnf b/debezium-connector-mysql/src/test/docker/server-gtids/my.cnf index fd1cf511d..4b3fba437 100644 --- a/debezium-connector-mysql/src/test/docker/server-gtids/my.cnf +++ b/debezium-connector-mysql/src/test/docker/server-gtids/my.cnf @@ -20,7 +20,7 @@ # join_buffer_size = 128M # sort_buffer_size = 2M # read_rnd_buffer_size = 2M -skip-host-cache +host-cache-size = 0 skip-name-resolve #datadir=/var/lib/mysql #socket=/var/lib/mysql/mysql.sock @@ -106,3 +106,18 @@ enforce_gtid_consistency = on default_authentication_plugin = caching_sha2_password caching_sha2_password_auto_generate_rsa_keys = on binlog_expire_logs_seconds = 259200 + +# -------------------------------------------------------------------------------------------- +# This section specifies 8.4 specific configurations +# -------------------------------------------------------------------------------------------- +[mysqld-8.4] + +# ---------------------------------------------- +# Enable GTIDs on this primary server +# ---------------------------------------------- +gtid_mode = on +enforce_gtid_consistency = on + +authentication_policy = caching_sha2_password +caching_sha2_password_auto_generate_rsa_keys = on +binlog_expire_logs_seconds = 259200 diff --git a/debezium-connector-mysql/src/test/docker/server-replica/my.cnf b/debezium-connector-mysql/src/test/docker/server-replica/my.cnf index 6ea6ff586..5f8c458f8 100644 --- a/debezium-connector-mysql/src/test/docker/server-replica/my.cnf +++ b/debezium-connector-mysql/src/test/docker/server-replica/my.cnf @@ -20,7 +20,7 @@ # join_buffer_size = 128M # sort_buffer_size = 2M # read_rnd_buffer_size = 2M -skip-host-cache +host-cache-size = 0 skip-name-resolve #datadir=/var/lib/mysql #socket=/var/lib/mysql/mysql.sock @@ -86,3 +86,14 @@ enforce_gtid_consistency = on default_authentication_plugin = caching_sha2_password caching_sha2_password_auto_generate_rsa_keys = on binlog_expire_logs_seconds = 259200 + +# -------------------------------------------------------------------------------------------- +# This section specifies 8.4 specific configurations +# -------------------------------------------------------------------------------------------- +[mysqld-8.4] +gtid_mode = on +enforce_gtid_consistency = on + +authentication_policy = caching_sha2_password +caching_sha2_password_auto_generate_rsa_keys = on +binlog_expire_logs_seconds = 259200 diff --git a/debezium-connector-mysql/src/test/docker/server-ssl/my.cnf b/debezium-connector-mysql/src/test/docker/server-ssl/my.cnf index fd02706b7..4f3ea3894 100644 --- a/debezium-connector-mysql/src/test/docker/server-ssl/my.cnf +++ b/debezium-connector-mysql/src/test/docker/server-ssl/my.cnf @@ -20,7 +20,7 @@ # join_buffer_size = 128M # sort_buffer_size = 2M # read_rnd_buffer_size = 2M -skip-host-cache +host-cache-size = 0 skip-name-resolve #datadir=/var/lib/mysql #socket=/var/lib/mysql/mysql.sock @@ -74,3 +74,11 @@ binlog_expire_logs_seconds = 86400 default_authentication_plugin = caching_sha2_password caching_sha2_password_auto_generate_rsa_keys = on binlog_expire_logs_seconds = 86400 + +# -------------------------------------------------------------------------------------------- +# This section specifies 8.4 specific configurations +# -------------------------------------------------------------------------------------------- +[mysqld-8.4] +authentication_policy = caching_sha2_password +caching_sha2_password_auto_generate_rsa_keys = on +binlog_expire_logs_seconds = 86400 diff --git a/debezium-connector-mysql/src/test/docker/server/my.cnf b/debezium-connector-mysql/src/test/docker/server/my.cnf index cec7d94e6..0c5ca37b9 100644 --- a/debezium-connector-mysql/src/test/docker/server/my.cnf +++ b/debezium-connector-mysql/src/test/docker/server/my.cnf @@ -20,7 +20,7 @@ # join_buffer_size = 128M # sort_buffer_size = 2M # read_rnd_buffer_size = 2M -skip-host-cache +host-cache-size = 0 skip-name-resolve #datadir=/var/lib/mysql #socket=/var/lib/mysql/mysql.sock @@ -69,3 +69,11 @@ binlog_expire_logs_seconds = 86400 default_authentication_plugin = caching_sha2_password caching_sha2_password_auto_generate_rsa_keys = on binlog_expire_logs_seconds = 86400 + +# -------------------------------------------------------------------------------------------- +# This section specifies 8.4 specific configurations +# -------------------------------------------------------------------------------------------- +[mysqld-8.4] +authentication_policy = caching_sha2_password +caching_sha2_password_auto_generate_rsa_keys = on +binlog_expire_logs_seconds = 86400