DBZ-1044 Sanitize column/topic names to always be valid Avro Names

This patch fixes an issue where source databases with more lenient naming conventions cause the
snapshot to fail when the schema for the table is created. Any column name that starts with a digit
is now prefixed with an underscore, and any other invalid character in the name is replaced with an
underscore. The result should conform to the Avro name standard: https://avro.apache.org/docs/1.7.7/spec.html#Names
This commit is contained in:
Josh Arenberg 2019-07-06 17:46:12 -04:00 committed by Jiri Pechanec
parent b569c184da
commit 6573738fe9
2 changed files with 49 additions and 2 deletions

View File

@ -366,7 +366,7 @@ protected void addField(SchemaBuilder builder, Column column, ColumnMapper mappe
fieldBuilder.defaultValue(column.defaultValue()); fieldBuilder.defaultValue(column.defaultValue());
} }
builder.field(column.name(), fieldBuilder.build()); builder.field(santizeColumnName(column), fieldBuilder.build());
if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
LOGGER.debug("- field '{}' ({}{}) from column {}", column.name(), builder.isOptional() ? "OPTIONAL " : "", LOGGER.debug("- field '{}' ({}{}) from column {}", column.name(), builder.isOptional() ? "OPTIONAL " : "",
fieldBuilder.type(), fieldBuilder.type(),
@ -389,4 +389,34 @@ protected void addField(SchemaBuilder builder, Column column, ColumnMapper mappe
protected ValueConverter createValueConverterFor(Column column, Field fieldDefn) { protected ValueConverter createValueConverterFor(Column column, Field fieldDefn) {
return valueConverterProvider.converter(column, fieldDefn); return valueConverterProvider.converter(column, fieldDefn);
} }
/**
 * Sanitizes a column name so that it only contains characters that are legal in an Avro name.
 * Must conform to https://avro.apache.org/docs/1.7.7/spec.html#Names
 * Legal characters are [a-zA-Z_] for the first character and [a-zA-Z0-9_] thereafter.
 * <p>
 * A name starting with an ASCII digit keeps that digit but is prefixed with an underscore; every
 * other character outside {@code [a-zA-Z0-9_]} is replaced with an underscore. Only ASCII letters
 * and digits are treated as legal, so non-ASCII letters and digits are replaced as well.
 * <p>
 * Note that distinct column names can map to the same sanitized name (for example {@code C-8}
 * and {@code C_8}), and that an empty name is returned unchanged.
 *
 * @param column the column object containing the name to be sanitized
 *
 * @return the sanitized name.
 */
protected String santizeColumnName(Column column) {
    final char replacementCharacter = '_';
    final char numberPrefix = '_';
    final String name = column.name();
    // One extra slot for the underscore that may be prepended to a leading digit.
    final StringBuilder sanitizedNameBuilder = new StringBuilder(name.length() + 1);
    for (int i = 0; i < name.length(); i++) {
        final char c = name.charAt(i);
        // Deliberately an ASCII range check rather than Character.isDigit(): the latter also
        // accepts non-ASCII Unicode digits, which are not legal in an Avro name.
        final boolean asciiDigit = c >= '0' && c <= '9';
        final boolean asciiLetterOrUnderscore = c == '_' || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
        if (i == 0 && asciiDigit) {
            // A digit is legal anywhere except the first position, so keep it behind a prefix.
            sanitizedNameBuilder.append(numberPrefix);
            sanitizedNameBuilder.append(c);
        }
        else if (asciiLetterOrUnderscore || asciiDigit) {
            sanitizedNameBuilder.append(c);
        }
        else {
            sanitizedNameBuilder.append(replacementCharacter);
        }
    }
    return sanitizedNameBuilder.toString();
}
} }

View File

@ -28,7 +28,7 @@ public class TableSchemaBuilderTest {
private final String prefix = ""; private final String prefix = "";
private final TableId id = new TableId("catalog", "schema", "table"); private final TableId id = new TableId("catalog", "schema", "table");
private final Object[] data = new Object[] { "c1value", 3.142d, java.sql.Date.valueOf("2001-10-31"), 4, new byte[]{ 71, 117, 110, 110, 97, 114}, null }; private final Object[] data = new Object[] { "c1value", 3.142d, java.sql.Date.valueOf("2001-10-31"), 4, new byte[]{ 71, 117, 110, 110, 97, 114}, null, "c7value", "c8value" };
private Table table; private Table table;
private Column c1; private Column c1;
private Column c2; private Column c2;
@ -36,6 +36,9 @@ public class TableSchemaBuilderTest {
private Column c4; private Column c4;
private Column c5; private Column c5;
private Column c6; private Column c6;
private Column c7;
private Column c8;
private TableSchema schema; private TableSchema schema;
private SchemaNameAdjuster adjuster; private SchemaNameAdjuster adjuster;
@ -73,6 +76,14 @@ public void beforeEach() {
.type("SMALLINT").jdbcType(Types.SMALLINT) .type("SMALLINT").jdbcType(Types.SMALLINT)
.optional(false) .optional(false)
.length(1) .length(1)
.create(),
Column.editor().name("7C7") // test invalid Avro name (starts with digit)
.type("VARCHAR").jdbcType(Types.VARCHAR).length(10)
.optional(false)
.create(),
Column.editor().name("C-8") // test invalid Avro name ( contains dash )
.type("VARCHAR").jdbcType(Types.VARCHAR).length(10)
.optional(false)
.create()) .create())
.setPrimaryKeyNames("C1", "C2") .setPrimaryKeyNames("C1", "C2")
.create(); .create();
@ -82,6 +93,8 @@ public void beforeEach() {
c4 = table.columnWithName("C4"); c4 = table.columnWithName("C4");
c5 = table.columnWithName("C5"); c5 = table.columnWithName("C5");
c6 = table.columnWithName("C6"); c6 = table.columnWithName("C6");
c7 = table.columnWithName("7C7");
c8 = table.columnWithName("C-8");
} }
@Test @Test
@ -92,6 +105,8 @@ public void checkPreconditions() {
assertThat(c4).isNotNull(); assertThat(c4).isNotNull();
assertThat(c5).isNotNull(); assertThat(c5).isNotNull();
assertThat(c6).isNotNull(); assertThat(c6).isNotNull();
assertThat(c7).isNotNull();
assertThat(c8).isNotNull();
} }
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
@ -182,6 +197,8 @@ public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
assertThat(values.field("C5").schema()).isEqualTo(SchemaBuilder.bytes().build()); // JDBC BINARY = bytes assertThat(values.field("C5").schema()).isEqualTo(SchemaBuilder.bytes().build()); // JDBC BINARY = bytes
assertThat(values.field("C6").index()).isEqualTo(5); assertThat(values.field("C6").index()).isEqualTo(5);
assertThat(values.field("C6").schema()).isEqualTo(SchemaBuilder.int16().build()); assertThat(values.field("C6").schema()).isEqualTo(SchemaBuilder.int16().build());
assertThat(values.field("_7C7").index()).isEqualTo(6); // Column starting with digit is prefixed with _
assertThat(values.field("C_8").index()).isEqualTo(7); // Column C-8 has - replaced with _
Struct value = schema.valueFromColumnData(data); Struct value = schema.valueFromColumnData(data);
assertThat(value).isNotNull(); assertThat(value).isNotNull();