DBZ-5044 Centralize datatype schemas

This commit is contained in:
Anisha Mohanty 2022-07-14 10:58:24 +05:30 committed by Jiri Pechanec
parent 77efd818da
commit 91ccb7578f
12 changed files with 150 additions and 54 deletions

View File

@ -5,8 +5,8 @@
*/ */
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import static io.debezium.data.Enum.LOGICAL_NAME; import static io.debezium.data.Enum.ENUM_SCHEMA_NAME;
import static io.debezium.data.Enum.VALUES_FIELD; import static io.debezium.data.Enum.ENUM_VALUES_FIELD;
import static io.debezium.junit.EqualityCheck.LESS_THAN; import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Assertions.assertThat;
@ -81,8 +81,8 @@ public void shouldAlterEnumColumnCharacterSet() throws Exception {
Schema schemaBeforeAlter = records.allRecordsInOrder().get(2).valueSchema().field(FieldName.AFTER).schema(); Schema schemaBeforeAlter = records.allRecordsInOrder().get(2).valueSchema().field(FieldName.AFTER).schema();
Schema schemaAfterAlter = records.allRecordsInOrder().get(4).valueSchema().field(FieldName.AFTER).schema(); Schema schemaAfterAlter = records.allRecordsInOrder().get(4).valueSchema().field(FieldName.AFTER).schema();
String allowedBeforeAlter = schemaBeforeAlter.field("type").schema().parameters().get(VALUES_FIELD); String allowedBeforeAlter = schemaBeforeAlter.field("type").schema().parameters().get(ENUM_VALUES_FIELD);
String allowedAfterAlter = schemaAfterAlter.field("type").schema().parameters().get(VALUES_FIELD); String allowedAfterAlter = schemaAfterAlter.field("type").schema().parameters().get(ENUM_VALUES_FIELD);
assertThat(allowedBeforeAlter).isEqualTo("station,post_office"); assertThat(allowedBeforeAlter).isEqualTo("station,post_office");
assertThat(allowedAfterAlter).isEqualTo("station,post_office,plane,ahihi_dongok,now,test,a\\,b,c\\,'d,g\\,'h"); assertThat(allowedAfterAlter).isEqualTo("station,post_office,plane,ahihi_dongok,now,test,a\\,b,c\\,'d,g\\,'h");
@ -106,7 +106,7 @@ public void shouldPropagateColumnSourceType() throws Exception {
Schema schemaBeforeAlter = recordBefore.valueSchema().field(FieldName.AFTER).schema(); Schema schemaBeforeAlter = recordBefore.valueSchema().field(FieldName.AFTER).schema();
Schema typeBeforeSchema = schemaBeforeAlter.field("type").schema(); Schema typeBeforeSchema = schemaBeforeAlter.field("type").schema();
assertThat(typeBeforeSchema.name()).isEqualTo(LOGICAL_NAME); assertThat(typeBeforeSchema.name()).isEqualTo(ENUM_SCHEMA_NAME);
Map<String, String> beforeParameters = typeBeforeSchema.parameters(); Map<String, String> beforeParameters = typeBeforeSchema.parameters();
assertThat(beforeParameters.get(TYPE_NAME_PARAMETER_KEY)).isEqualTo("ENUM"); assertThat(beforeParameters.get(TYPE_NAME_PARAMETER_KEY)).isEqualTo("ENUM");
@ -117,7 +117,7 @@ public void shouldPropagateColumnSourceType() throws Exception {
Schema schemaAfterAlter = recordAfter.valueSchema().field(FieldName.AFTER).schema(); Schema schemaAfterAlter = recordAfter.valueSchema().field(FieldName.AFTER).schema();
Schema typeAfterSchema = schemaAfterAlter.field("type").schema(); Schema typeAfterSchema = schemaAfterAlter.field("type").schema();
assertThat(typeAfterSchema.name()).isEqualTo(LOGICAL_NAME); assertThat(typeAfterSchema.name()).isEqualTo(ENUM_SCHEMA_NAME);
Map<String, String> afterParameters = schemaAfterAlter.field("type").schema().parameters(); Map<String, String> afterParameters = schemaAfterAlter.field("type").schema().parameters();
assertThat(afterParameters.get(TYPE_NAME_PARAMETER_KEY)).isEqualTo("ENUM"); assertThat(afterParameters.get(TYPE_NAME_PARAMETER_KEY)).isEqualTo("ENUM");

View File

@ -45,7 +45,7 @@ public enum SnapshotRecord {
public static SnapshotRecord fromSource(Struct source) { public static SnapshotRecord fromSource(Struct source) {
if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null
&& io.debezium.data.Enum.LOGICAL_NAME.equals(source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY).schema().name())) { && io.debezium.data.Enum.ENUM_SCHEMA_NAME.equals(source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY).schema().name())) {
final String snapshotString = source.getString(AbstractSourceInfo.SNAPSHOT_KEY); final String snapshotString = source.getString(AbstractSourceInfo.SNAPSHOT_KEY);
if (snapshotString != null) { if (snapshotString != null) {
return SnapshotRecord.valueOf(snapshotString.toUpperCase()); return SnapshotRecord.valueOf(snapshotString.toUpperCase());

View File

@ -10,6 +10,8 @@
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.schema.SchemaFactory;
/** /**
* A set of bits of arbitrary length. * A set of bits of arbitrary length.
* *
@ -17,8 +19,10 @@
*/ */
public class Bits { public class Bits {
public static final String LOGICAL_NAME = "io.debezium.data.Bits"; public static final String BITS_SCHEMA_NAME = "io.debezium.data.Bits";
public static final String LENGTH_FIELD = "length"; public static final String BITS_LENGTH_FIELD = "length";
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
/** /**
* Returns a {@link SchemaBuilder} for a Bits. You can use the resulting SchemaBuilder * Returns a {@link SchemaBuilder} for a Bits. You can use the resulting SchemaBuilder
@ -28,10 +32,7 @@ public class Bits {
* @return the schema builder * @return the schema builder
*/ */
// Side-by-side diff rows. Left column (removed): builds the bytes schema inline with
// name LOGICAL_NAME, parameter LENGTH_FIELD = Integer.toString(length), and version 1.
// Right column (added): delegates to SchemaFactory#datatypeBitsSchema(length) through the
// schemaFactoryObject field.
public static SchemaBuilder builder(int length) { public static SchemaBuilder builder(int length) {
return SchemaBuilder.bytes() return schemaFactoryObject.datatypeBitsSchema(length);
.name(LOGICAL_NAME)
.parameter(LENGTH_FIELD, Integer.toString(length))
.version(1);
} }
/** /**

View File

@ -10,6 +10,7 @@
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.schema.SchemaFactory;
import io.debezium.util.Strings; import io.debezium.util.Strings;
/** /**
@ -19,8 +20,10 @@
*/ */
public class Enum { public class Enum {
public static final String LOGICAL_NAME = "io.debezium.data.Enum"; public static final String ENUM_SCHEMA_NAME = "io.debezium.data.Enum";
public static final String VALUES_FIELD = "allowed"; public static final String ENUM_VALUES_FIELD = "allowed";
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
/** /**
* Returns a {@link SchemaBuilder} for an enumeration. You can use the resulting SchemaBuilder * Returns a {@link SchemaBuilder} for an enumeration. You can use the resulting SchemaBuilder
@ -30,10 +33,7 @@ public class Enum {
* @return the schema builder * @return the schema builder
*/ */
// Side-by-side diff rows. Left column (removed): builds the string schema inline with
// name LOGICAL_NAME, parameter VALUES_FIELD = allowedValues, and version 1.
// Right column (added): delegates to SchemaFactory#datatypeEnumSchema(allowedValues) through
// the schemaFactoryObject field.
public static SchemaBuilder builder(String allowedValues) { public static SchemaBuilder builder(String allowedValues) {
return SchemaBuilder.string() return schemaFactoryObject.datatypeEnumSchema(allowedValues);
.name(LOGICAL_NAME)
.parameter(VALUES_FIELD, allowedValues)
.version(1);
} }
/** /**

View File

@ -10,6 +10,7 @@
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.schema.SchemaFactory;
import io.debezium.util.Strings; import io.debezium.util.Strings;
/** /**
@ -19,8 +20,10 @@
*/ */
public class EnumSet { public class EnumSet {
public static final String LOGICAL_NAME = "io.debezium.data.EnumSet"; public static final String ENUM_SET_SCHEMA_NAME = "io.debezium.data.EnumSet";
public static final String VALUES_FIELD = "allowed"; public static final String ENUM_SET_VALUES_FIELD = "allowed";
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
/** /**
* Returns a {@link SchemaBuilder} for a set of enumerated values. You can use the resulting SchemaBuilder * Returns a {@link SchemaBuilder} for a set of enumerated values. You can use the resulting SchemaBuilder
@ -30,10 +33,7 @@ public class EnumSet {
* @return the schema builder * @return the schema builder
*/ */
// Side-by-side diff rows. Left column (removed): builds the string schema inline with
// name LOGICAL_NAME, parameter VALUES_FIELD = allowedValues, and version 1.
// Right column (added): delegates to SchemaFactory#datatypeEnumSetSchema(allowedValues)
// through the schemaFactoryObject field.
public static SchemaBuilder builder(String allowedValues) { public static SchemaBuilder builder(String allowedValues) {
return SchemaBuilder.string() return schemaFactoryObject.datatypeEnumSetSchema(allowedValues);
.name(LOGICAL_NAME)
.parameter(VALUES_FIELD, allowedValues)
.version(1);
} }
/** /**

View File

@ -8,6 +8,8 @@
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.schema.SchemaFactory;
/** /**
* A semantic type for a JSON string. * A semantic type for a JSON string.
* *
@ -15,7 +17,9 @@
*/ */
public class Json { public class Json {
public static final String LOGICAL_NAME = "io.debezium.data.Json"; public static final String JSON_SCHEMA_NAME = "io.debezium.data.Json";
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
/** /**
* Returns a {@link SchemaBuilder} for a JSON field. You can use the resulting SchemaBuilder * Returns a {@link SchemaBuilder} for a JSON field. You can use the resulting SchemaBuilder
@ -24,9 +28,7 @@ public class Json {
* @return the schema builder * @return the schema builder
*/ */
// Side-by-side diff rows. Left column (removed): builds the string schema inline with
// name LOGICAL_NAME and version 1.
// Right column (added): delegates to SchemaFactory#datatypeJsonSchema() through the
// schemaFactoryObject field.
public static SchemaBuilder builder() { public static SchemaBuilder builder() {
return SchemaBuilder.string() return schemaFactoryObject.datatypeJsonSchema();
.name(LOGICAL_NAME)
.version(1);
} }
/** /**

View File

@ -9,6 +9,8 @@
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.schema.SchemaFactory;
/** /**
* A semantic type for a Uuid string. * A semantic type for a Uuid string.
* *
@ -16,7 +18,9 @@
*/ */
public class Uuid { public class Uuid {
public static final String LOGICAL_NAME = "io.debezium.data.Uuid"; public static final String UUID_SCHEMA_NAME = "io.debezium.data.Uuid";
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
/** /**
* Returns a {@link SchemaBuilder} for a Uuid field. You can use the resulting SchemaBuilder * Returns a {@link SchemaBuilder} for a Uuid field. You can use the resulting SchemaBuilder
@ -25,9 +29,7 @@ public class Uuid {
* @return the schema builder * @return the schema builder
*/ */
// Side-by-side diff rows. Left column (removed): builds the string schema inline with
// name LOGICAL_NAME and version 1.
// Right column (added): delegates to SchemaFactory#datatypeUuidSchema() through the
// schemaFactoryObject field.
public static SchemaBuilder builder() { public static SchemaBuilder builder() {
return SchemaBuilder.string() return schemaFactoryObject.datatypeUuidSchema();
.name(LOGICAL_NAME)
.version(1);
} }
/** /**

View File

@ -14,6 +14,8 @@
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import io.debezium.schema.SchemaFactory;
/** /**
* An arbitrary precision decimal value with variable scale. * An arbitrary precision decimal value with variable scale.
* *
@ -21,11 +23,13 @@
* *
*/ */
public class VariableScaleDecimal { public class VariableScaleDecimal {
public static final String LOGICAL_NAME = "io.debezium.data.VariableScaleDecimal"; public static final String VARIABLE_SCALE_DECIMAL_SCHEMA_NAME = "io.debezium.data.VariableScaleDecimal";
public static final String VALUE_FIELD = "value"; public static final String VARIABLE_SCALE_DECIMAL_VALUE_FIELD = "value";
public static final String SCALE_FIELD = "scale"; public static final String VARIABLE_SCALE_DECIMAL_SCALE_FIELD = "scale";
public static final Struct ZERO = fromLogical(schema(), SpecialValueDecimal.ZERO); public static final Struct ZERO = fromLogical(schema(), SpecialValueDecimal.ZERO);
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
/** /**
* Returns a {@link SchemaBuilder} for a VariableScaleDecimal. You can use the resulting SchemaBuilder * Returns a {@link SchemaBuilder} for a VariableScaleDecimal. You can use the resulting SchemaBuilder
* to set additional schema settings such as required/optional, default value, and documentation. * to set additional schema settings such as required/optional, default value, and documentation.
@ -33,12 +37,7 @@ public class VariableScaleDecimal {
* @return the schema builder * @return the schema builder
*/ */
public static SchemaBuilder builder() { public static SchemaBuilder builder() {
return SchemaBuilder.struct() return schemaFactoryObject.datatypeVariableScaleDecimalSchema();
.name(LOGICAL_NAME)
.version(1)
.doc("Variable scaled decimal")
.field(SCALE_FIELD, Schema.INT32_SCHEMA)
.field(VALUE_FIELD, Schema.BYTES_SCHEMA);
} }
/** /**
@ -66,7 +65,7 @@ public static Schema optionalSchema() {
* the scale of the number and a binary representation of the number. * the scale of the number and a binary representation of the number.
* *
* @param schema of the encoded value * @param schema of the encoded value
* @param value the value or the decimal * @param decimalValue the value or the decimal
* *
* @return the encoded value * @return the encoded value
*/ */
@ -87,8 +86,8 @@ public static Struct fromLogical(Schema schema, BigDecimal decimalValue) {
Objects.requireNonNull(decimalValue, "decimalValue may not be null"); Objects.requireNonNull(decimalValue, "decimalValue may not be null");
Struct result = new Struct(schema); Struct result = new Struct(schema);
result.put(VALUE_FIELD, decimalValue.unscaledValue().toByteArray()); result.put(VARIABLE_SCALE_DECIMAL_VALUE_FIELD, decimalValue.unscaledValue().toByteArray());
result.put(SCALE_FIELD, decimalValue.scale()); result.put(VARIABLE_SCALE_DECIMAL_SCALE_FIELD, decimalValue.scale());
return result; return result;
} }
@ -100,6 +99,7 @@ public static Struct fromLogical(Schema schema, BigDecimal decimalValue) {
* @return the decoded value * @return the decoded value
*/ */
// Side-by-side diff rows. Both columns rebuild the decimal the same way: the bytes field is
// read as the BigInteger unscaled value and the int32 field as the scale. The right column
// only switches to the renamed VARIABLE_SCALE_DECIMAL_* constants and wraps the statement.
public static SpecialValueDecimal toLogical(final Struct value) { public static SpecialValueDecimal toLogical(final Struct value) {
return new SpecialValueDecimal(new BigDecimal(new BigInteger(value.getBytes(VALUE_FIELD)), value.getInt32(SCALE_FIELD))); return new SpecialValueDecimal(
new BigDecimal(new BigInteger(value.getBytes(VARIABLE_SCALE_DECIMAL_VALUE_FIELD)), value.getInt32(VARIABLE_SCALE_DECIMAL_SCALE_FIELD)));
} }
} }

View File

@ -9,6 +9,8 @@
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.schema.SchemaFactory;
/** /**
* A semantic type for an XML string. * A semantic type for an XML string.
* *
@ -16,7 +18,9 @@
*/ */
public class Xml { public class Xml {
public static final String LOGICAL_NAME = "io.debezium.data.Xml"; public static final String XML_SCHEMA_NAME = "io.debezium.data.Xml";
private static final SchemaFactory schemaFactoryObject = SchemaFactory.get();
/** /**
* Returns a {@link SchemaBuilder} for an XML field. You can use the resulting SchemaBuilder * Returns a {@link SchemaBuilder} for an XML field. You can use the resulting SchemaBuilder
@ -25,9 +29,7 @@ public class Xml {
* @return the schema builder * @return the schema builder
*/ */
// Side-by-side diff rows. Left column (removed): builds the string schema inline with
// name LOGICAL_NAME and version 1.
// Right column (added): delegates to SchemaFactory#datatypeXmlSchema() through the
// schemaFactoryObject field.
public static SchemaBuilder builder() { public static SchemaBuilder builder() {
return SchemaBuilder.string() return schemaFactoryObject.datatypeXmlSchema();
.name(LOGICAL_NAME)
.version(1);
} }
/** /**

View File

@ -10,6 +10,8 @@
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.*;
import io.debezium.data.Enum;
import io.debezium.heartbeat.HeartbeatImpl; import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.pipeline.txmetadata.TransactionMonitor; import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.relational.history.ConnectTableChangeSerializer; import io.debezium.relational.history.ConnectTableChangeSerializer;
@ -68,6 +70,45 @@ public class SchemaFactory {
private static final String SCHEMA_HISTORY_CHANGE_SCHEMA_NAME = "io.debezium.connector.schema.Change"; private static final String SCHEMA_HISTORY_CHANGE_SCHEMA_NAME = "io.debezium.connector.schema.Change";
private static final int SCHEMA_HISTORY_CHANGE_SCHEMA_VERSION = 1; private static final int SCHEMA_HISTORY_CHANGE_SCHEMA_VERSION = 1;
/*
* Datatype schemas
*/
/*
* Bits schema
*/
private static final int BITS_SCHEMA_VERSION = 1;
/*
* Enum schema
*/
private static final int ENUM_SCHEMA_VERSION = 1;
/*
* EnumSet schema
*/
private static final int ENUM_SET_SCHEMA_VERSION = 1;
/*
* Json schema
*/
private static final int JSON_SCHEMA_VERSION = 1;
/*
* Uuid schema
*/
private static final int UUID_SCHEMA_VERSION = 1;
/*
* Variable Scale Decimal schema
*/
private static final int VARIABLE_SCALE_DECIMAL_SCHEMA_VERSION = 1;
/*
* Xml schema
*/
private static final int XML_SCHEMA_VERSION = 1;
private static final SchemaFactory schemaFactoryObject = new SchemaFactory(); private static final SchemaFactory schemaFactoryObject = new SchemaFactory();
private SchemaFactory() { private SchemaFactory() {
@ -196,4 +237,52 @@ public Schema schemaHistoryConnectorValueSchema(SchemaNameAdjuster adjuster, Com
.field(HistoryRecord.Fields.TABLE_CHANGES, SchemaBuilder.array(serializer.getChangeSchema()).build()) .field(HistoryRecord.Fields.TABLE_CHANGES, SchemaBuilder.array(serializer.getChangeSchema()).build())
.build(); .build();
} }
/**
 * Creates the schema builder for the {@link Bits} datatype.
 *
 * @param length the length of the bit set; stored as its decimal string under the
 *            {@link Bits#BITS_LENGTH_FIELD} schema parameter
 * @return a bytes schema builder named {@link Bits#BITS_SCHEMA_NAME} at version {@code BITS_SCHEMA_VERSION}
 */
public SchemaBuilder datatypeBitsSchema(int length) {
    final String lengthParameter = Integer.toString(length);
    final SchemaBuilder bitsBuilder = SchemaBuilder.bytes().name(Bits.BITS_SCHEMA_NAME);
    return bitsBuilder
            .parameter(Bits.BITS_LENGTH_FIELD, lengthParameter)
            .version(BITS_SCHEMA_VERSION);
}
/**
 * Creates the schema builder for the {@link Enum} datatype.
 *
 * @param allowedValues the allowed values; stored verbatim under the
 *            {@link Enum#ENUM_VALUES_FIELD} schema parameter
 * @return a string schema builder named {@link Enum#ENUM_SCHEMA_NAME} at version {@code ENUM_SCHEMA_VERSION}
 */
public SchemaBuilder datatypeEnumSchema(String allowedValues) {
    final SchemaBuilder enumBuilder = SchemaBuilder.string().name(Enum.ENUM_SCHEMA_NAME);
    return enumBuilder
            .parameter(Enum.ENUM_VALUES_FIELD, allowedValues)
            .version(ENUM_SCHEMA_VERSION);
}
/**
 * Creates the schema builder for the {@link EnumSet} datatype.
 *
 * @param allowedValues the allowed values; stored verbatim under the
 *            {@link EnumSet#ENUM_SET_VALUES_FIELD} schema parameter
 * @return a string schema builder named {@link EnumSet#ENUM_SET_SCHEMA_NAME} at version
 *         {@code ENUM_SET_SCHEMA_VERSION}
 */
public SchemaBuilder datatypeEnumSetSchema(String allowedValues) {
    final SchemaBuilder enumSetBuilder = SchemaBuilder.string().name(EnumSet.ENUM_SET_SCHEMA_NAME);
    return enumSetBuilder
            .parameter(EnumSet.ENUM_SET_VALUES_FIELD, allowedValues)
            .version(ENUM_SET_SCHEMA_VERSION);
}
/**
 * Creates the schema builder for the {@link Json} datatype.
 *
 * @return a string schema builder named {@link Json#JSON_SCHEMA_NAME} at version {@code JSON_SCHEMA_VERSION}
 */
public SchemaBuilder datatypeJsonSchema() {
    final SchemaBuilder jsonBuilder = SchemaBuilder.string();
    return jsonBuilder.name(Json.JSON_SCHEMA_NAME).version(JSON_SCHEMA_VERSION);
}
/**
 * Creates the schema builder for the {@link Uuid} datatype.
 *
 * @return a string schema builder named {@link Uuid#UUID_SCHEMA_NAME} at version {@code UUID_SCHEMA_VERSION}
 */
public SchemaBuilder datatypeUuidSchema() {
    final SchemaBuilder uuidBuilder = SchemaBuilder.string();
    return uuidBuilder.name(Uuid.UUID_SCHEMA_NAME).version(UUID_SCHEMA_VERSION);
}
/**
 * Creates the schema builder for the {@link VariableScaleDecimal} datatype.
 *
 * @return a struct schema builder named {@link VariableScaleDecimal#VARIABLE_SCALE_DECIMAL_SCHEMA_NAME}
 *         at version {@code VARIABLE_SCALE_DECIMAL_SCHEMA_VERSION}, holding an int32 scale field
 *         followed by a bytes value field
 */
public SchemaBuilder datatypeVariableScaleDecimalSchema() {
    final SchemaBuilder decimalBuilder = SchemaBuilder.struct()
            .name(VariableScaleDecimal.VARIABLE_SCALE_DECIMAL_SCHEMA_NAME)
            .version(VARIABLE_SCALE_DECIMAL_SCHEMA_VERSION)
            .doc("Variable scaled decimal");
    // Field order is part of the struct schema: scale first, then value.
    return decimalBuilder
            .field(VariableScaleDecimal.VARIABLE_SCALE_DECIMAL_SCALE_FIELD, Schema.INT32_SCHEMA)
            .field(VariableScaleDecimal.VARIABLE_SCALE_DECIMAL_VALUE_FIELD, Schema.BYTES_SCHEMA);
}
/**
 * Creates the schema builder for the {@link Xml} datatype.
 *
 * @return a string schema builder named {@link Xml#XML_SCHEMA_NAME} at version {@code XML_SCHEMA_VERSION}
 */
public SchemaBuilder datatypeXmlSchema() {
    final SchemaBuilder xmlBuilder = SchemaBuilder.string();
    return xmlBuilder.name(Xml.XML_SCHEMA_NAME).version(XML_SCHEMA_VERSION);
}
} }

View File

@ -35,12 +35,12 @@ public void shouldCreateSchemaFromValues() {
// Side-by-side diff rows. Asserts that the builder and its parameter map are non-null and that
// the allowed-values parameter equals the expected string. The right column only swaps
// EnumSet.VALUES_FIELD for the renamed EnumSet.ENUM_SET_VALUES_FIELD.
private void assertBuilder(SchemaBuilder builder, String expectedAllowedValues) { private void assertBuilder(SchemaBuilder builder, String expectedAllowedValues) {
assertThat(builder).isNotNull(); assertThat(builder).isNotNull();
assertThat(builder.parameters()).isNotNull(); assertThat(builder.parameters()).isNotNull();
assertThat(builder.parameters().get(EnumSet.VALUES_FIELD)).isEqualTo(expectedAllowedValues); assertThat(builder.parameters().get(EnumSet.ENUM_SET_VALUES_FIELD)).isEqualTo(expectedAllowedValues);
} }
// Side-by-side diff rows. Asserts that the schema and its parameter map are non-null and that
// the allowed-values parameter equals the expected string. The right column only swaps
// EnumSet.VALUES_FIELD for the renamed EnumSet.ENUM_SET_VALUES_FIELD.
private void assertSchema(Schema schema, String expectedAllowedValues) { private void assertSchema(Schema schema, String expectedAllowedValues) {
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
assertThat(schema.parameters()).isNotNull(); assertThat(schema.parameters()).isNotNull();
assertThat(schema.parameters().get(EnumSet.VALUES_FIELD)).isEqualTo(expectedAllowedValues); assertThat(schema.parameters().get(EnumSet.ENUM_SET_VALUES_FIELD)).isEqualTo(expectedAllowedValues);
} }
} }

View File

@ -36,12 +36,12 @@ public void shouldCreateSchemaFromValues() {
// Side-by-side diff rows. Asserts that the builder and its parameter map are non-null and that
// the allowed-values parameter equals the expected string. The right column only swaps
// Enum.VALUES_FIELD for the renamed Enum.ENUM_VALUES_FIELD.
private void assertBuilder(SchemaBuilder builder, String expectedAllowedValues) { private void assertBuilder(SchemaBuilder builder, String expectedAllowedValues) {
assertThat(builder).isNotNull(); assertThat(builder).isNotNull();
assertThat(builder.parameters()).isNotNull(); assertThat(builder.parameters()).isNotNull();
assertThat(builder.parameters().get(Enum.VALUES_FIELD)).isEqualTo(expectedAllowedValues); assertThat(builder.parameters().get(Enum.ENUM_VALUES_FIELD)).isEqualTo(expectedAllowedValues);
} }
// Side-by-side diff rows. Asserts that the schema and its parameter map are non-null and that
// the allowed-values parameter equals the expected string. The right column only swaps
// Enum.VALUES_FIELD for the renamed Enum.ENUM_VALUES_FIELD.
private void assertSchema(Schema schema, String expectedAllowedValues) { private void assertSchema(Schema schema, String expectedAllowedValues) {
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
assertThat(schema.parameters()).isNotNull(); assertThat(schema.parameters()).isNotNull();
assertThat(schema.parameters().get(Enum.VALUES_FIELD)).isEqualTo(expectedAllowedValues); assertThat(schema.parameters().get(Enum.ENUM_VALUES_FIELD)).isEqualTo(expectedAllowedValues);
} }
} }