DBZ-1089 Using Strings#isNullOrEmpty(); adding test

This commit is contained in:
Gunnar Morling 2019-01-18 10:05:27 +01:00
parent 28c9734685
commit cedef89a48
2 changed files with 61 additions and 5 deletions

View File

@ -27,6 +27,7 @@
import io.debezium.relational.mapping.ColumnMapper;
import io.debezium.relational.mapping.ColumnMappers;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
/**
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions.
@ -126,18 +127,25 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t
return new TableSchema(tableId, keySchema, keyGenerator, envelope, valSchema, valueGenerator);
}
/**
* Returns the type schema name for the given table.
*/
private String tableSchemaName(TableId tableId) {
if (tableId.catalog() == null || tableId.catalog().length() == 0) {
if (tableId.schema() == null || tableId.schema().length() == 0) {
if (Strings.isNullOrEmpty(tableId.catalog())) {
if (Strings.isNullOrEmpty(tableId.schema())) {
return tableId.table();
}
return tableId.schema() + "." + tableId.table();
else {
return tableId.schema() + "." + tableId.table();
}
}
if (tableId.schema() == null || tableId.schema().length() == 0) {
else if (Strings.isNullOrEmpty(tableId.schema())) {
return tableId.catalog() + "." + tableId.table();
}
// When both catalog and schema is present then only schema is used
return tableId.schema() + "." + tableId.table();
else {
return tableId.schema() + "." + tableId.table();
}
}
/**

View File

@ -19,6 +19,7 @@
import org.junit.Before;
import org.junit.Test;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.time.Date;
import io.debezium.util.SchemaNameAdjuster;
@ -106,6 +107,53 @@ public void shouldBuildTableSchemaFromTable() {
assertThat(schema).isNotNull();
}
@Test
@FixFor("DBZ-1089")
public void shouldBuildCorrectSchemaNames() {
// table id with catalog and schema
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build())
.create(prefix, "sometopic", table, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("schema.table.Key");
assertThat(schema.valueSchema().name()).isEqualTo("schema.table.Value");
// only catalog
table = table.edit()
.tableId(new TableId("testDb", null, "testTable"))
.create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build())
.create(prefix, "sometopic", table, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testDb.testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("testDb.testTable.Value");
// only schema
table = table.edit()
.tableId(new TableId(null, "testSchema", "testTable"))
.create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build())
.create(prefix, "sometopic", table, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testSchema.testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("testSchema.testTable.Value");
// neither catalog nor schema
table = table.edit()
.tableId(new TableId(null, null, "testTable"))
.create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build())
.create(prefix, "sometopic", table, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("testTable.Value");
}
@Test
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create();