DBZ-644 Misc. clean-up and tests

* Using parameter keys under "__debezium" namespace
* More expressive names
* Adding tests
This commit is contained in:
Gunnar Morling 2018-06-28 16:57:19 +02:00 committed by Jiri Pechanec
parent 8744488c8a
commit 068aa85bd6
7 changed files with 234 additions and 63 deletions

View File

@ -0,0 +1,139 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
/**
 * Integration test for the {@code column.propagate.source.type} option: verifies that the
 * original database column type (and, for variable-width types, the length) of matching
 * columns is exposed via schema parameters on the corresponding record fields.
 *
 * @author Gunnar Morling
 */
public class MySqlSourceTypeInSchemaIT extends AbstractConnectorTest {

    // Parameter keys written by PropagateSourceTypeToSchemaParameter, under the "__debezium" namespace
    private static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
    private static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length";

    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-schema-parameter.txt")
            .toAbsolutePath();

    private final UniqueDatabase DATABASE = new UniqueDatabase("schemaparameterit", "source_type_as_schema_parameter_test")
            .withDbHistoryPath(DB_HISTORY_PATH);

    private Configuration config;

    @Before
    public void beforeEach() {
        stopConnector();
        DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
        }
        finally {
            Testing.Files.delete(DB_HISTORY_PATH);
        }
    }

    @Test
    @FixFor("DBZ-644")
    public void shouldPropagateSourceTypeAsSchemaParameter() throws SQLException, InterruptedException {
        // Request type propagation for c1, c2 and c3a/c3b only; "id" is deliberately excluded
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with("column.propagate.source.type", ".*c1,.*c2,.*c3.*")
                .build();

        start(MySqlConnector.class, config);

        // Initialization produces one CREATE DATABASE, one CREATE TABLE and one INSERT event
        SourceRecords records = consumeRecordsByTopic(1 + 1 + 1);
        stopConnector();

        assertThat(records).isNotNull();
        records.forEach(this::validate);

        List<SourceRecord> dmls = records.recordsForTopic(DATABASE.topicForTable("dbz_644_source_type_mapped_as_schema_parameter_test"));
        assertThat(dmls).hasSize(1);

        Field before = dmls.get(0).valueSchema().field("before");

        // "id" matches none of the configured patterns, so no parameters must be present
        assertThat(parametersOf(before, "id")).isNull();

        // fixed-width type: only the type name is propagated
        assertThat(parametersOf(before, "c1")).includes(entry(TYPE_NAME_PARAMETER_KEY, "INT"));

        // fixed-width type: only the type name is propagated
        assertThat(parametersOf(before, "c2")).includes(entry(TYPE_NAME_PARAMETER_KEY, "MEDIUMINT"));

        // variable-width type: type name and length are propagated
        assertThat(parametersOf(before, "c3a")).includes(
                entry(TYPE_NAME_PARAMETER_KEY, "NUMERIC"), entry(TYPE_LENGTH_PARAMETER_KEY, "5"));

        // variable-width type: type name and length are propagated
        assertThat(parametersOf(before, "c3b")).includes(
                entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR"), entry(TYPE_LENGTH_PARAMETER_KEY, "128"));
    }

    /** Returns the schema parameters of the named field within the given "before" struct schema. */
    private Map<String, String> parametersOf(Field before, String fieldName) {
        return before.schema()
                .field(fieldName)
                .schema()
                .parameters();
    }
}

View File

@ -0,0 +1,14 @@
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: source_type_as_schema_parameter_test
-- ----------------------------------------------------------------------------------------------------------------
-- DBZ-644: exercises fixed-width (INT, MEDIUMINT) and variable-width (NUMERIC, VARCHAR)
-- column types, whose source type (and length, where defined) should be propagated
-- as schema parameters by the connector.
CREATE TABLE dbz_644_source_type_mapped_as_schema_parameter_test (
id INT AUTO_INCREMENT NOT NULL,
c1 INT,
c2 MEDIUMINT,
c3a NUMERIC(5,2),
c3b VARCHAR(128),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
-- Single row consumed as the one INSERT event expected by the integration test
INSERT INTO dbz_644_source_type_mapped_as_schema_parameter_test VALUES (default, 123, 456, 789.01, 'test');

View File

@ -1,23 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.mapping;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import org.apache.kafka.connect.data.SchemaBuilder;
/**
 * A column mapper that exposes the original JDBC type id ({@code originalType}) and,
 * when defined, the column length ({@code columnSize}) of a column as schema parameters.
 */
public class AddOriginalDataType implements ColumnMapper {

    @Override
    public ValueConverter create(Column column) {
        // Schema-only mapper: no value conversion is applied
        return null;
    }

    @Override
    public void alterFieldSchema(Column column, SchemaBuilder schemaBuilder) {
        schemaBuilder.parameter("originalType", String.valueOf(column.jdbcType()));

        // Only emit the length when the source actually defines one; otherwise fixed-width
        // types would report the UNSET_INT_VALUE sentinel as their size
        if (column.length() != Column.UNSET_INT_VALUE) {
            schemaBuilder.parameter("columnSize", String.valueOf(column.length()));
        }
    }
}

View File

@ -49,7 +49,7 @@ public static ColumnMappers create(Configuration config) {
// Define the truncated, masked, and mapped columns ...
config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", builder::truncateStrings);
config.forEachMatchingFieldNameWithInteger("column\\.mask\\.with\\.(\\d+)\\.chars", builder::maskStrings);
config.forEachMatchingFieldName("column\\.add\\.original\\.type", builder::addOriginalType);
config.forEachMatchingFieldName("column\\.propagate\\.source\\.type", builder::propagateSourceTypeToSchemaParameter);
return builder.build();
}
@ -154,8 +154,8 @@ public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) {
return map(fullyQualifiedColumnNames, new MaskStrings(maskValue));
}
public Builder addOriginalType(String fullyQualifiedColumnNames, String value) {
return map(value, new AddOriginalDataType());
public Builder propagateSourceTypeToSchemaParameter(String fullyQualifiedColumnNames, String value) {
return map(value, new PropagateSourceTypeToSchemaParameter());
}
/**

View File

@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.mapping;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
/**
 * A {@link ColumnMapper} that exposes the source column's type name via the
 * {@code __debezium.source.column.type} schema parameter and, when the column
 * defines a length, that length via {@code __debezium.source.column.length}.
 *
 * @author Orr Ganani
 * @author Gunnar Morling
 */
public class PropagateSourceTypeToSchemaParameter implements ColumnMapper {

    private static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
    private static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length";

    @Override
    public ValueConverter create(Column column) {
        // Schema-only mapper: values pass through unchanged
        return null;
    }

    @Override
    public void alterFieldSchema(Column column, SchemaBuilder schemaBuilder) {
        schemaBuilder.parameter(TYPE_NAME_PARAMETER_KEY, column.typeName());

        int length = column.length();

        // Fixed-width types carry no length; skip the parameter for them
        if (length != Column.UNSET_INT_VALUE) {
            schemaBuilder.parameter(TYPE_LENGTH_PARAMETER_KEY, Integer.toString(length));
        }
    }
}

View File

@ -1,37 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.mapping;
import io.debezium.relational.Column;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Test;
import java.sql.Types;
import static org.fest.assertions.Assertions.assertThat;
/**
* @author Orr Ganani
*
*/
/**
 * Unit test for {@link AddOriginalDataType}.
 *
 * @author Orr Ganani
 */
public class OriginalDataTypeTest {

    private final int length = 255;

    private final Column column = Column.editor()
            .name("col")
            .type("VARCHAR")
            .jdbcType(Types.VARCHAR)
            .length(length)
            .create();

    @Test
    // Renamed from the copy-pasted "shouldTruncateStrings": this test asserts type/size
    // propagation, not string truncation
    public void shouldAddOriginalTypeAndSizeAsSchemaParameters() {
        SchemaBuilder schemaBuilder = SchemaBuilder.string();

        new AddOriginalDataType().alterFieldSchema(column, schemaBuilder);

        assertThat(schemaBuilder.parameters().get("originalType")).isEqualTo(String.valueOf(Types.VARCHAR));
        assertThat(schemaBuilder.parameters().get("columnSize")).isEqualTo(String.valueOf(length));
    }
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.mapping;
import static org.fest.assertions.Assertions.assertThat;
import java.sql.Types;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Test;
import io.debezium.doc.FixFor;
import io.debezium.relational.Column;
/**
* @author Orr Ganani
*/
/**
 * Unit test for {@link PropagateSourceTypeToSchemaParameter}.
 *
 * @author Orr Ganani
 */
public class PropagateSourceTypeToSchemaParameterTest {

    @Test
    @FixFor("DBZ-644")
    public void shouldAddTypeInformation() {
        int length = 255;

        // A VARCHAR(255) column: variable-width, so both name and length are expected
        Column varcharColumn = Column.editor()
                .name("col")
                .type("VARCHAR")
                .jdbcType(Types.VARCHAR)
                .length(length)
                .create();

        SchemaBuilder fieldSchema = SchemaBuilder.string();
        new PropagateSourceTypeToSchemaParameter().alterFieldSchema(varcharColumn, fieldSchema);

        assertThat(fieldSchema.parameters().get("__debezium.source.column.type")).isEqualTo("VARCHAR");
        assertThat(fieldSchema.parameters().get("__debezium.source.column.length")).isEqualTo(Integer.toString(length));
    }
}