DBZ-688 Moving differences related solely to geo-spatial test into the test class itself
This commit is contained in:
parent
c8f09f110b
commit
1258beae55
@ -512,8 +512,12 @@ protected boolean matches(String upperCaseTypeName, String upperCaseMatch) {
|
||||
* @return {@code true} if the type is geometry collection
|
||||
*/
|
||||
protected boolean isGeometryCollection(String upperCaseTypeName) {
|
||||
if (upperCaseTypeName == null) return false;
|
||||
return upperCaseTypeName.equals("GEOMETRYCOLLECTION") || upperCaseTypeName.equals("GEOMCOLLECTION") || upperCaseTypeName.endsWith(".GEOMCOLLECTION");
|
||||
if (upperCaseTypeName == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return upperCaseTypeName.equals("GEOMETRYCOLLECTION") || upperCaseTypeName.equals("GEOMCOLLECTION")
|
||||
|| upperCaseTypeName.endsWith(".GEOMCOLLECTION");
|
||||
}
|
||||
|
||||
protected List<String> extractEnumAndSetOptions(Column column) {
|
||||
|
@ -9,13 +9,9 @@
|
||||
* A centralized expression of differences in behaviour between MySQL 5.x and 8.x
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*
|
||||
*/
|
||||
public interface DatabaseDifferences {
|
||||
|
||||
boolean isCurrentDateTimeDefaultGenerated();
|
||||
String currentDateTimeDefaultOptional(String isoString);
|
||||
String geometryDatabaseName();
|
||||
int geometryPointTableRecords();
|
||||
void geometryAssertPoints(Double expectedX, Double expectedY, Double actualX, Double actualY);
|
||||
}
|
||||
|
@ -5,13 +5,9 @@
|
||||
*/
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.fest.assertions.Delta;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.jdbc.JdbcConfiguration;
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
@ -23,7 +19,13 @@
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
public class MySQLConnection extends JdbcConnection {
|
||||
|
||||
public enum MySqlVersion {
|
||||
MYSQL_5, MYSQL_8;
|
||||
}
|
||||
|
||||
private DatabaseDifferences databaseAsserts;
|
||||
private MySqlVersion mySqlVersion;
|
||||
|
||||
/**
|
||||
* Obtain a connection instance to the named test database.
|
||||
@ -89,82 +91,53 @@ public MySQLConnection(Configuration config) {
|
||||
super(config, FACTORY, null, MySQLConnection::addDefaults);
|
||||
}
|
||||
|
||||
public DatabaseDifferences databaseAsserts() {
|
||||
if (databaseAsserts == null) {
|
||||
public MySqlVersion getMySqlVersion() {
|
||||
if (mySqlVersion == null) {
|
||||
String versionString;
|
||||
try {
|
||||
final String versionString = connect().queryAndMap("SHOW GLOBAL VARIABLES LIKE 'version'", rs -> {
|
||||
versionString = connect().queryAndMap("SHOW GLOBAL VARIABLES LIKE 'version'", rs -> {
|
||||
rs.next();
|
||||
return rs.getString(2);
|
||||
});
|
||||
if (versionString.startsWith("8.")) {
|
||||
databaseAsserts = new DatabaseDifferences() {
|
||||
@Override
|
||||
public boolean isCurrentDateTimeDefaultGenerated() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String currentDateTimeDefaultOptional(String isoString) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String geometryDatabaseName() {
|
||||
return "geometry_test_8";
|
||||
}
|
||||
|
||||
/**
|
||||
* MySQL 8 does not support unknown SRIDs so the case is removed
|
||||
*/
|
||||
@Override
|
||||
public int geometryPointTableRecords() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
/**
|
||||
* MySQL 8 returns X and Y in a different order
|
||||
*/
|
||||
@Override
|
||||
public void geometryAssertPoints(Double expectedX, Double expectedY, Double actualX,
|
||||
Double actualY) {
|
||||
assertThat(actualX).isEqualTo(expectedY, Delta.delta(0.01));
|
||||
assertThat(actualY).isEqualTo(expectedX, Delta.delta(0.01));
|
||||
}
|
||||
};
|
||||
}
|
||||
else {
|
||||
databaseAsserts = new DatabaseDifferences() {
|
||||
@Override
|
||||
public boolean isCurrentDateTimeDefaultGenerated() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String currentDateTimeDefaultOptional(String isoString) {
|
||||
return isoString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String geometryDatabaseName() {
|
||||
return "geometry_test_5";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int geometryPointTableRecords() {
|
||||
return 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void geometryAssertPoints(Double expectedX, Double expectedY, Double actualX,
|
||||
Double actualY) {
|
||||
assertThat(actualX).isEqualTo(expectedX, Delta.delta(0.01));
|
||||
assertThat(actualY).isEqualTo(expectedY, Delta.delta(0.01));
|
||||
}
|
||||
};
|
||||
}
|
||||
mySqlVersion = versionString.startsWith("8.") ? MySqlVersion.MYSQL_8 : MySqlVersion.MYSQL_5;
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new IllegalStateException(e);
|
||||
throw new IllegalStateException("Couldn't obtain MySQL Server version", e);
|
||||
}
|
||||
}
|
||||
|
||||
return mySqlVersion;
|
||||
}
|
||||
|
||||
public DatabaseDifferences databaseAsserts() {
|
||||
if (databaseAsserts == null) {
|
||||
if (getMySqlVersion() == MySqlVersion.MYSQL_8) {
|
||||
databaseAsserts = new DatabaseDifferences() {
|
||||
@Override
|
||||
public boolean isCurrentDateTimeDefaultGenerated() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String currentDateTimeDefaultOptional(String isoString) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
else {
|
||||
databaseAsserts = new DatabaseDifferences() {
|
||||
@Override
|
||||
public boolean isCurrentDateTimeDefaultGenerated() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String currentDateTimeDefaultOptional(String isoString) {
|
||||
return isoString;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
return databaseAsserts;
|
||||
|
@ -13,12 +13,14 @@
|
||||
import javax.xml.bind.DatatypeConverter;
|
||||
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.fest.assertions.Delta;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.mysql.MySQLConnection.MySqlVersion;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.util.Testing;
|
||||
@ -34,17 +36,19 @@ public class MySqlGeometryIT extends AbstractConnectorTest {
|
||||
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt")
|
||||
.toAbsolutePath();
|
||||
private UniqueDatabase DATABASE;
|
||||
private DatabaseDifferences databaseAsserts;
|
||||
private DatabaseGeoDifferences databaseDifferences;
|
||||
|
||||
private Configuration config;
|
||||
|
||||
@Before
|
||||
public void beforeEach() {
|
||||
stopConnector();
|
||||
databaseAsserts = MySQLConnection.forTestDatabase("emptydb").databaseAsserts();
|
||||
DATABASE = new UniqueDatabase("geometryit", databaseAsserts.geometryDatabaseName())
|
||||
databaseDifferences = databaseGeoDifferences(MySQLConnection.forTestDatabase("emptydb").getMySqlVersion());
|
||||
|
||||
DATABASE = new UniqueDatabase("geometryit", databaseDifferences.geometryDatabaseName())
|
||||
.withDbHistoryPath(DB_HISTORY_PATH);
|
||||
DATABASE.createAndInitialize();
|
||||
|
||||
initializeConnectorTestFramework();
|
||||
Testing.Files.delete(DB_HISTORY_PATH);
|
||||
}
|
||||
@ -74,12 +78,12 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
|
||||
//Testing.Debug.enable();
|
||||
int numCreateDatabase = 1;
|
||||
int numCreateTables = 2;
|
||||
int numDataRecords = databaseAsserts.geometryPointTableRecords() + 2;
|
||||
int numDataRecords = databaseDifferences.geometryPointTableRecords() + 2;
|
||||
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
|
||||
stopConnector();
|
||||
assertThat(records).isNotNull();
|
||||
assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(numCreateDatabase + numCreateTables);
|
||||
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_222_point")).size()).isEqualTo(databaseAsserts.geometryPointTableRecords());
|
||||
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_222_point")).size()).isEqualTo(databaseDifferences.geometryPointTableRecords());
|
||||
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_507_geometry")).size()).isEqualTo(2);
|
||||
assertThat(records.topics().size()).isEqualTo(1 + numCreateTables);
|
||||
assertThat(records.databaseNames().size()).isEqualTo(1);
|
||||
@ -116,7 +120,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
//Testing.Debug.enable();
|
||||
int numTables = 2;
|
||||
int numDataRecords = databaseAsserts.geometryPointTableRecords() + 2;
|
||||
int numDataRecords = databaseDifferences.geometryPointTableRecords() + 2;
|
||||
int numDdlRecords =
|
||||
numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
|
||||
int numSetVariables = 1;
|
||||
@ -124,7 +128,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
||||
stopConnector();
|
||||
assertThat(records).isNotNull();
|
||||
assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(numDdlRecords + numSetVariables);
|
||||
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_222_point")).size()).isEqualTo(databaseAsserts.geometryPointTableRecords());
|
||||
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_222_point")).size()).isEqualTo(databaseDifferences.geometryPointTableRecords());
|
||||
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_507_geometry")).size()).isEqualTo(2);
|
||||
assertThat(records.topics().size()).isEqualTo(numTables + 1);
|
||||
assertThat(records.databaseNames()).containsOnly(DATABASE.getDatabaseName(), "");
|
||||
@ -162,12 +166,12 @@ private void assertPoint(Struct value) {
|
||||
Double actualY = after.getStruct("point").getFloat64("y");
|
||||
Integer actualSrid = after.getStruct("point").getInt32("srid");
|
||||
//Validate the values
|
||||
databaseAsserts.geometryAssertPoints(expectedX, expectedY, actualX, actualY);
|
||||
databaseDifferences.geometryAssertPoints(expectedX, expectedY, actualX, actualY);
|
||||
assertThat(actualSrid).isEqualTo(expectedSrid);
|
||||
//Test WKB
|
||||
Point point = (Point) WkbGeometryReader.readGeometry(new ByteReader((byte[]) after.getStruct("point")
|
||||
.get("wkb")));
|
||||
databaseAsserts.geometryAssertPoints(expectedX, expectedY, point.getX(), point.getY());
|
||||
databaseDifferences.geometryAssertPoints(expectedX, expectedY, point.getX(), point.getY());
|
||||
} else if (expectedX != null) {
|
||||
Assert.fail("Got a null geometry but didn't expect to");
|
||||
}
|
||||
@ -200,6 +204,62 @@ private void assertGeomRecord(Struct value) {
|
||||
assertThat(after.getStruct("polygon")).isNull();
|
||||
assertThat(after.getStruct("collection")).isNull();
|
||||
}
|
||||
}
|
||||
|
||||
private DatabaseGeoDifferences databaseGeoDifferences(MySqlVersion mySqlVersion) {
|
||||
if (mySqlVersion == MySqlVersion.MYSQL_5) {
|
||||
return new DatabaseGeoDifferences() {
|
||||
|
||||
@Override
|
||||
public String geometryDatabaseName() {
|
||||
return "geometry_test_5";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int geometryPointTableRecords() {
|
||||
return 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void geometryAssertPoints(Double expectedX, Double expectedY, Double actualX,
|
||||
Double actualY) {
|
||||
assertThat(actualX).isEqualTo(expectedX, Delta.delta(0.01));
|
||||
assertThat(actualY).isEqualTo(expectedY, Delta.delta(0.01));
|
||||
}
|
||||
};
|
||||
}
|
||||
else {
|
||||
return new DatabaseGeoDifferences() {
|
||||
|
||||
@Override
|
||||
public String geometryDatabaseName() {
|
||||
return "geometry_test_8";
|
||||
}
|
||||
|
||||
/**
|
||||
* MySQL 8 does not support unknown SRIDs so the case is removed
|
||||
*/
|
||||
@Override
|
||||
public int geometryPointTableRecords() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
/**
|
||||
* MySQL 8 returns X and Y in a different order
|
||||
*/
|
||||
@Override
|
||||
public void geometryAssertPoints(Double expectedX, Double expectedY, Double actualX,
|
||||
Double actualY) {
|
||||
assertThat(actualX).isEqualTo(expectedY, Delta.delta(0.01));
|
||||
assertThat(actualY).isEqualTo(expectedX, Delta.delta(0.01));
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private interface DatabaseGeoDifferences {
|
||||
String geometryDatabaseName();
|
||||
int geometryPointTableRecords();
|
||||
void geometryAssertPoints(Double expectedX, Double expectedY, Double actualX, Double actualY);
|
||||
}
|
||||
}
|
||||
|
@ -517,7 +517,11 @@ public void dateAndTimeTest() throws InterruptedException {
|
||||
//current timestamp will be replaced with epoch timestamp
|
||||
ZonedDateTime t5 = ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC);
|
||||
String isoString5 = ZonedTimestamp.toIsoString(t5, ZoneOffset.UTC, MySqlValueConverters::adjustTemporal);
|
||||
assertThat(schemaJ.defaultValue()).isEqualTo(MySQLConnection.forTestDatabase(DATABASE.getDatabaseName()).databaseAsserts().currentDateTimeDefaultOptional(isoString5));
|
||||
assertThat(schemaJ.defaultValue()).isEqualTo(
|
||||
MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())
|
||||
.databaseAsserts()
|
||||
.currentDateTimeDefaultOptional(isoString5)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user