DBZ-1953 Generalized SkipWhenDatabaseVersion functionality

This commit is contained in:
Chris Cranford 2020-04-23 14:53:46 -04:00 committed by Jiri Pechanec
parent 6fca72c773
commit 230ddaebdc
35 changed files with 436 additions and 358 deletions

View File

@ -5,8 +5,8 @@
*/
package io.debezium.connector.mongodb.transforms;
import static io.debezium.junit.SkipWhenKafkaVersion.EqualityCheck.GREATER_THAN_OR_EQUAL;
import static io.debezium.junit.SkipWhenKafkaVersion.EqualityCheck.LESS_THAN;
import static io.debezium.junit.EqualityCheck.GREATER_THAN_OR_EQUAL;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static io.debezium.junit.SkipWhenKafkaVersion.KafkaVersion.KAFKA_241;
import static org.fest.assertions.Assertions.assertThat;

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import java.nio.file.Path;
@ -17,24 +18,21 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
* @author Jiri Pechanec, Randall Hauch
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "Use of fractal notation on DATE, TIME, DATETIME, and TIMESTAMP not supported on MySQL 5.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
public class BinlogReaderBufferIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
@ -45,9 +43,6 @@ public class BinlogReaderBufferIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();
@ -335,7 +330,7 @@ public void shouldProcessRolledBackSavepoint() throws SQLException, InterruptedE
int recordCount;
int customerEventsCount;
int topicCount;
if (isMySql5("emptydb")) {
if (MySQLConnection.isMySQL5()) {
// MySQL 5 contains events when the TX was effectively rolled-back
// INSERT + INSERT + ROLLBACK, SAVEPOINT filtered
recordCount = 3;
@ -356,15 +351,4 @@ public void shouldProcessRolledBackSavepoint() throws SQLException, InterruptedE
Testing.print("*** Done with savepoint TX");
}
}
private static boolean isMySql5(String databaseName) {
switch (MySQLConnection.forTestDatabase(databaseName).getMySqlVersion()) {
case MYSQL_5_5:
case MYSQL_5_6:
case MYSQL_5_7:
return true;
default:
return false;
}
}
}

View File

@ -5,6 +5,8 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static io.debezium.junit.EqualityCheck.LESS_THAN_OR_EQUAL;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -34,11 +36,7 @@
import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.AbstractReader.AcceptAllPredicate;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersions;
import io.debezium.data.Envelope;
import io.debezium.data.KeyValueStore;
import io.debezium.data.KeyValueStore.Collection;
@ -46,6 +44,8 @@
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
@ -53,7 +53,7 @@
* @author Randall Hauch
*
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "Use of fractal notation on DATE, TIME, DATETIME, and TIMESTAMP not supported on MySQL 5.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
public class BinlogReaderIT {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-binlog.txt").toAbsolutePath();
@ -69,7 +69,7 @@ public class BinlogReaderIT {
private SchemaChangeHistory schemaChanges;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
public SkipTestRule skipRule = new SkipTestRule();
@Before
public void beforeEach() {
@ -493,10 +493,7 @@ public void shouldFailOnUnknownTlsProtocol() {
@Test
@FixFor("DBZ-1208")
@SkipWhenDatabaseVersions({
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "Uses fractal notation for DATE, TIME, DATETIME, and TIMESTAMP which isn't supported by MySQL 5.5, along with no SSL support"),
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_6, reason = "MySQL 5.6 does not support SSL")
})
@SkipWhenDatabaseVersion(check = LESS_THAN_OR_EQUAL, major = 5, minor = 6, reason = "MySQL 5.6 does not support SSL")
public void shouldAcceptTls12() {
final UniqueDatabase REGRESSION_DATABASE = new UniqueDatabase("logical_server_name", "regression_test")
.withDbHistoryPath(DB_HISTORY_PATH);

View File

@ -5,6 +5,8 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -12,16 +14,15 @@
import org.junit.Rule;
import org.junit.Test;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, patch = 5, reason = "MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
public class ConnectionIT implements Testing {
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
public SkipTestRule skipTest = new SkipTestRule();
@Ignore
@Test

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
@ -15,19 +16,18 @@
import org.junit.Rule;
import org.junit.Test;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.Tables;
import io.debezium.util.Testing;
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, patch = 5, reason = "MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
public class MetadataIT implements Testing {
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
public SkipTestRule skipTest = new SkipTestRule();
/**
* Loads the {@link Tables} definition by reading JDBC metadata. Note that some characteristics, such as whether columns

View File

@ -76,6 +76,22 @@ public static MySQLConnection forTestDatabase(String databaseName, String userna
.build());
}
/**
* Obtain whether the database source is MySQL 5.x or not.
*
* @return true if the database version is 5.x; otherwise false.
*/
public static boolean isMySQL5() {
switch (forTestDatabase("mysql").getMySqlVersion()) {
case MYSQL_5_5:
case MYSQL_5_6:
case MYSQL_5_7:
return true;
default:
return false;
}
}
protected static void addDefaults(Configuration.Builder builder) {
builder.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
.withDefault(JdbcConfiguration.PORT, 3306)
@ -96,37 +112,41 @@ public MySQLConnection(Configuration config) {
public MySqlVersion getMySqlVersion() {
if (mySqlVersion == null) {
String versionString;
try {
versionString = connect().queryAndMap("SHOW GLOBAL VARIABLES LIKE 'version'", rs -> {
rs.next();
return rs.getString(2);
});
if (versionString.startsWith("8.")) {
mySqlVersion = MySqlVersion.MYSQL_8;
}
else if (versionString.startsWith("5.5")) {
mySqlVersion = MySqlVersion.MYSQL_5_5;
}
else if (versionString.startsWith("5.6")) {
mySqlVersion = MySqlVersion.MYSQL_5_6;
}
else if (versionString.startsWith("5.7")) {
mySqlVersion = MySqlVersion.MYSQL_5_7;
}
else {
throw new IllegalStateException("Couldn't resolve MySQL Server version");
}
final String versionString = getMySqlVersionString();
if (versionString.startsWith("8.")) {
mySqlVersion = MySqlVersion.MYSQL_8;
}
catch (SQLException e) {
throw new IllegalStateException("Couldn't obtain MySQL Server version", e);
else if (versionString.startsWith("5.5")) {
mySqlVersion = MySqlVersion.MYSQL_5_5;
}
else if (versionString.startsWith("5.6")) {
mySqlVersion = MySqlVersion.MYSQL_5_6;
}
else if (versionString.startsWith("5.7")) {
mySqlVersion = MySqlVersion.MYSQL_5_7;
}
else {
throw new IllegalStateException("Couldn't resolve MySQL Server version");
}
}
return mySqlVersion;
}
public String getMySqlVersionString() {
String versionString;
try {
versionString = connect().queryAndMap("SHOW GLOBAL VARIABLES LIKE 'version'", rs -> {
rs.next();
return rs.getString(2);
});
}
catch (SQLException e) {
throw new IllegalStateException("Couldn't obtain MySQL Server version", e);
}
return versionString;
}
public DatabaseDifferences databaseAsserts() {
if (databaseAsserts == null) {
if (getMySqlVersion() == MySqlVersion.MYSQL_8) {

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static junit.framework.TestCase.assertEquals;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
@ -30,7 +31,6 @@
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
@ -39,8 +39,6 @@
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotLockingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
@ -48,6 +46,7 @@
import io.debezium.embedded.EmbeddedEngine.CompletionResult;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.history.DatabaseHistory;
@ -58,7 +57,7 @@
/**
* @author Randall Hauch
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "Use of fractal notation for DATE, TIME, DATETIME, and TIMESTAMP is not supported on MySQL 5.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
public class MySqlConnectorIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
@ -74,9 +73,6 @@ public class MySqlConnectorIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
@ -17,26 +18,19 @@
import org.apache.kafka.connect.data.Struct;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersions;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
/**
* @author Randall Hauch
*/
@SkipWhenDatabaseVersions({
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "JSON data type is not available"),
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_6, reason = "JSON data type is not available")
})
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 7, reason = "JSON data type was not added until MySQL 5.7")
public class MySqlConnectorJsonIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt").toAbsolutePath();
@ -45,9 +39,6 @@ public class MySqlConnectorJsonIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -30,18 +31,15 @@
import org.fest.assertions.Delta;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.time.ZonedTimestamp;
@ -50,7 +48,7 @@
/**
* @author Randall Hauch
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "Use of fractal notation in DATE, TIME, DATETIME, and TIMESTAMP is not supported on MySQL 5.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
public class MySqlConnectorRegressionIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-regression.txt").toAbsolutePath();
@ -61,9 +59,6 @@ public class MySqlConnectorRegressionIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();

View File

@ -7,6 +7,7 @@
import static io.debezium.data.Enum.LOGICAL_NAME;
import static io.debezium.data.Enum.VALUES_FIELD;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import java.nio.file.Path;
@ -16,22 +17,19 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.data.Envelope.FieldName;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
/**
* @author Chris Cranford
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, patch = 5, reason = "MySQL does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
public class MySqlEnumColumnIT extends AbstractConnectorTest {
private static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
@ -43,9 +41,6 @@ public class MySqlEnumColumnIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import java.nio.file.Path;
@ -17,15 +18,12 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
import mil.nga.wkb.geom.Point;
@ -35,7 +33,7 @@
/**
* @author Omar Al-Safi
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "Function ST_GeomFromText is not available in MySQL 5.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "Function ST_GeomFromText not added until MySQL 5.6")
public class MySqlGeometryIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt")
@ -45,13 +43,10 @@ public class MySqlGeometryIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();
databaseDifferences = databaseGeoDifferences(MySQLConnection.forTestDatabase("mysql").getMySqlVersion());
databaseDifferences = databaseGeoDifferences(MySQLConnection.isMySQL5());
DATABASE = new UniqueDatabase("geometryit", databaseDifferences.geometryDatabaseName())
.withDbHistoryPath(DB_HISTORY_PATH);
@ -223,8 +218,8 @@ else if (i == 2) {
}
}
private DatabaseGeoDifferences databaseGeoDifferences(MySqlVersion mySqlVersion) {
if (mySqlVersion == MySqlVersion.MYSQL_5_5 || mySqlVersion == MySqlVersion.MYSQL_5_6 || mySqlVersion == MySqlVersion.MYSQL_5_7) {
private DatabaseGeoDifferences databaseGeoDifferences(boolean mySql5) {
if (mySql5) {
return new DatabaseGeoDifferences() {
@Override

View File

@ -5,36 +5,31 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import java.nio.file.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
/**
* @author Chris Cranford
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "CURRENT_TIMESTAMP cannot be used as a default value for DATETIME columns")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, patch = 5, reason = "MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
public class MySqlTimestampColumnIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-timestamp-column.txt").toAbsolutePath();
private final UniqueDatabase DATABASE = new UniqueDatabase("timestampcolumnit", "timestamp_column_test").withDbHistoryPath(DB_HISTORY_PATH);
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import java.nio.file.Path;
@ -14,25 +15,18 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersions;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
/**
* @author Chris Cranford
*/
@SkipWhenDatabaseVersions({
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "Generated values are not supported"),
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_6, reason = "Generated values are not supported")
})
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 7, reason = "Generated values were not added until MySQL 5.7")
public class MysqlDefaultGeneratedValueIT extends AbstractConnectorTest {
// 4 meta events (set character_set etc.) and then 15 tables with 3 events each (drop DDL, create DDL, insert)
@ -44,9 +38,6 @@ public class MysqlDefaultGeneratedValueIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import java.math.BigDecimal;
@ -28,18 +29,15 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
@ -49,7 +47,7 @@
/**
* @author luobo
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "DDL uses fractal notation on DATE, TIME, DATETIME which isn't compatible with MySQL 5.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
public class MysqlDefaultValueIT extends AbstractConnectorTest {
// 4 meta events (set character_set etc.) and then 15 tables with 3 events each (drop DDL, create DDL, insert)
@ -61,9 +59,6 @@ public class MysqlDefaultValueIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
@ -49,13 +50,12 @@
import com.github.shyiko.mysql.binlog.network.SSLMode;
import com.github.shyiko.mysql.binlog.network.ServerException;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, patch = 5, reason = "MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
public class ReadBinLogIT implements Testing {
protected static final Logger LOGGER = LoggerFactory.getLogger(ReadBinLogIT.class);
@ -76,7 +76,7 @@ private static final class AnyValue implements Serializable {
private final UniqueDatabase DATABASE = new UniqueDatabase("readbinlog_it", "readbinlog_test");
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
public SkipTestRule skipTest = new SkipTestRule();
@Before
public void beforeEach() throws TimeoutException, IOException, SQLException, InterruptedException {

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
@ -28,14 +29,13 @@
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.data.KeyValueStore;
import io.debezium.data.KeyValueStore.Collection;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.util.Testing;
@ -43,7 +43,7 @@
* @author Randall Hauch
*
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "DDL uses fractal notation on DATE, TIME, DATETIME which isn't compatible with MySQL 5.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
public class SnapshotReaderIT {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-snapshot.txt").toAbsolutePath();
@ -57,7 +57,7 @@ public class SnapshotReaderIT {
private CountDownLatch completed;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
public SkipTestRule skipRule = new SkipTestRule();
@Before
public void beforeEach() {

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import java.nio.file.Path;
@ -15,17 +16,14 @@
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.mysql.junit.SkipWhenDatabaseVersion;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
/**
@ -34,7 +32,7 @@
*
* @author Jiri Pechanec
*/
@SkipWhenDatabaseVersion(version = MySqlVersion.MYSQL_5_5, reason = "DDL uses fractal notation on DATE, TIME, DATETIME which isn't compatible with MySQL 5.5")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
public class ZZZGtidSetIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
@ -45,9 +43,6 @@ public class ZZZGtidSetIT extends AbstractConnectorTest {
private Configuration config;
@Rule
public SkipTestDependingOnDatabaseVersionRule skipRule = new SkipTestDependingOnDatabaseVersionRule();
@Before
public void beforeEach() {
stopConnector();

View File

@ -0,0 +1,47 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.junit;
import io.debezium.connector.mysql.MySQLConnection;
import io.debezium.junit.DatabaseVersionResolver;
/**
* Implementation of {@link DatabaseVersionResolver} specific for MySQL.
*
* @author Chris Cranford
*/
public class MySqlDatabaseVersionResolver implements DatabaseVersionResolver {
public DatabaseVersion getVersion() {
final String versionString = MySQLConnection.forTestDatabase("mysql").getMySqlVersionString();
final String[] tokens = versionString.split("\\.");
if (tokens.length == 0) {
throw new IllegalStateException("Failed to resolve database version");
}
int major = sanitizeAndParseToken(tokens[0]);
int minor = tokens.length >= 2 ? sanitizeAndParseToken(tokens[1]) : 0;
int patch = tokens.length >= 3 ? sanitizeAndParseToken(tokens[2]) : 0;
return new DatabaseVersion(major, minor, patch);
}
private static int sanitizeAndParseToken(String token) {
// Sometimes the MySQL version string tokens contain non-numeric content, such as '5.5.62-log'.
// In these cases, this method will adequately parse each sub-token such that the '62-log' results in 62.
String[] tokens = token.split("[^0-9]+");
if (tokens.length == 0) {
return 0;
}
try {
return Integer.parseInt(tokens[0]);
}
catch (NumberFormatException e) {
return 0;
}
}
}

View File

@ -1,52 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.junit;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import io.debezium.connector.mysql.MySQLConnection;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
import io.debezium.junit.AnnotationBasedTestRule;
/**
* JUnit rule that skips a test based on the {@link SkipWhenDatabaseVersion} annotation either on a test method or
* on a test class.
*
* @author Chris Cranford
*/
public class SkipTestDependingOnDatabaseVersionRule extends AnnotationBasedTestRule {
@Override
public Statement apply(Statement base, Description description) {
// First check if multiple version skips are defined
final SkipWhenDatabaseVersions skips = hasAnnotation(description, SkipWhenDatabaseVersions.class);
if (skips != null) {
final MySqlVersion dbVersion = MySQLConnection.forTestDatabase("mysql").getMySqlVersion();
for (SkipWhenDatabaseVersion skip : skips.value()) {
if (skip.version().equals(dbVersion)) {
String reasonForSkipping = "Database version is " + skip.version().name() + ": " + skip.reason();
return emptyStatement(reasonForSkipping, description);
}
}
return base;
}
// Second check if a single version skip is defined
final SkipWhenDatabaseVersion skip = hasAnnotation(description, SkipWhenDatabaseVersion.class);
if (skip != null) {
final MySqlVersion dbVersion = MySQLConnection.forTestDatabase("mysql").getMySqlVersion();
if (skip.version().equals(dbVersion)) {
String reasonForSkipping = "Database version is " + skip.version().name() + ": " + skip.reason();
return emptyStatement(reasonForSkipping, description);
}
}
return base;
}
}

View File

@ -1,34 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
/**
* Marker annotation to control whether a test class or method is to be skipped based on the MySQL database version.
*
* @author Chris Cranford
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@Repeatable(SkipWhenDatabaseVersions.class)
public @interface SkipWhenDatabaseVersion {
/**
* Specifies the version of MySQL for which the test method or class should be skipped.
*/
MySqlVersion version();
/**
* Reason why the test is to be skipped.
*/
String reason() default "";
}

View File

@ -13,3 +13,4 @@ log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
#log4j.logger.io.debezium.connector.mysql.BinlogReader=DEBUG
#log4j.logger.io.debezium.connector.mysql.SnapshotReader=DEBUG
#log4j.logger.io.debezium.relational.history=DEBUG
log4j.logger.org.reflections=ERROR

View File

@ -8,6 +8,7 @@
import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
import static io.debezium.connector.postgresql.TestHelper.topicName;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static junit.framework.TestCase.assertEquals;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
@ -54,10 +55,7 @@
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan;
import io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan.PostgresVersion;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.converters.CloudEventsConverterTest;
@ -68,6 +66,7 @@
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Strings;
@ -98,9 +97,6 @@ public class PostgresConnectorIT extends AbstractConnectorTest {
@Rule
public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule();
@Rule
public final TestRule skipVersion = new SkipTestDependingOnDatabaseVersionRule();
@BeforeClass
public static void beforeClass() throws SQLException {
TestHelper.dropAllSchemas();
@ -704,7 +700,7 @@ public void shouldResumeSnapshotIfFailingMidstream() throws Exception {
@Test
@FixFor("DBZ-1857")
@SkipWhenDatabaseVersionLessThan(PostgresVersion.POSTGRES_10)
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 10, reason = "Database version less than 10.0")
public void shouldRecoverFromRetriableException() throws Exception {
// Testing.Print.enable();
String setupStmt = SETUP_TABLES_STMT;

View File

@ -6,7 +6,7 @@
package io.debezium.connector.postgresql;
import static io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan.PostgresVersion.POSTGRES_10;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_BLACKLIST;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
@ -24,12 +24,9 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.data.Ltree;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan;
import io.debezium.data.Bits;
import io.debezium.data.Json;
import io.debezium.data.Uuid;
@ -40,6 +37,8 @@
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.doc.FixFor;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
@ -60,7 +59,7 @@
public class PostgresSchemaIT {
@Rule
public final TestRule skip = new SkipTestDependingOnDatabaseVersionRule();
public final SkipTestRule skipTest = new SkipTestRule();
private static final String[] TEST_TABLES = new String[]{ "public.numeric_table", "public.numeric_decimal_table", "public.string_table",
"public.cash_table", "public.bitbin_table", "public.network_address_table",
@ -133,8 +132,7 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception {
}
@Test
// MACADDR8 Postgres type is only supported since Postgres version 10
@SkipWhenDatabaseVersionLessThan(POSTGRES_10)
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 10, reason = "MACADDR8 type is only supported on Postgres 10+")
@FixFor("DBZ-1193")
public void shouldLoadSchemaForMacaddr8PostgresType() throws Exception {
String tableId = "public.macaddr8_table";

View File

@ -8,7 +8,7 @@
import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
import static io.debezium.connector.postgresql.TestHelper.topicName;
import static io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan.PostgresVersion.POSTGRES_10;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -34,12 +34,9 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan;
import io.debezium.data.Bits;
import io.debezium.data.Enum;
import io.debezium.data.Envelope;
@ -47,6 +44,8 @@
import io.debezium.doc.FixFor;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
@ -61,7 +60,7 @@
public class RecordsSnapshotProducerIT extends AbstractRecordsProducerTest {
@Rule
public final TestRule skip = new SkipTestDependingOnDatabaseVersionRule();
public final SkipTestRule skip = new SkipTestRule();
@Before
public void before() throws Exception {
@ -376,7 +375,7 @@ public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() thro
@Test
@FixFor("DBZ-1118")
@SkipWhenDatabaseVersionLessThan(POSTGRES_10)
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 10, reason = "Database version is less than 10.0")
public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {
TestHelper.dropAllSchemas();
@ -492,8 +491,7 @@ public void shouldGenerateSnapshotForATableWithoutPrimaryKey() throws Exception
}
@Test
// MACADDR8 Postgres type is only supported since Postgres version 10
@SkipWhenDatabaseVersionLessThan(POSTGRES_10)
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 10, reason = "MACADDR8 data type is only supported since Postgres 10+")
@FixFor("DBZ-1193")
public void shouldGenerateSnapshotForMacaddr8Datatype() throws Exception {
TestHelper.dropAllSchemas();

View File

@ -0,0 +1,30 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.junit;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import io.debezium.connector.postgresql.TestHelper;
import io.debezium.junit.DatabaseVersionResolver;
/**
* Implementation of {@link DatabaseVersionResolver} specific for PostgreSQL.
*
* @author Chris Cranford
*/
public class PostgresDatabaseVersionResolver implements DatabaseVersionResolver {
@Override
public DatabaseVersion getVersion() {
try {
final DatabaseMetaData metadata = TestHelper.create().connection().getMetaData();
return new DatabaseVersion(metadata.getDatabaseMajorVersion(), metadata.getDatabaseMajorVersion(), 0);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,46 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.junit;
import java.sql.SQLException;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import io.debezium.connector.postgresql.TestHelper;
import io.debezium.junit.AnnotationBasedTestRule;
/**
* JUnit rule that skips a test based on the {@link SkipWhenDatabaseVersionLessThan} annotation either on a test method
* or on a test class.
*
* @author Gunnar Morling
*/
public class SkipTestDependingOnDatabaseVersionRule extends AnnotationBasedTestRule {
private static final int majorDbVersion = determineDbVersion();
@Override
public Statement apply(Statement base, Description description) {
SkipWhenDatabaseVersionLessThan skip = hasAnnotation(description, SkipWhenDatabaseVersionLessThan.class);
if (skip != null && skip.value().isLargerThan(majorDbVersion)) {
String reasonForSkipping = "Database version less than " + skip.value();
return emptyStatement(reasonForSkipping, description);
}
return base;
}
public static int determineDbVersion() {
try {
return TestHelper.create().connection().getMetaData().getDatabaseMajorVersion();
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,35 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation used together with the {@link SkipTestDependingOnDatabaseVersionRule} JUnit rule, that allows
* tests to be skipped based on the Postgres version used for testing.
*
* @author Gunnar Morling
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.TYPE })
public @interface SkipWhenDatabaseVersionLessThan {
PostgresVersion value();
public enum PostgresVersion {
POSTGRES_10 {
@Override
boolean isLargerThan(int otherMajor) {
return otherMajor < 10;
}
};
abstract boolean isLargerThan(int otherMajor);
}
}

View File

@ -16,3 +16,4 @@ log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
#log4j.logger.io.debezium.connector.postgresql.RecordsStreamProducer=DEBUG
#log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG
#log4j.logger.io.debezium.connector.postgresql.PostgresConnectorTask=DEBUG
log4j.logger.org.reflections=ERROR

View File

@ -89,6 +89,11 @@
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.12</version>
</dependency>
<!-- Used for unit testing with Kafka -->
<dependency>

View File

@ -0,0 +1,96 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
/**
* Defines a contract on how to obtain the database version for a given source connector implementation.
*
* @author Chris Cranford
*/
public interface DatabaseVersionResolver {
/**
* Return the source database version.
*/
DatabaseVersion getVersion();
public class DatabaseVersion {
private final int dbVersionMajor;
private final int dbVersionMinor;
private final int dbVersionPatch;
public DatabaseVersion(int major, int minor, int patch) {
this.dbVersionMajor = major;
this.dbVersionMinor = minor;
this.dbVersionPatch = patch;
}
public int getMajor() {
return dbVersionMajor;
}
public int getMinor() {
return dbVersionMinor;
}
public int getPatch() {
return dbVersionPatch;
}
public boolean isLessThan(int major, int minor, int patch) {
if (dbVersionMajor < major) {
return true;
}
else if (dbVersionMajor == major) {
if (minor == -1 || dbVersionMinor < minor) {
return true;
}
else if (dbVersionMinor == minor) {
if (patch != -1 || dbVersionPatch < patch) {
return true;
}
}
}
return false;
}
public boolean isLessThanEqualTo(int major, int minor, int patch) {
return isLessThan(major, minor, patch) || isEqualTo(major, minor, patch);
}
public boolean isEqualTo(int major, int minor, int patch) {
if (dbVersionMajor == major) {
if (minor == -1 || dbVersionMinor == minor) {
if (patch == -1 || dbVersionPatch == patch) {
return true;
}
}
}
return false;
}
public boolean isGreaterThanEqualTo(int major, int minor, int patch) {
return isGreaterThan(major, minor, patch) || isEqualTo(major, minor, patch);
}
public boolean isGreaterThan(int major, int minor, int patch) {
if (dbVersionMajor > major) {
return true;
}
else if (dbVersionMajor == major) {
if (minor == -1 || dbVersionMinor > minor) {
return true;
}
else if (dbVersionMinor == minor) {
if (patch != -1 || dbVersionPatch > patch) {
return true;
}
}
}
return false;
}
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
/**
* Used by {@link SkipWhenKafkaVersion} and {@link SkipWhenDatabaseVersion} to define the type of skip rule.
*
* @author Chris Cranford
*/
public enum EqualityCheck {
LESS_THAN,
LESS_THAN_OR_EQUAL,
EQUAL,
GREATER_THAN_OR_EQUAL,
GREATER_THAN
}

View File

@ -8,10 +8,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Set;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.reflections.Reflections;
import io.debezium.junit.DatabaseVersionResolver.DatabaseVersion;
import io.debezium.util.Testing;
/**
@ -50,7 +53,7 @@ public Statement apply(Statement base,
SkipWhenKafkaVersion skipWhenKafkaVersionAnnotation = hasAnnotation(description, SkipWhenKafkaVersion.class);
if (skipWhenKafkaVersionAnnotation != null) {
SkipWhenKafkaVersion.KafkaVersion kafkaVersion = skipWhenKafkaVersionAnnotation.value();
SkipWhenKafkaVersion.EqualityCheck check = skipWhenKafkaVersionAnnotation.check();
EqualityCheck check = skipWhenKafkaVersionAnnotation.check();
try (InputStream stream = Testing.class.getResourceAsStream("/kafka/kafka-version.properties")) {
if (stream != null) {
final Properties properties = new Properties();
@ -97,6 +100,64 @@ public Statement apply(Statement base,
}
}
// First check if multiple database version skips are specified.
SkipWhenDatabaseVersions skipWhenDatabaseVersions = hasAnnotation(description, SkipWhenDatabaseVersions.class);
if (skipWhenDatabaseVersions != null) {
for (SkipWhenDatabaseVersion skipWhenDatabaseVersion : skipWhenDatabaseVersions.value()) {
if (isSkippedByDatabaseVersion(skipWhenDatabaseVersion)) {
return emptyStatement(skipWhenDatabaseVersion.reason(), description);
}
}
}
// Now check if a single database version skip is specified.
SkipWhenDatabaseVersion skipWhenDatabaseVersion = hasAnnotation(description, SkipWhenDatabaseVersion.class);
if (skipWhenDatabaseVersion != null) {
if (isSkippedByDatabaseVersion(skipWhenDatabaseVersion)) {
return emptyStatement(skipWhenDatabaseVersion.reason(), description);
}
}
return base;
}
private boolean isSkippedByDatabaseVersion(SkipWhenDatabaseVersion skipWhenDatabaseVersion) {
final EqualityCheck equalityCheck = skipWhenDatabaseVersion.check();
final int major = skipWhenDatabaseVersion.major();
final int minor = skipWhenDatabaseVersion.minor();
final int patch = skipWhenDatabaseVersion.patch();
// Scans the class path for SkipWhenDatabaseVersionResolver implementations under io.debezium packages
final Reflections reflections = new Reflections("io.debezium");
Set<Class<? extends DatabaseVersionResolver>> resolvers = reflections.getSubTypesOf(DatabaseVersionResolver.class);
Class<? extends DatabaseVersionResolver> resolverClass = resolvers.stream().findFirst().orElse(null);
if (resolverClass != null) {
try {
final DatabaseVersionResolver resolver = resolverClass.getDeclaredConstructor().newInstance();
DatabaseVersion dbVersion = resolver.getVersion();
if (dbVersion != null) {
switch (equalityCheck) {
case LESS_THAN:
return dbVersion.isLessThan(major, minor, patch);
case LESS_THAN_OR_EQUAL:
return dbVersion.isLessThanEqualTo(major, minor, patch);
case EQUAL:
return dbVersion.isEqualTo(major, minor, patch);
case GREATER_THAN_OR_EQUAL:
return dbVersion.isGreaterThanEqualTo(major, minor, patch);
case GREATER_THAN:
return dbVersion.isGreaterThan(major, minor, patch);
}
}
}
catch (Exception e) {
// In the event that the class cannot be loaded, run the test.
e.printStackTrace();
}
}
return false;
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation used together with the {@link SkipTestRule} JUnit rule, that allows tests to be skipped
* based on the database version used for testing.
*
* @author Chris Cranford
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.TYPE })
@Repeatable(SkipWhenDatabaseVersions.class)
public @interface SkipWhenDatabaseVersion {
/**
* Specify the database major version; cannot be omitted.
*/
int major();
/**
* Specify the database minor version; defaults to {@code -1} to imply skipping minor version check.
*/
int minor() default -1;
/**
* Specify the database patch version; defaults to {code -1} to imply skipping patch version check.
* @return
*/
int patch() default -1;
/**
* Defines the type of equal-check that should initiate the skip on the test class or method.
*/
EqualityCheck check();
/**
* Specifies the reason for skipping the method, used when logging a skipped test.
*/
String reason();
}

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.junit;
package io.debezium.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -18,8 +18,9 @@
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
public @interface SkipWhenDatabaseVersions {
/**
* Specifies the versions the test should be skipped for.
*/
SkipWhenDatabaseVersion[] value() default {};
SkipWhenDatabaseVersion[] value();
}

View File

@ -64,12 +64,4 @@ boolean isGreaterThan(int major, int minor, int patch) {
abstract boolean isGreaterThan(int major, int minor, int patch);
}
public enum EqualityCheck {
LESS_THAN,
LESS_THAN_OR_EQUAL,
EQUAL,
GREATER_THAN_OR_EQUAL,
GREATER_THAN
}
}