DBZ-1235 Return server name to legacy MySQL

This commit is contained in:
Jiri Pechanec 2019-05-28 09:16:39 +02:00 committed by Gunnar Morling
parent 8ecfbee77d
commit ab460c86af
3 changed files with 55 additions and 4 deletions

View File

@ -10,6 +10,7 @@
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.LegacyV1AbstractSourceInfoStructMaker; import io.debezium.connector.LegacyV1AbstractSourceInfoStructMaker;
public class LegacyV1MySqlSourceInfoStructMaker extends LegacyV1AbstractSourceInfoStructMaker<SourceInfo> { public class LegacyV1MySqlSourceInfoStructMaker extends LegacyV1AbstractSourceInfoStructMaker<SourceInfo> {
@ -20,6 +21,7 @@ public LegacyV1MySqlSourceInfoStructMaker(String connector, String version, Comm
super(connector, version, connectorConfig); super(connector, version, connectorConfig);
schema = commonSchemaBuilder() schema = commonSchemaBuilder()
.name("io.debezium.connector.mysql.Source") .name("io.debezium.connector.mysql.Source")
.field(AbstractSourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.SERVER_ID_KEY, Schema.INT64_SCHEMA) .field(SourceInfo.SERVER_ID_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA) .field(SourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.field(SourceInfo.GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA) .field(SourceInfo.GTID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
@ -42,6 +44,7 @@ public Schema schema() {
@Override @Override
public Struct struct(SourceInfo sourceInfo) { public Struct struct(SourceInfo sourceInfo) {
Struct result = commonStruct(); Struct result = commonStruct();
result.put(SourceInfo.SERVER_NAME_KEY, serverName);
result.put(SourceInfo.SERVER_ID_KEY, sourceInfo.getServerId()); result.put(SourceInfo.SERVER_ID_KEY, sourceInfo.getServerId());
if (sourceInfo.getCurrentGtid() != null) { if (sourceInfo.getCurrentGtid() != null) {
// Don't put the GTID Set into the struct; only the current GTID is fine ... // Don't put the GTID Set into the struct; only the current GTID is fine ...

View File

@ -15,7 +15,8 @@
import java.util.Set; import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.apache.avro.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.GenericAssert; import org.fest.assertions.GenericAssert;
import org.junit.Before; import org.junit.Before;
@ -24,6 +25,7 @@
import io.confluent.connect.avro.AvroData; import io.confluent.connect.avro.AvroData;
import io.debezium.config.CommonConnectorConfig.Version; import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor; import io.debezium.doc.FixFor;
import io.debezium.document.Document; import io.debezium.document.Document;
@ -491,7 +493,7 @@ protected SourceInfo sourceWith(Map<String, String> offset) {
@Test @Test
public void shouldValidateSourceInfoSchema() { public void shouldValidateSourceInfoSchema() {
org.apache.kafka.connect.data.Schema kafkaSchema = source.schema(); org.apache.kafka.connect.data.Schema kafkaSchema = source.schema();
Schema avroSchema = avroData.fromConnectSchema(kafkaSchema); org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(kafkaSchema);
assertTrue(avroSchema != null); assertTrue(avroSchema != null);
} }
@ -642,6 +644,28 @@ public void connectorIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name()); assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
} }
@Test
public void schemaIsCorrect() {
final Schema schema = SchemaBuilder.struct()
.name("io.debezium.connector.mysql.Source")
.field("version", Schema.OPTIONAL_STRING_SCHEMA)
.field("connector", Schema.OPTIONAL_STRING_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("server_id", Schema.INT64_SCHEMA)
.field("ts_sec", Schema.INT64_SCHEMA)
.field("gtid", Schema.OPTIONAL_STRING_SCHEMA)
.field("file", Schema.STRING_SCHEMA)
.field("pos", Schema.INT64_SCHEMA)
.field("row", Schema.INT32_SCHEMA)
.field("snapshot", SchemaBuilder.bool().optional().defaultValue(false).build())
.field("thread", Schema.OPTIONAL_INT64_SCHEMA)
.field("db", Schema.OPTIONAL_STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
.field("query", Schema.OPTIONAL_STRING_SCHEMA)
.build();
VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema);
}
protected Document positionWithGtids(String gtids) { protected Document positionWithGtids(String gtids) {
return positionWithGtids(gtids, false); return positionWithGtids(gtids, false);
} }

View File

@ -15,7 +15,8 @@
import java.util.Set; import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.apache.avro.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.GenericAssert; import org.fest.assertions.GenericAssert;
import org.junit.Before; import org.junit.Before;
@ -23,6 +24,7 @@
import io.confluent.connect.avro.AvroData; import io.confluent.connect.avro.AvroData;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor; import io.debezium.doc.FixFor;
import io.debezium.document.Document; import io.debezium.document.Document;
@ -489,7 +491,7 @@ protected SourceInfo sourceWith(Map<String, String> offset) {
@Test @Test
public void shouldValidateSourceInfoSchema() { public void shouldValidateSourceInfoSchema() {
org.apache.kafka.connect.data.Schema kafkaSchema = source.schema(); org.apache.kafka.connect.data.Schema kafkaSchema = source.schema();
Schema avroSchema = avroData.fromConnectSchema(kafkaSchema); org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(kafkaSchema);
assertTrue(avroSchema != null); assertTrue(avroSchema != null);
} }
@ -640,6 +642,28 @@ public void connectorIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name()); assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
} }
@Test
public void schemaIsCorrect() {
final Schema schema = SchemaBuilder.struct()
.name("io.debezium.connector.mysql.Source")
.field("version", Schema.STRING_SCHEMA)
.field("connector", Schema.STRING_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("ts_ms", Schema.INT64_SCHEMA)
.field("snapshot", SchemaBuilder.bool().optional().defaultValue(false).build())
.field("db", Schema.STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
.field("server_id", Schema.INT64_SCHEMA)
.field("gtid", Schema.OPTIONAL_STRING_SCHEMA)
.field("file", Schema.STRING_SCHEMA)
.field("pos", Schema.INT64_SCHEMA)
.field("row", Schema.INT32_SCHEMA)
.field("thread", Schema.OPTIONAL_INT64_SCHEMA)
.field("query", Schema.OPTIONAL_STRING_SCHEMA)
.build();
VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema);
}
protected Document positionWithGtids(String gtids) { protected Document positionWithGtids(String gtids) {
return positionWithGtids(gtids, false); return positionWithGtids(gtids, false);
} }