From cedef89a48421ad1d26c87752ed1d7faed77adc9 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Fri, 18 Jan 2019 10:05:27 +0100 Subject: [PATCH] DBZ-1089 Using Strings#isNullOrEmpty(); adding test --- .../relational/TableSchemaBuilder.java | 18 +++++-- .../relational/TableSchemaBuilderTest.java | 48 +++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index 6b85803c2..a89df3a8a 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -27,6 +27,7 @@ import io.debezium.relational.mapping.ColumnMapper; import io.debezium.relational.mapping.ColumnMappers; import io.debezium.util.SchemaNameAdjuster; +import io.debezium.util.Strings; /** * Builder that constructs {@link TableSchema} instances for {@link Table} definitions. @@ -126,18 +127,25 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t return new TableSchema(tableId, keySchema, keyGenerator, envelope, valSchema, valueGenerator); } + /** + * Returns the type schema name for the given table. + */ private String tableSchemaName(TableId tableId) { - if (tableId.catalog() == null || tableId.catalog().length() == 0) { - if (tableId.schema() == null || tableId.schema().length() == 0) { + if (Strings.isNullOrEmpty(tableId.catalog())) { + if (Strings.isNullOrEmpty(tableId.schema())) { return tableId.table(); } - return tableId.schema() + "." + tableId.table(); + else { + return tableId.schema() + "." + tableId.table(); + } } - if (tableId.schema() == null || tableId.schema().length() == 0) { + else if (Strings.isNullOrEmpty(tableId.schema())) { return tableId.catalog() + "." + tableId.table(); } // When both catalog and schema is present then only schema is used - return tableId.schema() + "." + tableId.table(); + else { + return tableId.schema() + "." + tableId.table(); + } } /** diff --git a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java index b2089ed51..6cdde9850 100644 --- a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java +++ b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java @@ -19,6 +19,7 @@ import org.junit.Before; import org.junit.Test; +import io.debezium.doc.FixFor; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.time.Date; import io.debezium.util.SchemaNameAdjuster; @@ -106,6 +107,53 @@ public void shouldBuildTableSchemaFromTable() { assertThat(schema).isNotNull(); } + @Test + @FixFor("DBZ-1089") + public void shouldBuildCorrectSchemaNames() { + // table id with catalog and schema + schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build()) + .create(prefix, "sometopic", table, null, null); + assertThat(schema).isNotNull(); + assertThat(schema.keySchema().name()).isEqualTo("schema.table.Key"); + assertThat(schema.valueSchema().name()).isEqualTo("schema.table.Value"); + + // only catalog + table = table.edit() + .tableId(new TableId("testDb", null, "testTable")) + .create(); + + schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build()) + .create(prefix, "sometopic", table, null, null); + + assertThat(schema).isNotNull(); + assertThat(schema.keySchema().name()).isEqualTo("testDb.testTable.Key"); + assertThat(schema.valueSchema().name()).isEqualTo("testDb.testTable.Value"); + + // only schema + table = table.edit() + .tableId(new TableId(null, "testSchema", "testTable")) + .create(); + + schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build()) + .create(prefix, "sometopic", table, null, null); + + assertThat(schema).isNotNull(); + assertThat(schema.keySchema().name()).isEqualTo("testSchema.testTable.Key"); + assertThat(schema.valueSchema().name()).isEqualTo("testSchema.testTable.Value"); + + // neither catalog nor schema + table = table.edit() + .tableId(new TableId(null, null, "testTable")) + .create(); + + schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build()) + .create(prefix, "sometopic", table, null, null); + + assertThat(schema).isNotNull(); + assertThat(schema.keySchema().name()).isEqualTo("testTable.Key"); + assertThat(schema.valueSchema().name()).isEqualTo("testTable.Value"); + } + @Test public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() { table = table.edit().setPrimaryKeyNames().create();