DBZ-2849 Added test
This commit is contained in:
parent
8409282739
commit
d32e4b3720
@ -37,6 +37,19 @@ public LogInterceptor() {
|
||||
}
|
||||
}
|
||||
|
||||
public LogInterceptor(Class<?> clazz) {
|
||||
try {
|
||||
final Field field = Log4jLoggerAdapter.class.getDeclaredField("logger");
|
||||
field.setAccessible(true);
|
||||
|
||||
Logger logger = (Logger) field.get(LoggerFactory.getLogger(clazz));
|
||||
logger.addAppender(this);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException("Failed to obtain Log4j logger for log interceptor.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void append(LoggingEvent loggingEvent) {
|
||||
this.events.add(loggingEvent);
|
||||
|
@ -25,6 +25,7 @@
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.jdbc.JdbcValueConverters;
|
||||
import io.debezium.junit.logging.LogInterceptor;
|
||||
import io.debezium.junit.relational.TestRelationalDatabaseConfig;
|
||||
import io.debezium.relational.Key.CustomKeyMapper;
|
||||
import io.debezium.relational.Key.KeyMapper;
|
||||
@ -40,7 +41,7 @@ public class TableSchemaBuilderTest {
|
||||
private final String prefix = "";
|
||||
private final TableId id = new TableId("catalog", "schema", "table");
|
||||
private final Object[] data = new Object[]{ "c1value", 3.142d, java.sql.Date.valueOf("2001-10-31"), 4, new byte[]{ 71, 117, 110, 110, 97, 114 }, null, "c7value",
|
||||
"c8value", "c9value" };
|
||||
"c8value", "c9value", null };
|
||||
private final Object[] keyData = new Object[]{ "c1value", 3.142d };
|
||||
private Table table;
|
||||
private Column c1;
|
||||
@ -52,6 +53,7 @@ public class TableSchemaBuilderTest {
|
||||
private Column c7;
|
||||
private Column c8;
|
||||
private Column c9;
|
||||
private Column c10;
|
||||
|
||||
private TableSchema schema;
|
||||
private SchemaNameAdjuster adjuster;
|
||||
@ -103,6 +105,10 @@ public void beforeEach() {
|
||||
Column.editor().name(AVRO_UNSUPPORTED_NAME)
|
||||
.type("VARCHAR").jdbcType(Types.VARCHAR).length(10)
|
||||
.optional(false)
|
||||
.create(),
|
||||
Column.editor().name("UP$ID")
|
||||
.type("NUMBER").jdbcType(Types.INTEGER)
|
||||
.optional(false)
|
||||
.create())
|
||||
.setPrimaryKeyNames("C1", "C2")
|
||||
.create();
|
||||
@ -115,6 +121,7 @@ public void beforeEach() {
|
||||
c7 = table.columnWithName("7C7");
|
||||
c8 = table.columnWithName("C-8");
|
||||
c9 = table.columnWithName(AVRO_UNSUPPORTED_NAME);
|
||||
c10 = table.columnWithName("UP$ID");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -128,6 +135,7 @@ public void checkPreconditions() {
|
||||
assertThat(c7).isNotNull();
|
||||
assertThat(c8).isNotNull();
|
||||
assertThat(c9).isNotNull();
|
||||
assertThat(c10).isNotNull();
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
@ -234,6 +242,11 @@ public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
|
||||
assertThat(values.field(AVRO_UNSUPPORTED_NAME).index()).isEqualTo(8);
|
||||
assertThat(values.field(AVRO_UNSUPPORTED_NAME).schema()).isEqualTo(SchemaBuilder.string().build());
|
||||
|
||||
// Column UP$ID contains an invalid character, left as-is
|
||||
assertThat(values.field("UP$ID").name()).isEqualTo("UP$ID");
|
||||
assertThat(values.field("UP$ID").index()).isEqualTo(9);
|
||||
assertThat(values.field("UP$ID").schema()).isEqualTo(SchemaBuilder.int32().build());
|
||||
|
||||
Struct value = schema.valueFromColumnData(data);
|
||||
assertThat(value).isNotNull();
|
||||
assertThat(value.get("C1")).isEqualTo("c1value");
|
||||
@ -295,6 +308,11 @@ public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKe
|
||||
assertThat(values.field(AVRO_UNSUPPORTED_NAME_CONVERTED).index()).isEqualTo(8);
|
||||
assertThat(values.field(AVRO_UNSUPPORTED_NAME_CONVERTED).schema()).isEqualTo(SchemaBuilder.string().build());
|
||||
|
||||
// Column UP$ID should has $ converted to underscore
|
||||
assertThat(values.field("UP_ID").name()).isEqualTo("UP_ID");
|
||||
assertThat(values.field("UP_ID").index()).isEqualTo(9);
|
||||
assertThat(values.field("UP_ID").schema()).isEqualTo(SchemaBuilder.int32().build());
|
||||
|
||||
Struct value = schema.valueFromColumnData(data);
|
||||
assertThat(value).isNotNull();
|
||||
assertThat(value.get("C1")).isEqualTo("c1value");
|
||||
@ -306,8 +324,10 @@ public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKe
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1044")
|
||||
@FixFor({ "DBZ-1044", "DBZ-2849" })
|
||||
public void shouldSanitizeFieldNamesAndValidateSerialization() {
|
||||
LogInterceptor logInterceptor = new LogInterceptor(TableSchemaBuilder.class);
|
||||
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), true)
|
||||
.create(prefix, "sometopic", table, null, null, null);
|
||||
|
||||
@ -316,6 +336,10 @@ public void shouldSanitizeFieldNamesAndValidateSerialization() {
|
||||
|
||||
SourceRecord record = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "sometopic", schema.keySchema(), key, schema.valueSchema(), value);
|
||||
VerifyRecord.isValid(record);
|
||||
|
||||
assertThat(logInterceptor.containsErrorMessage("Failed to properly convert data value for 'catalog.schema.table.UP$ID' of type NUMBER"))
|
||||
.describedAs("Expected no value conversion failures")
|
||||
.isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user