DBZ-1507 Ensure message keys have correct field order

This commit is contained in:
Chris Cranford 2019-09-30 13:39:03 -04:00 committed by Gunnar Morling
parent b8bbdb8755
commit d906096820
3 changed files with 29 additions and 7 deletions

View File

@ -80,12 +80,7 @@ public static interface KeyMapper {
private static class IdentityKeyMapper {
public static KeyMapper getInstance() {
return (table) -> {
return table.columns()
.stream()
.filter(c -> table.isPrimaryKeyColumn(c.name()))
.collect(Collectors.toList());
};
return (table) -> table.primaryKeyColumns();
}
}

View File

@ -43,7 +43,7 @@ public static TableEditor editor() {
* @return the list of columns that make up the primary key; never null but possibly empty
*/
default List<Column> primaryKeyColumns() {
return filterColumns(col->isPrimaryKeyColumn(col.name()));
return primaryKeyColumnNames().stream().map(this::columnWithName).collect(Collectors.toList());
}
/**

View File

@ -395,4 +395,31 @@ public void customKeyMapperShouldMapMultipleTables() {
assertThat(key2.fields()).hasSize(1);
assertThat(key2.field("C1").name()).isEqualTo("C1");
}
@Test
@FixFor("DBZ-1507")
public void defaultKeyMapperShouldOrderKeyColumnsBasedOnPrimaryKeyColumnNamesOrder() {
TableId id2 = new TableId("catalog", "schema", "info");
Table table2 = Table.editor()
.tableId(id2)
.addColumns(Column.editor().name("t1ID")
.type("INT").jdbcType(Types.INTEGER)
.optional(false)
.create(),
Column.editor().name("t2ID")
.type("INT").jdbcType(Types.INTEGER)
.optional(false)
.create())
.setPrimaryKeyNames("t2ID", "t1ID")
.create();
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, SchemaBuilder.struct().build(), false)
.create(prefix, "sometopic", table2, null, null, null);
Schema key2 = schema2.keySchema();
assertThat(key2).isNotNull();
assertThat(key2.fields()).hasSize(2);
assertThat(key2.fields().get(0).name()).isEqualTo("t2ID");
assertThat(key2.fields().get(1).name()).isEqualTo("t1ID");
}
}