DBZ-7797 Fail fast if surrogate key for incremental snapshot doesn't exist

This commit is contained in:
Jochen Schalanda 2024-04-19 16:00:14 +02:00 committed by Jiri Pechanec
parent 478c95f042
commit 5fc9bb7f13
2 changed files with 24 additions and 1 deletions

View File

@ -239,7 +239,11 @@ public List<Column> getQueryColumns(IncrementalSnapshotContext<T> context, Table
if (context != null && context.currentDataCollectionId() != null) {
Optional<String> surrogateKey = context.currentDataCollectionId().getSurrogateKey();
if (surrogateKey.isPresent()) {
return Collections.singletonList(table.columnWithName(surrogateKey.get()));
Column column = table.columnWithName(surrogateKey.get());
if (column == null) {
throw new IllegalArgumentException("Surrogate key \"" + surrogateKey.get() + "\" doesn't exist in table \"" + table.id() + "\"");
}
return Collections.singletonList(column);
}
}
return getKeyMapper().getKeyKolumns(table);

View File

@ -448,6 +448,25 @@ public void testBuildQueryThreePkColumnsAndMessageKeyColumnsOrder() {
"SELECT * FROM \"s1\".\"table1\" WHERE ((\"pk2\" > ?) OR (\"pk2\" = ? AND \"pk1\" > ?) OR (\"pk2\" = ? AND \"pk1\" = ? AND \"pk3\" > ?)) AND NOT ((\"pk2\" > ?) OR (\"pk2\" = ? AND \"pk1\" > ?) OR (\"pk2\" = ? AND \"pk1\" = ? AND \"pk3\" > ?)) ORDER BY \"pk2\", \"pk1\", \"pk3\" LIMIT 1024");
}
@Test
public void testQueryBuilderThrowsIllegalArgumentExceptionIfSurrogateKeyDoesNotExist() {
final ChunkQueryBuilder<TableId> chunkQueryBuilder = new DefaultChunkQueryBuilder<>(
config(), new JdbcConnection(config().getJdbcConfig(), config -> null, "\"", "\""));
final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
final Column pk1 = Column.editor().name("pk1").optional(false).create();
final Column val1 = Column.editor().name("val1").create();
final Column val2 = Column.editor().name("val2").create();
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1"))
.addColumn(pk1)
.addColumn(val1)
.addColumn(val2)
.setPrimaryKeyNames("pk1").create();
context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), List.of(), "sk");
assertThatThrownBy(() -> chunkQueryBuilder.buildChunkQuery(context, table, Optional.of("\"val1\"=foo")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Surrogate key \"sk\" doesn't exist in table \"s1.table1\"");
}
private Table createTwoPrimaryKeysTable() {
final Column pk1 = Column.editor().name("pk1").optional(false).create();
final Column pk2 = Column.editor().name("pk2").optional(false).create();