diff --git a/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java b/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java index 5e8b1d11e..77bf4bfd2 100644 --- a/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java +++ b/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java @@ -37,6 +37,19 @@ public LogInterceptor() { } } + public LogInterceptor(Class clazz) { + try { + final Field field = Log4jLoggerAdapter.class.getDeclaredField("logger"); + field.setAccessible(true); + + Logger logger = (Logger) field.get(LoggerFactory.getLogger(clazz)); + logger.addAppender(this); + } + catch (Exception e) { + throw new RuntimeException("Failed to obtain Log4j logger for log interceptor."); + } + } + @Override protected void append(LoggingEvent loggingEvent) { this.events.add(loggingEvent); diff --git a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java index 24679551f..b0998b22e 100644 --- a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java +++ b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java @@ -25,6 +25,7 @@ import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.junit.logging.LogInterceptor; import io.debezium.junit.relational.TestRelationalDatabaseConfig; import io.debezium.relational.Key.CustomKeyMapper; import io.debezium.relational.Key.KeyMapper; @@ -40,7 +41,7 @@ public class TableSchemaBuilderTest { private final String prefix = ""; private final TableId id = new TableId("catalog", "schema", "table"); private final Object[] data = new Object[]{ "c1value", 3.142d, java.sql.Date.valueOf("2001-10-31"), 4, new byte[]{ 71, 117, 110, 110, 97, 114 }, null, "c7value", - "c8value", "c9value" }; + "c8value", "c9value", null }; private final Object[] keyData = new Object[]{ "c1value", 3.142d }; private Table table; private Column c1; @@ -52,6 +53,7 @@ public class TableSchemaBuilderTest { private Column c7; private Column c8; private Column c9; + private Column c10; private TableSchema schema; private SchemaNameAdjuster adjuster; @@ -103,6 +105,10 @@ public void beforeEach() { Column.editor().name(AVRO_UNSUPPORTED_NAME) .type("VARCHAR").jdbcType(Types.VARCHAR).length(10) .optional(false) + .create(), + Column.editor().name("UP$ID") + .type("NUMBER").jdbcType(Types.INTEGER) + .optional(false) .create()) .setPrimaryKeyNames("C1", "C2") .create(); @@ -115,6 +121,7 @@ public void beforeEach() { c7 = table.columnWithName("7C7"); c8 = table.columnWithName("C-8"); c9 = table.columnWithName(AVRO_UNSUPPORTED_NAME); + c10 = table.columnWithName("UP$ID"); } @Test @@ -128,6 +135,7 @@ public void checkPreconditions() { assertThat(c7).isNotNull(); assertThat(c8).isNotNull(); assertThat(c9).isNotNull(); + assertThat(c10).isNotNull(); } @Test(expected = NullPointerException.class) @@ -234,6 +242,11 @@ public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() { assertThat(values.field(AVRO_UNSUPPORTED_NAME).index()).isEqualTo(8); assertThat(values.field(AVRO_UNSUPPORTED_NAME).schema()).isEqualTo(SchemaBuilder.string().build()); + // Column UP$ID contains an invalid character, left as-is + assertThat(values.field("UP$ID").name()).isEqualTo("UP$ID"); + assertThat(values.field("UP$ID").index()).isEqualTo(9); + assertThat(values.field("UP$ID").schema()).isEqualTo(SchemaBuilder.int32().build()); + Struct value = schema.valueFromColumnData(data); assertThat(value).isNotNull(); assertThat(value.get("C1")).isEqualTo("c1value"); @@ -295,6 +308,11 @@ public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKe assertThat(values.field(AVRO_UNSUPPORTED_NAME_CONVERTED).index()).isEqualTo(8); assertThat(values.field(AVRO_UNSUPPORTED_NAME_CONVERTED).schema()).isEqualTo(SchemaBuilder.string().build()); + // Column UP$ID should has $ converted to underscore + assertThat(values.field("UP_ID").name()).isEqualTo("UP_ID"); + assertThat(values.field("UP_ID").index()).isEqualTo(9); + assertThat(values.field("UP_ID").schema()).isEqualTo(SchemaBuilder.int32().build()); + Struct value = schema.valueFromColumnData(data); assertThat(value).isNotNull(); assertThat(value.get("C1")).isEqualTo("c1value"); @@ -306,8 +324,10 @@ public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKe } @Test - @FixFor("DBZ-1044") + @FixFor({ "DBZ-1044", "DBZ-2849" }) public void shouldSanitizeFieldNamesAndValidateSerialization() { + LogInterceptor logInterceptor = new LogInterceptor(TableSchemaBuilder.class); + schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), true) .create(prefix, "sometopic", table, null, null, null); @@ -316,6 +336,10 @@ public void shouldSanitizeFieldNamesAndValidateSerialization() { SourceRecord record = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "sometopic", schema.keySchema(), key, schema.valueSchema(), value); VerifyRecord.isValid(record); + + assertThat(logInterceptor.containsErrorMessage("Failed to properly convert data value for 'catalog.schema.table.UP$ID' of type NUMBER")) + .describedAs("Expected no value conversion failures") + .isFalse(); } @Test