DBZ-3577 Skip read-only test when gtid mode is off

This commit is contained in:
Kate 2021-07-28 17:06:23 -04:00 committed by Jiri Pechanec
parent 6a30ad3811
commit c6e530cc27
4 changed files with 108 additions and 23 deletions

View File

@ -7,8 +7,9 @@
import java.sql.SQLException;
import java.util.Map;
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.Rule;
@ -16,17 +17,17 @@
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.junit.SkipTestDependingOnGtidModeRule;
import io.debezium.connector.mysql.junit.SkipWhenGtidModeIs;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.ShouldFailWhen;
import io.debezium.util.Testing;
@ShouldFailWhen(ReadOnlyIncrementalSnapshotIT.IsGtidModeOff.class)
@SkipWhenGtidModeIs(value = SkipWhenGtidModeIs.GtidMode.OFF, reason = "Read only connection requires GTID_MODE to be ON")
public class ReadOnlyIncrementalSnapshotIT extends IncrementalSnapshotIT {
public static final String EXCLUDED_TABLE = "b";
@Rule
public TestRule conditionalFail = new ConditionalFail();
public TestRule skipTest = new SkipTestDependingOnGtidModeRule();
protected Configuration.Builder config() {
return super.config()
@ -69,22 +70,18 @@ public void filteredEvents() throws Exception {
}
}
public static class IsGtidModeOff implements Supplier<Boolean> {
public Boolean get() {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase("emptydb")) {
return db.queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
rs -> {
if (rs.next()) {
return "OFF".equalsIgnoreCase(rs.getString(2));
}
throw new IllegalStateException("Cannot obtain GTID status");
});
}
catch (SQLException e) {
throw new IllegalStateException("Cannot obtain GTID status", e);
}
@Test(expected = ConnectException.class)
@SkipWhenGtidModeIs(value = SkipWhenGtidModeIs.GtidMode.ON, reason = "Read only connection requires GTID_MODE to be ON")
public void shouldFailIfGtidModeIsOff() throws Exception {
Testing.Print.enable();
populateTable();
AtomicReference<Throwable> exception = new AtomicReference<>();
startConnector((success, message, error) -> exception.set(error));
waitForConnectorShutdown("mysql", DATABASE.getServerName());
stopConnector();
final Throwable e = exception.get();
if (e != null) {
throw (RuntimeException) e;
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.junit;
import java.sql.SQLException;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.junit.AnnotationBasedTestRule;
/**
* JUnit rule that skips a test based on the {@link SkipWhenGtidModeIs} annotation on either a test method or a test class.
*/
public class SkipTestDependingOnGtidModeRule extends AnnotationBasedTestRule {
private static final SkipWhenGtidModeIs.GtidMode gtidMode = getGtidMode();
@Override
public Statement apply(Statement base, Description description) {
SkipWhenGtidModeIs skipGtidMode = hasAnnotation(description, SkipWhenGtidModeIs.class);
if (skipGtidMode != null && skipGtidMode.value().equals(gtidMode)) {
String reasonForSkipping = "GTID_MODE is " + skipGtidMode.value() + System.lineSeparator() + skipGtidMode.reason();
return emptyStatement(reasonForSkipping, description);
}
return base;
}
public static SkipWhenGtidModeIs.GtidMode getGtidMode() {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase("emptydb")) {
return db.queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
rs -> {
if (rs.next()) {
return SkipWhenGtidModeIs.GtidMode.valueOf(rs.getString(2));
}
throw new IllegalStateException("Cannot obtain GTID status");
});
}
catch (SQLException e) {
throw new IllegalStateException("Cannot obtain GTID status", e);
}
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation used together with the {@link SkipTestDependingOnGtidModeRule} JUnit rule, that allows
* tests to be skipped based on the GTID mode set in the database used for testing
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.TYPE })
public @interface SkipWhenGtidModeIs {
SkipWhenGtidModeIs.GtidMode value();
/**
* Returns the reason why the test should be skipped.
*/
String reason() default "";
enum GtidMode {
ON,
OFF
}
}

View File

@ -27,6 +27,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Testing;
@ -126,9 +127,17 @@ protected void sendAdHocSnapshotSignal() throws SQLException {
}
}
protected void startConnector(DebeziumEngine.CompletionCallback callback) {
startConnector(Function.identity(), callback);
}
protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
startConnector(custConfig, loggingCompletion());
}
protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig, DebeziumEngine.CompletionCallback callback) {
final Configuration config = custConfig.apply(config()).build();
start(connectorClass(), config);
start(connectorClass(), config, callback);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
@ -137,7 +146,7 @@ protected void startConnector(Function<Configuration.Builder, Configuration.Buil
}
protected void startConnector() {
startConnector(Function.identity());
startConnector(Function.identity(), loggingCompletion());
}
protected void waitForConnectorToStart() {