DBZ-222 Added support for MySQL POINT type
This commit is contained in:
parent
e5f38717f0
commit
791545c5f4
@ -125,6 +125,7 @@ protected void initializeDataTypes(DataTypeParser dataTypes) {
|
||||
dataTypes.register(Types.CHAR, "ENUM(...)");
|
||||
dataTypes.register(Types.CHAR, "SET(...)");
|
||||
dataTypes.register(Types.OTHER, "JSON");
|
||||
dataTypes.register(Types.OTHER, "POINT");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,79 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
|
||||
import mil.nga.wkb.geom.Point;
|
||||
import mil.nga.wkb.io.ByteReader;
|
||||
import mil.nga.wkb.io.WkbGeometryReader;
|
||||
|
||||
/**
|
||||
* A parser API for MySQL Geometry types, it uses geopackage-wkb-java as a base for parsing Well-Known Binary
|
||||
*
|
||||
* @author oalsafi
|
||||
* @since 05.04.17.
|
||||
*/
|
||||
public class MySqlGeometry {
|
||||
|
||||
private final byte[] wkb;
|
||||
|
||||
/**
|
||||
* Create a MySqlGeometry using the supplied wkb, note this should be the cleaned wkb for MySQL
|
||||
*
|
||||
* @param wkb the Well-Known binary representation of the coordinate in the standard format
|
||||
*/
|
||||
private MySqlGeometry(byte[] wkb) {
|
||||
this.wkb = wkb;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a MySqlGeometry from the original byte array from MySQL binglog event
|
||||
*
|
||||
* @param mysqlBytes he original byte array from MySQL binglog event
|
||||
*
|
||||
* @return a {@link MySqlGeometry} which represents a MySqlGeometry API
|
||||
*/
|
||||
public static MySqlGeometry fromBytes(final byte[] mysqlBytes) {
|
||||
return new MySqlGeometry(convertToWkb(mysqlBytes, 4));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the standard well-known binary representation of the MySQL byte
|
||||
*
|
||||
* @return {@link byte[]} which represents the standard well-known binary
|
||||
*/
|
||||
public byte[] getWkb() {
|
||||
return wkb;
|
||||
}
|
||||
|
||||
/**
|
||||
* It returns a Point coordinate according to OpenGIS based on the WKB
|
||||
*
|
||||
* @return {@link Point} point coordinate
|
||||
*/
|
||||
public Point getPoint() {
|
||||
return (Point) WkbGeometryReader.readGeometry(new ByteReader(wkb));
|
||||
}
|
||||
|
||||
/**
|
||||
* Since MySQL appends 4 bytes as type prefix, we shrink the byte array 4 times in order to have a valid WKB
|
||||
* representation
|
||||
*
|
||||
* @param source the original byte array from MySQL binglog event
|
||||
* @param numOfShifts the number of times we shall shrink the array
|
||||
*
|
||||
* @return a {@link byte[]} which represents the standard well-known binary
|
||||
*/
|
||||
private static byte[] convertToWkb(byte[] source, int numOfShifts) {
|
||||
if (numOfShifts > 0) {
|
||||
int shiftedIndex = numOfShifts - 1;
|
||||
return convertToWkb(ArrayUtils.remove(source, shiftedIndex), shiftedIndex);
|
||||
} else {
|
||||
return source;
|
||||
}
|
||||
}
|
||||
}
|
@ -36,6 +36,9 @@
|
||||
import io.debezium.time.Year;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
import mil.nga.wkb.geom.Point;
|
||||
import mil.nga.wkb.util.WkbException;
|
||||
|
||||
/**
|
||||
* MySQL-specific customization of the conversions from JDBC values obtained from the MySQL binlog client library.
|
||||
* <p>
|
||||
@ -143,6 +146,9 @@ public SchemaBuilder schemaBuilder(Column column) {
|
||||
if (matches(typeName, "JSON")) {
|
||||
return Json.builder();
|
||||
}
|
||||
if (matches(typeName, "POINT")) {
|
||||
return io.debezium.data.geometry.Point.builder();
|
||||
}
|
||||
if (matches(typeName, "YEAR")) {
|
||||
return Year.builder();
|
||||
}
|
||||
@ -165,6 +171,9 @@ public ValueConverter converter(Column column, Field fieldDefn) {
|
||||
if (matches(typeName, "JSON")) {
|
||||
return (data) -> convertJson(column, fieldDefn, data);
|
||||
}
|
||||
if (matches(typeName, "POINT")){
|
||||
return (data -> convertPoint(column, fieldDefn, data));
|
||||
}
|
||||
if (matches(typeName, "YEAR")) {
|
||||
return (data) -> convertYearToInt(column, fieldDefn, data);
|
||||
}
|
||||
@ -448,4 +457,33 @@ protected String convertSetValue(Column column, long indexes, List<String> optio
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the a value representing a POINT {@code byte[]} value to a Point value used in a {@link SourceRecord}.
|
||||
*
|
||||
* @param column the column in which the value appears
|
||||
* @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never null
|
||||
* @param data the data; may be null
|
||||
* @return the converted value, or null if the conversion could not be made and the column allows nulls
|
||||
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
|
||||
*/
|
||||
protected Object convertPoint(Column column, Field fieldDefn, Object data){
|
||||
if (data == null) {
|
||||
data = fieldDefn.schema().defaultValue();
|
||||
}
|
||||
|
||||
Schema schema = fieldDefn.schema();
|
||||
|
||||
if (data instanceof byte[]) {
|
||||
// The binglog utility sends a byte array for any Geometry type, we will use our own binaryParse to parse the byte to WKB, hence
|
||||
// to the suitable class
|
||||
try {
|
||||
MySqlGeometry mySqlGeometry = MySqlGeometry.fromBytes((byte[]) data);
|
||||
Point point = mySqlGeometry.getPoint();
|
||||
return io.debezium.data.geometry.Point.createValue(schema, point.getX(), point.getY(), mySqlGeometry.getWkb());
|
||||
} catch (WkbException e) {
|
||||
throw new ConnectException("Failed to parse and read a value of type POINT on " + column + ": " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
return handleUnknownData(column, fieldDefn, data);
|
||||
}
|
||||
}
|
||||
|
@ -454,3 +454,27 @@ INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafe' AS JSON), -- BLOB as
|
||||
INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafebabe' AS JSON), -- BLOB as Base64
|
||||
'"yv66vg=="',
|
||||
'"yv66vg=="');
|
||||
|
||||
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
-- DATABASE: geometry_test
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
-- The integration test for this database expects to scans all of the binlog events associated with this database
|
||||
-- without error or problems. The integration test does not modify any records in this database, so this script
|
||||
-- must contain all operations to these tables.
|
||||
--
|
||||
-- This relies upon MySQL 5.7's Geometries datatypes.
|
||||
CREATE DATABASE geometry_test;
|
||||
USE geometry_test;
|
||||
|
||||
-- DBZ-222 handle POINT column types ...
|
||||
CREATE TABLE dbz_222_point (
|
||||
id INT AUTO_INCREMENT NOT NULL,
|
||||
point POINT DEFAULT NULL,
|
||||
expected_x FLOAT,
|
||||
expected_y FLOAT,
|
||||
PRIMARY KEY (id)
|
||||
) DEFAULT CHARSET=utf8;
|
||||
INSERT INTO dbz_222_point VALUES (default,GeomFromText('POINT(1 1)'), 1.0, 1.0);
|
||||
INSERT INTO dbz_222_point VALUES (default,GeomFromText('POINT(8.25554554 3.22124447)'), 8.25554554, 3.22124447);
|
||||
INSERT INTO dbz_222_point VALUES (default,GeomFromText('POINT(0 0)'), 0.0, 0.0);
|
@ -236,14 +236,12 @@ public void shouldValidateAcceptableConfiguration() {
|
||||
List<Object> dbNames = dbNameRecommender.validValues(MySqlConnectorConfig.DATABASE_WHITELIST, config);
|
||||
Testing.debug("List of dbNames: " + dbNames);
|
||||
assertThat(dbNames).containsOnly("connector_test", "readbinlog_test", "regression_test", "json_test",
|
||||
"connector_test_ro", "emptydb");
|
||||
"connector_test_ro", "emptydb", "geometry_test");
|
||||
|
||||
Recommender tableNameRecommender = MySqlConnectorConfig.TABLE_WHITELIST.recommender();
|
||||
List<Object> tableNames = tableNameRecommender.validValues(MySqlConnectorConfig.TABLE_WHITELIST, config);
|
||||
Testing.debug("List of tableNames: " + tableNames);
|
||||
assertThat(tableNames).containsOnly("readbinlog_test.product",
|
||||
"readbinlog_test.purchased",
|
||||
"readbinlog_test.person",
|
||||
assertThat(tableNames).containsOnly("readbinlog_test.person",
|
||||
"connector_test.customers",
|
||||
"connector_test.orders",
|
||||
"connector_test.products",
|
||||
@ -262,7 +260,8 @@ public void shouldValidateAcceptableConfiguration() {
|
||||
"regression_test.dbz_104_customers",
|
||||
"regression_test.dbz_147_decimalvalues",
|
||||
"regression_test.dbz_195_numvalues",
|
||||
"json_test.dbz_126_jsontable");
|
||||
"json_test.dbz_126_jsontable",
|
||||
"geometry_test.dbz_222_point");
|
||||
|
||||
// Now set the whitelist to two databases ...
|
||||
Configuration config2 = config.edit()
|
||||
|
@ -0,0 +1,188 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.fest.assertions.Delta;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.sql.SQLException;;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.relational.history.FileDatabaseHistory;
|
||||
import io.debezium.util.Testing;
|
||||
import mil.nga.wkb.geom.Point;
|
||||
import mil.nga.wkb.io.ByteReader;
|
||||
import mil.nga.wkb.io.WkbGeometryReader;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author oalsafi
|
||||
* @since 18.04.17.
|
||||
*/
|
||||
public class MySqlGeometryIT extends AbstractConnectorTest {
|
||||
|
||||
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt")
|
||||
.toAbsolutePath();
|
||||
|
||||
private Configuration config;
|
||||
|
||||
@Before
|
||||
public void beforeEach() {
|
||||
stopConnector();
|
||||
initializeConnectorTestFramework();
|
||||
Testing.Files.delete(DB_HISTORY_PATH);
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterEach() {
|
||||
try {
|
||||
stopConnector();
|
||||
} finally {
|
||||
Testing.Files.delete(DB_HISTORY_PATH);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws SQLException, InterruptedException {
|
||||
// Use the DB configuration to define the connector's configuration ...
|
||||
config = Configuration.create()
|
||||
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
|
||||
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
|
||||
.with(MySqlConnectorConfig.USER, "snapper")
|
||||
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
|
||||
.with(
|
||||
MySqlConnectorConfig.SSL_MODE,
|
||||
MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase()
|
||||
)
|
||||
.with(MySqlConnectorConfig.SERVER_ID, 18765)
|
||||
.with(MySqlConnectorConfig.SERVER_NAME, "geometryit")
|
||||
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
|
||||
.with(MySqlConnectorConfig.DATABASE_WHITELIST, "geometry_test")
|
||||
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
|
||||
.with(
|
||||
MySqlConnectorConfig.SNAPSHOT_MODE,
|
||||
MySqlConnectorConfig.SnapshotMode.NEVER.toString()
|
||||
)
|
||||
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
|
||||
.build();
|
||||
// Start the connector ...
|
||||
start(MySqlConnector.class, config);
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Consume all of the events due to startup and initialization of the database
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
//Testing.Debug.enable();
|
||||
int numCreateDatabase = 1;
|
||||
int numCreateTables = 1;
|
||||
int numDataRecords = 3;
|
||||
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
|
||||
stopConnector();
|
||||
assertThat(records).isNotNull();
|
||||
assertThat(records.recordsForTopic("geometryit").size()).isEqualTo(numCreateDatabase + numCreateTables);
|
||||
assertThat(records.recordsForTopic("geometryit.geometry_test.dbz_222_point").size()).isEqualTo(3);
|
||||
assertThat(records.topics().size()).isEqualTo(1 + numCreateTables);
|
||||
assertThat(records.databaseNames().size()).isEqualTo(1);
|
||||
assertThat(records.ddlRecordsForDatabase("geometry_test").size()).isEqualTo(
|
||||
numCreateDatabase + numCreateTables);
|
||||
assertThat(records.ddlRecordsForDatabase("regression_test")).isNull();
|
||||
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
|
||||
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
|
||||
assertThat(records.ddlRecordsForDatabase("json_test")).isNull();
|
||||
records.ddlRecordsForDatabase("geometry_test").forEach(this::print);
|
||||
|
||||
// Check that all records are valid, can be serialized and deserialized ...
|
||||
records.forEach(this::validate);
|
||||
records.forEach(record -> {
|
||||
Struct value = (Struct) record.value();
|
||||
if (record.topic().endsWith("dbz_222_point")) {
|
||||
assertPoint(value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
|
||||
// Use the DB configuration to define the connector's configuration ...
|
||||
config = Configuration.create()
|
||||
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
|
||||
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
|
||||
.with(MySqlConnectorConfig.USER, "snapper")
|
||||
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
|
||||
.with(
|
||||
MySqlConnectorConfig.SSL_MODE,
|
||||
MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase()
|
||||
)
|
||||
.with(MySqlConnectorConfig.SERVER_ID, 18765)
|
||||
.with(MySqlConnectorConfig.SERVER_NAME, "geometryit")
|
||||
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
|
||||
.with(MySqlConnectorConfig.DATABASE_WHITELIST, "geometry_test")
|
||||
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
|
||||
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
|
||||
.build();
|
||||
// Start the connector ...
|
||||
start(MySqlConnector.class, config);
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Consume all of the events due to startup and initialization of the database
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
//Testing.Debug.enable();
|
||||
int numTables = 1;
|
||||
int numDataRecords = 3;
|
||||
int numDdlRecords =
|
||||
numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
|
||||
int numSetVariables = 1;
|
||||
SourceRecords records = consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords);
|
||||
stopConnector();
|
||||
assertThat(records).isNotNull();
|
||||
assertThat(records.recordsForTopic("geometryit").size()).isEqualTo(numDdlRecords + numSetVariables);
|
||||
assertThat(records.recordsForTopic("geometryit.geometry_test.dbz_222_point").size()).isEqualTo(3);
|
||||
assertThat(records.topics().size()).isEqualTo(numTables + 1);
|
||||
assertThat(records.databaseNames().size()).isEqualTo(2);
|
||||
assertThat(records.databaseNames()).containsOnly("geometry_test", "");
|
||||
assertThat(records.ddlRecordsForDatabase("geometry_test").size()).isEqualTo(numDdlRecords);
|
||||
assertThat(records.ddlRecordsForDatabase("regression_test")).isNull();
|
||||
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
|
||||
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
|
||||
assertThat(records.ddlRecordsForDatabase("json_test")).isNull();
|
||||
assertThat(records.ddlRecordsForDatabase("").size()).isEqualTo(1); // SET statement
|
||||
records.ddlRecordsForDatabase("geometry_test").forEach(this::print);
|
||||
|
||||
// Check that all records are valid, can be serialized and deserialized ...
|
||||
records.forEach(this::validate);
|
||||
records.forEach(record -> {
|
||||
Struct value = (Struct) record.value();
|
||||
if (record.topic().endsWith("dbz_222_point")) {
|
||||
assertPoint(value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void assertPoint(Struct value) {
|
||||
Struct after = value.getStruct(Envelope.FieldName.AFTER);
|
||||
Integer i = after.getInt32("id");
|
||||
Testing.debug(after);
|
||||
assertThat(i).isNotNull();
|
||||
Double expectedX = after.getFloat64("expected_x");
|
||||
Double expectedY = after.getFloat64("expected_y");
|
||||
Double actualX = after.getStruct("point").getFloat64("x");
|
||||
Double actualY = after.getStruct("point").getFloat64("y");
|
||||
//Validate the values
|
||||
assertThat(actualX).isEqualTo(expectedX, Delta.delta(0.01));
|
||||
assertThat(actualY).isEqualTo(expectedY, Delta.delta(0.01));
|
||||
//Test WKB
|
||||
Point point = (Point) WkbGeometryReader.readGeometry(new ByteReader((byte[]) after.getStruct("point")
|
||||
.get("wkb")));
|
||||
assertThat(point.getX()).isEqualTo(expectedX, Delta.delta(0.01));
|
||||
assertThat(point.getY()).isEqualTo(expectedY, Delta.delta(0.01));
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import mil.nga.wkb.geom.Point;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* @author oalsafi
|
||||
* @since 13.04.17.
|
||||
*/
|
||||
public class MySqlGeometryTest {
|
||||
|
||||
private MySqlGeometry mySqlGeometry;
|
||||
|
||||
@Test
|
||||
public void shouldConvertMySqlBytesToPoint() throws Exception {
|
||||
byte[] mysqlBytes = {
|
||||
0, 0, 0, 0, 1, 1, 0, 0, 0, -29, -91, -101, -60, 32, -16, 27, 64, 21, -95, 67, -90, -99, 56, 50, 64
|
||||
}; //This represents 'POINT(6.9845 18.22115554)'
|
||||
mySqlGeometry = MySqlGeometry.fromBytes(mysqlBytes);
|
||||
assertPoint(6.9845, 18.22115554, mySqlGeometry.getPoint());
|
||||
}
|
||||
|
||||
protected void assertPoint(double x, double y, Point point) {
|
||||
assertEquals(x, point.getX(), 0.0001);
|
||||
assertEquals(y, point.getY(), 0.0001);
|
||||
}
|
||||
}
|
@ -80,6 +80,27 @@
|
||||
<artifactId>kafka-connect-avro-converter</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>mil.nga</groupId>
|
||||
<artifactId>wkb</artifactId>
|
||||
<version>1.0.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.shyiko</groupId>
|
||||
<artifactId>mysql-binlog-connector-java</artifactId>
|
||||
<version>0.11.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>3.5</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.16.16</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<resources>
|
||||
|
@ -14,12 +14,14 @@
|
||||
* A semantic type for a geometric Point, defined as a set of (x,y) coordinates.
|
||||
*
|
||||
* @author Horia Chiorean
|
||||
* @author oalsafi
|
||||
*/
|
||||
public class Point {
|
||||
|
||||
public static final String LOGICAL_NAME = "io.debezium.data.geometry.Point";
|
||||
public static final String X_FIELD = "x";
|
||||
public static final String Y_FIELD = "y";
|
||||
public static final String WKB_FIELD = "wkb"; //Please see DBZ-208
|
||||
|
||||
|
||||
/**
|
||||
@ -34,7 +36,8 @@ public static SchemaBuilder builder() {
|
||||
.version(1)
|
||||
.doc("A geometric point")
|
||||
.field(X_FIELD, Schema.FLOAT64_SCHEMA)
|
||||
.field(Y_FIELD, Schema.FLOAT64_SCHEMA);
|
||||
.field(Y_FIELD, Schema.FLOAT64_SCHEMA)
|
||||
.field(WKB_FIELD, Schema.OPTIONAL_BYTES_SCHEMA);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -59,4 +62,17 @@ public static Struct createValue(Schema pointSchema, double x, double y) {
|
||||
Struct result = new Struct(pointSchema);
|
||||
return result.put(X_FIELD, x).put(Y_FIELD, y);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a value for this schema using 2 given coordinates and WKB as the original representation of the coordinate (Ref: DBZ-208)
|
||||
* @param pointSchema a {@link Schema} instance which represents a point; may not be null
|
||||
* @param x the X coordinate of the point; may not be null
|
||||
* @param y the Y coordinate of the point; may not be null
|
||||
* @param wkb the original Well-Known binary representation of the coordinate
|
||||
* @return a {@link Struct} which represents a Connect value for this schema; never
|
||||
*/
|
||||
public static Struct createValue(Schema pointSchema, double x, double y, byte[] wkb){
|
||||
Struct results = createValue(pointSchema, x, y);
|
||||
return results.put(WKB_FIELD, wkb);
|
||||
}
|
||||
}
|
||||
|
@ -40,7 +40,7 @@
|
||||
* <p>
|
||||
* See the <a href="http://docs.oracle.com/javase/6/docs/technotes/guides/jdbc/getstart/mapping.html#table1">Java SE Mapping SQL
|
||||
* and Java Types</a> for details about how JDBC {@link Types types} map to Java value types.
|
||||
*
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
@ThreadSafe
|
||||
@ -54,7 +54,7 @@ public class TableSchemaBuilder {
|
||||
|
||||
/**
|
||||
* Create a new instance of the builder.
|
||||
*
|
||||
*
|
||||
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be
|
||||
* null
|
||||
* @param schemaNameValidator the validation function for schema names; may not be null
|
||||
@ -67,7 +67,7 @@ public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Functio
|
||||
/**
|
||||
* Create a {@link TableSchema} from the given JDBC {@link ResultSet}. The resulting TableSchema will have no primary key,
|
||||
* and its {@link TableSchema#valueSchema()} will contain fields for each column in the result set.
|
||||
*
|
||||
*
|
||||
* @param resultSet the result set for a query; may not be null
|
||||
* @param name the name of the value schema; may not be null
|
||||
* @return the table schema that can be used for sending rows of data for this table to Kafka Connect; never null
|
||||
@ -99,7 +99,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException
|
||||
* key.
|
||||
* <p>
|
||||
* This is equivalent to calling {@code create(table,false)}.
|
||||
*
|
||||
*
|
||||
* @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no
|
||||
* prefix
|
||||
* @param table the table definition; may not be null
|
||||
@ -116,7 +116,7 @@ public TableSchema create(String schemaPrefix, Table table) {
|
||||
* key.
|
||||
* <p>
|
||||
* This is equivalent to calling {@code create(table,false)}.
|
||||
*
|
||||
*
|
||||
* @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no
|
||||
* prefix
|
||||
* @param table the table definition; may not be null
|
||||
@ -165,7 +165,7 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
|
||||
|
||||
/**
|
||||
* Creates the function that produces a Kafka Connect key object for a row of data.
|
||||
*
|
||||
*
|
||||
* @param schema the Kafka Connect schema for the key; may be null if there is no known schema, in which case the generator
|
||||
* will be null
|
||||
* @param columnSetName the name for the set of columns, used in error messages; may not be null
|
||||
@ -202,7 +202,7 @@ protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId c
|
||||
|
||||
/**
|
||||
* Creates the function that produces a Kafka Connect value object for a row of data.
|
||||
*
|
||||
*
|
||||
* @param schema the Kafka Connect schema for the value; may be null if there is no known schema, in which case the generator
|
||||
* will be null
|
||||
* @param tableId the table identifier; may not be null
|
||||
@ -267,7 +267,7 @@ protected Field[] fieldsForColumns(Schema schema, List<Column> columns) {
|
||||
/**
|
||||
* Obtain the array of converters for each column in a row. A converter might be null if the column is not be included in
|
||||
* the records.
|
||||
*
|
||||
*
|
||||
* @param schema the schema; may not be null
|
||||
* @param tableId the identifier of the table that contains the columns
|
||||
* @param columns the columns in the row; may not be null
|
||||
@ -305,7 +305,7 @@ protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId,
|
||||
|
||||
/**
|
||||
* Add to the supplied {@link SchemaBuilder} a field for the column with the given information.
|
||||
*
|
||||
*
|
||||
* @param builder the schema builder; never null
|
||||
* @param column the column definition
|
||||
* @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values
|
||||
@ -332,7 +332,7 @@ protected void addField(SchemaBuilder builder, Column column, ColumnMapper mappe
|
||||
/**
|
||||
* Create a {@link ValueConverter} that can be used to convert row values for the given column into the Kafka Connect value
|
||||
* object described by the {@link Field field definition}. This uses the supplied {@link ValueConverterProvider} object.
|
||||
*
|
||||
*
|
||||
* @param column the column describing the input values; never null
|
||||
* @param fieldDefn the definition for the field in a Kafka Connect {@link Schema} describing the output of the function;
|
||||
* never null
|
||||
@ -341,4 +341,4 @@ protected void addField(SchemaBuilder builder, Column column, ColumnMapper mappe
|
||||
protected ValueConverter createValueConverterFor(Column column, Field fieldDefn) {
|
||||
return valueConverterProvider.converter(column, fieldDefn);
|
||||
}
|
||||
}
|
||||
}
|
24
pom.xml
24
pom.xml
@ -337,6 +337,8 @@
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<dependencies>
|
||||
@ -381,13 +383,13 @@
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-report-plugin</artifactId>
|
||||
<version>${version.surefire.plugin}</version>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>${version.checkstyle.plugin}</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>${version.checkstyle.plugin}</version>
|
||||
<artifactId>maven-surefire-report-plugin</artifactId>
|
||||
<version>${version.surefire.plugin}</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
@ -591,13 +593,13 @@
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>check-style</id>
|
||||
<phase>verify</phase>
|
||||
<goals>
|
||||
<goal>checkstyle</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>check-style</id>
|
||||
<phase>verify</phase>
|
||||
<goals>
|
||||
<goal>checkstyle</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
|
Loading…
Reference in New Issue
Block a user