diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java index 939e5154f..70533617d 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java @@ -7,8 +7,9 @@ import java.sql.SQLException; import java.util.Map; -import java.util.function.Supplier; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.connect.errors.ConnectException; import org.fest.assertions.Assertions; import org.fest.assertions.MapAssert; import org.junit.Rule; @@ -16,17 +17,17 @@ import org.junit.rules.TestRule; import io.debezium.config.Configuration; +import io.debezium.connector.mysql.junit.SkipTestDependingOnGtidModeRule; +import io.debezium.connector.mysql.junit.SkipWhenGtidModeIs; import io.debezium.jdbc.JdbcConnection; -import io.debezium.junit.ConditionalFail; -import io.debezium.junit.ShouldFailWhen; import io.debezium.util.Testing; -@ShouldFailWhen(ReadOnlyIncrementalSnapshotIT.IsGtidModeOff.class) +@SkipWhenGtidModeIs(value = SkipWhenGtidModeIs.GtidMode.OFF, reason = "Read only connection requires GTID_MODE to be ON") public class ReadOnlyIncrementalSnapshotIT extends IncrementalSnapshotIT { public static final String EXCLUDED_TABLE = "b"; @Rule - public TestRule conditionalFail = new ConditionalFail(); + public TestRule skipTest = new SkipTestDependingOnGtidModeRule(); protected Configuration.Builder config() { return super.config() @@ -69,22 +70,18 @@ public void filteredEvents() throws Exception { } } - public static class IsGtidModeOff implements Supplier { - - public Boolean get() { - try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase("emptydb")) { - return db.queryAndMap( - "SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'", - rs -> { - if (rs.next()) { - return "OFF".equalsIgnoreCase(rs.getString(2)); - } - throw new IllegalStateException("Cannot obtain GTID status"); - }); - } - catch (SQLException e) { - throw new IllegalStateException("Cannot obtain GTID status", e); - } + @Test(expected = ConnectException.class) + @SkipWhenGtidModeIs(value = SkipWhenGtidModeIs.GtidMode.ON, reason = "Read only connection requires GTID_MODE to be ON") + public void shouldFailIfGtidModeIsOff() throws Exception { + Testing.Print.enable(); + populateTable(); + AtomicReference exception = new AtomicReference<>(); + startConnector((success, message, error) -> exception.set(error)); + waitForConnectorShutdown("mysql", DATABASE.getServerName()); + stopConnector(); + final Throwable e = exception.get(); + if (e != null) { + throw (RuntimeException) e; } } } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/junit/SkipTestDependingOnGtidModeRule.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/junit/SkipTestDependingOnGtidModeRule.java new file mode 100644 index 000000000..75b58efc0 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/junit/SkipTestDependingOnGtidModeRule.java @@ -0,0 +1,47 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql.junit; + +import java.sql.SQLException; + +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import io.debezium.connector.mysql.MySqlTestConnection; +import io.debezium.junit.AnnotationBasedTestRule; + +/** + * JUnit rule that skips a test based on the {@link SkipWhenGtidModeIs} annotation on either a test method or a test class. + */ +public class SkipTestDependingOnGtidModeRule extends AnnotationBasedTestRule { + private static final SkipWhenGtidModeIs.GtidMode gtidMode = getGtidMode(); + + @Override + public Statement apply(Statement base, Description description) { + SkipWhenGtidModeIs skipGtidMode = hasAnnotation(description, SkipWhenGtidModeIs.class); + if (skipGtidMode != null && skipGtidMode.value().equals(gtidMode)) { + String reasonForSkipping = "GTID_MODE is " + skipGtidMode.value() + System.lineSeparator() + skipGtidMode.reason(); + return emptyStatement(reasonForSkipping, description); + } + return base; + } + + public static SkipWhenGtidModeIs.GtidMode getGtidMode() { + try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase("emptydb")) { + return db.queryAndMap( + "SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'", + rs -> { + if (rs.next()) { + return SkipWhenGtidModeIs.GtidMode.valueOf(rs.getString(2)); + } + throw new IllegalStateException("Cannot obtain GTID status"); + }); + } + catch (SQLException e) { + throw new IllegalStateException("Cannot obtain GTID status", e); + } + } +} diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/junit/SkipWhenGtidModeIs.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/junit/SkipWhenGtidModeIs.java new file mode 100644 index 000000000..42517903e --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/junit/SkipWhenGtidModeIs.java @@ -0,0 +1,32 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql.junit; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation used together with the {@link SkipTestDependingOnGtidModeRule} JUnit rule, that allows + * tests to be skipped based on the GTID mode set in the database used for testing + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.METHOD, ElementType.TYPE }) +public @interface SkipWhenGtidModeIs { + + SkipWhenGtidModeIs.GtidMode value(); + + /** + * Returns the reason why the test should be skipped. + */ + String reason() default ""; + + enum GtidMode { + ON, + OFF + } +} diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java index 7915512a2..0b9d4615c 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java @@ -27,6 +27,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.engine.DebeziumEngine; import io.debezium.jdbc.JdbcConnection; import io.debezium.util.Testing; @@ -126,9 +127,17 @@ protected void sendAdHocSnapshotSignal() throws SQLException { } } + protected void startConnector(DebeziumEngine.CompletionCallback callback) { + startConnector(Function.identity(), callback); + } + protected void startConnector(Function custConfig) { + startConnector(custConfig, loggingCompletion()); + } + + protected void startConnector(Function custConfig, DebeziumEngine.CompletionCallback callback) { final Configuration config = custConfig.apply(config()).build(); - start(connectorClass(), config); + start(connectorClass(), config, callback); waitForConnectorToStart(); waitForAvailableRecords(1, TimeUnit.SECONDS); @@ -137,7 +146,7 @@ protected void startConnector(Function