diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlEnumColumnIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlEnumColumnIT.java index 9870e26c1..7c4d87dda 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlEnumColumnIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlEnumColumnIT.java @@ -5,8 +5,8 @@ */ package io.debezium.connector.mysql; -import static io.debezium.data.Enum.LOGICAL_NAME; -import static io.debezium.data.Enum.VALUES_FIELD; +import static io.debezium.data.Enum.ENUM_SCHEMA_NAME; +import static io.debezium.data.Enum.ENUM_VALUES_FIELD; import static io.debezium.junit.EqualityCheck.LESS_THAN; import static org.fest.assertions.Assertions.assertThat; @@ -81,8 +81,8 @@ public void shouldAlterEnumColumnCharacterSet() throws Exception { Schema schemaBeforeAlter = records.allRecordsInOrder().get(2).valueSchema().field(FieldName.AFTER).schema(); Schema schemaAfterAlter = records.allRecordsInOrder().get(4).valueSchema().field(FieldName.AFTER).schema(); - String allowedBeforeAlter = schemaBeforeAlter.field("type").schema().parameters().get(VALUES_FIELD); - String allowedAfterAlter = schemaAfterAlter.field("type").schema().parameters().get(VALUES_FIELD); + String allowedBeforeAlter = schemaBeforeAlter.field("type").schema().parameters().get(ENUM_VALUES_FIELD); + String allowedAfterAlter = schemaAfterAlter.field("type").schema().parameters().get(ENUM_VALUES_FIELD); assertThat(allowedBeforeAlter).isEqualTo("station,post_office"); assertThat(allowedAfterAlter).isEqualTo("station,post_office,plane,ahihi_dongok,now,test,a\\,b,c\\,'d,g\\,'h"); @@ -106,7 +106,7 @@ public void shouldPropagateColumnSourceType() throws Exception { Schema schemaBeforeAlter = recordBefore.valueSchema().field(FieldName.AFTER).schema(); Schema typeBeforeSchema = schemaBeforeAlter.field("type").schema(); - assertThat(typeBeforeSchema.name()).isEqualTo(LOGICAL_NAME); + assertThat(typeBeforeSchema.name()).isEqualTo(ENUM_SCHEMA_NAME); Map beforeParameters = typeBeforeSchema.parameters(); assertThat(beforeParameters.get(TYPE_NAME_PARAMETER_KEY)).isEqualTo("ENUM"); @@ -117,7 +117,7 @@ public void shouldPropagateColumnSourceType() throws Exception { Schema schemaAfterAlter = recordAfter.valueSchema().field(FieldName.AFTER).schema(); Schema typeAfterSchema = schemaAfterAlter.field("type").schema(); - assertThat(typeAfterSchema.name()).isEqualTo(LOGICAL_NAME); + assertThat(typeAfterSchema.name()).isEqualTo(ENUM_SCHEMA_NAME); Map afterParameters = schemaAfterAlter.field("type").schema().parameters(); assertThat(afterParameters.get(TYPE_NAME_PARAMETER_KEY)).isEqualTo("ENUM"); diff --git a/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java b/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java index 68cb361e5..13e2d6921 100644 --- a/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java +++ b/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java @@ -45,7 +45,7 @@ public enum SnapshotRecord { public static SnapshotRecord fromSource(Struct source) { if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null - && io.debezium.data.Enum.LOGICAL_NAME.equals(source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY).schema().name())) { + && io.debezium.data.Enum.ENUM_SCHEMA_NAME.equals(source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY).schema().name())) { final String snapshotString = source.getString(AbstractSourceInfo.SNAPSHOT_KEY); if (snapshotString != null) { return SnapshotRecord.valueOf(snapshotString.toUpperCase()); diff --git a/debezium-core/src/main/java/io/debezium/data/Bits.java b/debezium-core/src/main/java/io/debezium/data/Bits.java index 898017abe..89bf26ac4 100644 --- a/debezium-core/src/main/java/io/debezium/data/Bits.java +++ b/debezium-core/src/main/java/io/debezium/data/Bits.java @@ -10,6 +10,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.schema.SchemaFactory; + /** * A set of bits of arbitrary length. * @@ -17,8 +19,10 @@ */ public class Bits { - public static final String LOGICAL_NAME = "io.debezium.data.Bits"; - public static final String LENGTH_FIELD = "length"; + public static final String BITS_SCHEMA_NAME = "io.debezium.data.Bits"; + public static final String BITS_LENGTH_FIELD = "length"; + + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); /** * Returns a {@link SchemaBuilder} for a Bits. You can use the resulting SchemaBuilder @@ -28,10 +32,7 @@ public class Bits { * @return the schema builder */ public static SchemaBuilder builder(int length) { - return SchemaBuilder.bytes() - .name(LOGICAL_NAME) - .parameter(LENGTH_FIELD, Integer.toString(length)) - .version(1); + return schemaFactoryObject.datatypeBitsSchema(length); } /** diff --git a/debezium-core/src/main/java/io/debezium/data/Enum.java b/debezium-core/src/main/java/io/debezium/data/Enum.java index 036145e1d..9c1abfc18 100644 --- a/debezium-core/src/main/java/io/debezium/data/Enum.java +++ b/debezium-core/src/main/java/io/debezium/data/Enum.java @@ -10,6 +10,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.schema.SchemaFactory; import io.debezium.util.Strings; /** @@ -19,8 +20,10 @@ */ public class Enum { - public static final String LOGICAL_NAME = "io.debezium.data.Enum"; - public static final String VALUES_FIELD = "allowed"; + public static final String ENUM_SCHEMA_NAME = "io.debezium.data.Enum"; + public static final String ENUM_VALUES_FIELD = "allowed"; + + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); /** * Returns a {@link SchemaBuilder} for an enumeration. You can use the resulting SchemaBuilder @@ -30,10 +33,7 @@ public class Enum { * @return the schema builder */ public static SchemaBuilder builder(String allowedValues) { - return SchemaBuilder.string() - .name(LOGICAL_NAME) - .parameter(VALUES_FIELD, allowedValues) - .version(1); + return schemaFactoryObject.datatypeEnumSchema(allowedValues); } /** diff --git a/debezium-core/src/main/java/io/debezium/data/EnumSet.java b/debezium-core/src/main/java/io/debezium/data/EnumSet.java index b892310ef..c8c9384b8 100644 --- a/debezium-core/src/main/java/io/debezium/data/EnumSet.java +++ b/debezium-core/src/main/java/io/debezium/data/EnumSet.java @@ -10,6 +10,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.schema.SchemaFactory; import io.debezium.util.Strings; /** @@ -19,8 +20,10 @@ */ public class EnumSet { - public static final String LOGICAL_NAME = "io.debezium.data.EnumSet"; - public static final String VALUES_FIELD = "allowed"; + public static final String ENUM_SET_SCHEMA_NAME = "io.debezium.data.EnumSet"; + public static final String ENUM_SET_VALUES_FIELD = "allowed"; + + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); /** * Returns a {@link SchemaBuilder} for a set of enumerated values. You can use the resulting SchemaBuilder @@ -30,10 +33,7 @@ public class EnumSet { * @return the schema builder */ public static SchemaBuilder builder(String allowedValues) { - return SchemaBuilder.string() - .name(LOGICAL_NAME) - .parameter(VALUES_FIELD, allowedValues) - .version(1); + return schemaFactoryObject.datatypeEnumSetSchema(allowedValues); } /** diff --git a/debezium-core/src/main/java/io/debezium/data/Json.java b/debezium-core/src/main/java/io/debezium/data/Json.java index 83ef69075..134f8425a 100644 --- a/debezium-core/src/main/java/io/debezium/data/Json.java +++ b/debezium-core/src/main/java/io/debezium/data/Json.java @@ -8,6 +8,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.schema.SchemaFactory; + /** * A semantic type for a JSON string. * @@ -15,7 +17,9 @@ */ public class Json { - public static final String LOGICAL_NAME = "io.debezium.data.Json"; + public static final String JSON_SCHEMA_NAME = "io.debezium.data.Json"; + + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); /** * Returns a {@link SchemaBuilder} for a JSON field. You can use the resulting SchemaBuilder @@ -24,9 +28,7 @@ public class Json { * @return the schema builder */ public static SchemaBuilder builder() { - return SchemaBuilder.string() - .name(LOGICAL_NAME) - .version(1); + return schemaFactoryObject.datatypeJsonSchema(); } /** diff --git a/debezium-core/src/main/java/io/debezium/data/Uuid.java b/debezium-core/src/main/java/io/debezium/data/Uuid.java index e90fb886d..0cc1618f3 100644 --- a/debezium-core/src/main/java/io/debezium/data/Uuid.java +++ b/debezium-core/src/main/java/io/debezium/data/Uuid.java @@ -9,6 +9,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.schema.SchemaFactory; + /** * A semantic type for a Uuid string. * @@ -16,7 +18,9 @@ */ public class Uuid { - public static final String LOGICAL_NAME = "io.debezium.data.Uuid"; + public static final String UUID_SCHEMA_NAME = "io.debezium.data.Uuid"; + + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); /** * Returns a {@link SchemaBuilder} for a Uuid field. You can use the resulting SchemaBuilder @@ -25,9 +29,7 @@ public class Uuid { * @return the schema builder */ public static SchemaBuilder builder() { - return SchemaBuilder.string() - .name(LOGICAL_NAME) - .version(1); + return schemaFactoryObject.datatypeUuidSchema(); } /** diff --git a/debezium-core/src/main/java/io/debezium/data/VariableScaleDecimal.java b/debezium-core/src/main/java/io/debezium/data/VariableScaleDecimal.java index 9833708a8..6887f7be0 100644 --- a/debezium-core/src/main/java/io/debezium/data/VariableScaleDecimal.java +++ b/debezium-core/src/main/java/io/debezium/data/VariableScaleDecimal.java @@ -14,6 +14,8 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import io.debezium.schema.SchemaFactory; + /** * An arbitrary precision decimal value with variable scale. * @@ -21,11 +23,13 @@ * */ public class VariableScaleDecimal { - public static final String LOGICAL_NAME = "io.debezium.data.VariableScaleDecimal"; - public static final String VALUE_FIELD = "value"; - public static final String SCALE_FIELD = "scale"; + public static final String VARIABLE_SCALE_DECIMAL_SCHEMA_NAME = "io.debezium.data.VariableScaleDecimal"; + public static final String VARIABLE_SCALE_DECIMAL_VALUE_FIELD = "value"; + public static final String VARIABLE_SCALE_DECIMAL_SCALE_FIELD = "scale"; public static final Struct ZERO = fromLogical(schema(), SpecialValueDecimal.ZERO); + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); + /** * Returns a {@link SchemaBuilder} for a VariableScaleDecimal. You can use the resulting SchemaBuilder * to set additional schema settings such as required/optional, default value, and documentation. @@ -33,12 +37,7 @@ public class VariableScaleDecimal { * @return the schema builder */ public static SchemaBuilder builder() { - return SchemaBuilder.struct() - .name(LOGICAL_NAME) - .version(1) - .doc("Variable scaled decimal") - .field(SCALE_FIELD, Schema.INT32_SCHEMA) - .field(VALUE_FIELD, Schema.BYTES_SCHEMA); + return schemaFactoryObject.datatypeVariableScaleDecimalSchema(); } /** @@ -66,7 +65,7 @@ public static Schema optionalSchema() { * the scale of the number and a binary representation of the number. * * @param schema of the encoded value - * @param value the value or the decimal + * @param decimalValue the value or the decimal * * @return the encoded value */ @@ -87,8 +86,8 @@ public static Struct fromLogical(Schema schema, BigDecimal decimalValue) { Objects.requireNonNull(decimalValue, "decimalValue may not be null"); Struct result = new Struct(schema); - result.put(VALUE_FIELD, decimalValue.unscaledValue().toByteArray()); - result.put(SCALE_FIELD, decimalValue.scale()); + result.put(VARIABLE_SCALE_DECIMAL_VALUE_FIELD, decimalValue.unscaledValue().toByteArray()); + result.put(VARIABLE_SCALE_DECIMAL_SCALE_FIELD, decimalValue.scale()); return result; } @@ -100,6 +99,7 @@ public static Struct fromLogical(Schema schema, BigDecimal decimalValue) { * @return the decoded value */ public static SpecialValueDecimal toLogical(final Struct value) { - return new SpecialValueDecimal(new BigDecimal(new BigInteger(value.getBytes(VALUE_FIELD)), value.getInt32(SCALE_FIELD))); + return new SpecialValueDecimal( + new BigDecimal(new BigInteger(value.getBytes(VARIABLE_SCALE_DECIMAL_VALUE_FIELD)), value.getInt32(VARIABLE_SCALE_DECIMAL_SCALE_FIELD))); } } diff --git a/debezium-core/src/main/java/io/debezium/data/Xml.java b/debezium-core/src/main/java/io/debezium/data/Xml.java index 398c69027..56259b393 100644 --- a/debezium-core/src/main/java/io/debezium/data/Xml.java +++ b/debezium-core/src/main/java/io/debezium/data/Xml.java @@ -9,6 +9,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.schema.SchemaFactory; + /** * A semantic type for an XML string. * @@ -16,7 +18,9 @@ */ public class Xml { - public static final String LOGICAL_NAME = "io.debezium.data.Xml"; + public static final String XML_SCHEMA_NAME = "io.debezium.data.Xml"; + + private static final SchemaFactory schemaFactoryObject = SchemaFactory.get(); /** * Returns a {@link SchemaBuilder} for an XML field. You can use the resulting SchemaBuilder @@ -25,9 +29,7 @@ public class Xml { * @return the schema builder */ public static SchemaBuilder builder() { - return SchemaBuilder.string() - .name(LOGICAL_NAME) - .version(1); + return schemaFactoryObject.datatypeXmlSchema(); } /** diff --git a/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java b/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java index 31fe8c1b8..9e6261ee4 100644 --- a/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java +++ b/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java @@ -10,6 +10,8 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.AbstractSourceInfo; +import io.debezium.data.*; +import io.debezium.data.Enum; import io.debezium.heartbeat.HeartbeatImpl; import io.debezium.pipeline.txmetadata.TransactionMonitor; import io.debezium.relational.history.ConnectTableChangeSerializer; @@ -68,6 +70,45 @@ public class SchemaFactory { private static final String SCHEMA_HISTORY_CHANGE_SCHEMA_NAME = "io.debezium.connector.schema.Change"; private static final int SCHEMA_HISTORY_CHANGE_SCHEMA_VERSION = 1; + /* + * Datatype schemas + */ + + /* + * Bits schema + */ + private static final int BITS_SCHEMA_VERSION = 1; + + /* + * Enum schema + */ + private static final int ENUM_SCHEMA_VERSION = 1; + + /* + * EnumSet schema + */ + private static final int ENUM_SET_SCHEMA_VERSION = 1; + + /* + * Json schema + */ + private static final int JSON_SCHEMA_VERSION = 1; + + /* + * Uuid schema + */ + private static final int UUID_SCHEMA_VERSION = 1; + + /* + * Variable Scale Decimal schema + */ + private static final int VARIABLE_SCALE_DECIMAL_SCHEMA_VERSION = 1; + + /* + * Xml schema + */ + private static final int XML_SCHEMA_VERSION = 1; + private static final SchemaFactory schemaFactoryObject = new SchemaFactory(); private SchemaFactory() { @@ -196,4 +237,52 @@ public Schema schemaHistoryConnectorValueSchema(SchemaNameAdjuster adjuster, Com .field(HistoryRecord.Fields.TABLE_CHANGES, SchemaBuilder.array(serializer.getChangeSchema()).build()) .build(); } + + public SchemaBuilder datatypeBitsSchema(int length) { + return SchemaBuilder.bytes() + .name(Bits.BITS_SCHEMA_NAME) + .version(BITS_SCHEMA_VERSION) + .parameter(Bits.BITS_LENGTH_FIELD, Integer.toString(length)); + } + + public SchemaBuilder datatypeEnumSchema(String allowedValues) { + return SchemaBuilder.string() + .name(Enum.ENUM_SCHEMA_NAME) + .version(ENUM_SCHEMA_VERSION) + .parameter(Enum.ENUM_VALUES_FIELD, allowedValues); + } + + public SchemaBuilder datatypeEnumSetSchema(String allowedValues) { + return SchemaBuilder.string() + .name(EnumSet.ENUM_SET_SCHEMA_NAME) + .version(ENUM_SET_SCHEMA_VERSION) + .parameter(EnumSet.ENUM_SET_VALUES_FIELD, allowedValues); + } + + public SchemaBuilder datatypeJsonSchema() { + return SchemaBuilder.string() + .name(Json.JSON_SCHEMA_NAME) + .version(JSON_SCHEMA_VERSION); + } + + public SchemaBuilder datatypeUuidSchema() { + return SchemaBuilder.string() + .name(Uuid.UUID_SCHEMA_NAME) + .version(UUID_SCHEMA_VERSION); + } + + public SchemaBuilder datatypeVariableScaleDecimalSchema() { + return SchemaBuilder.struct() + .name(VariableScaleDecimal.VARIABLE_SCALE_DECIMAL_SCHEMA_NAME) + .version(VARIABLE_SCALE_DECIMAL_SCHEMA_VERSION) + .doc("Variable scaled decimal") + .field(VariableScaleDecimal.VARIABLE_SCALE_DECIMAL_SCALE_FIELD, Schema.INT32_SCHEMA) + .field(VariableScaleDecimal.VARIABLE_SCALE_DECIMAL_VALUE_FIELD, Schema.BYTES_SCHEMA); + } + + public SchemaBuilder datatypeXmlSchema() { + return SchemaBuilder.string() + .name(Xml.XML_SCHEMA_NAME) + .version(XML_SCHEMA_VERSION); + } } diff --git a/debezium-core/src/test/java/io/debezium/data/EnumSetTest.java b/debezium-core/src/test/java/io/debezium/data/EnumSetTest.java index 0a62f1700..bd6288e16 100644 --- a/debezium-core/src/test/java/io/debezium/data/EnumSetTest.java +++ b/debezium-core/src/test/java/io/debezium/data/EnumSetTest.java @@ -35,12 +35,12 @@ public void shouldCreateSchemaFromValues() { private void assertBuilder(SchemaBuilder builder, String expectedAllowedValues) { assertThat(builder).isNotNull(); assertThat(builder.parameters()).isNotNull(); - assertThat(builder.parameters().get(EnumSet.VALUES_FIELD)).isEqualTo(expectedAllowedValues); + assertThat(builder.parameters().get(EnumSet.ENUM_SET_VALUES_FIELD)).isEqualTo(expectedAllowedValues); } private void assertSchema(Schema schema, String expectedAllowedValues) { assertThat(schema).isNotNull(); assertThat(schema.parameters()).isNotNull(); - assertThat(schema.parameters().get(EnumSet.VALUES_FIELD)).isEqualTo(expectedAllowedValues); + assertThat(schema.parameters().get(EnumSet.ENUM_SET_VALUES_FIELD)).isEqualTo(expectedAllowedValues); } } \ No newline at end of file diff --git a/debezium-core/src/test/java/io/debezium/data/EnumTest.java b/debezium-core/src/test/java/io/debezium/data/EnumTest.java index 89337e9ec..f35eaa9d8 100644 --- a/debezium-core/src/test/java/io/debezium/data/EnumTest.java +++ b/debezium-core/src/test/java/io/debezium/data/EnumTest.java @@ -36,12 +36,12 @@ public void shouldCreateSchemaFromValues() { private void assertBuilder(SchemaBuilder builder, String expectedAllowedValues) { assertThat(builder).isNotNull(); assertThat(builder.parameters()).isNotNull(); - assertThat(builder.parameters().get(Enum.VALUES_FIELD)).isEqualTo(expectedAllowedValues); + assertThat(builder.parameters().get(Enum.ENUM_VALUES_FIELD)).isEqualTo(expectedAllowedValues); } private void assertSchema(Schema schema, String expectedAllowedValues) { assertThat(schema).isNotNull(); assertThat(schema.parameters()).isNotNull(); - assertThat(schema.parameters().get(Enum.VALUES_FIELD)).isEqualTo(expectedAllowedValues); + assertThat(schema.parameters().get(Enum.ENUM_VALUES_FIELD)).isEqualTo(expectedAllowedValues); } }