DBZ-7956 Guard column lookup with failure if not found

This commit is contained in:
Chris Cranford 2024-06-18 13:15:55 -04:00 committed by Jiri Pechanec
parent cfc1237c3c
commit 0a70e5f66d
2 changed files with 12 additions and 4 deletions

View File

@ -672,9 +672,6 @@ protected String columnQueryBindingFromField(String fieldName, TableDescriptor t
final FieldDescriptor field = record.getFields().get(fieldName); final FieldDescriptor field = record.getFields().get(fieldName);
final String columnName = resolveColumnName(field); final String columnName = resolveColumnName(field);
final ColumnDescriptor column = table.getColumnByName(columnName); final ColumnDescriptor column = table.getColumnByName(columnName);
if (column == null) {
throw new DebeziumException("Failed to find column " + columnName + " in table " + table.getId().getTableName());
}
final Object value; final Object value;
if (record.getNonKeyFieldNames().contains(fieldName)) { if (record.getNonKeyFieldNames().contains(fieldName)) {

View File

@ -12,7 +12,9 @@
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable; import io.debezium.annotation.Immutable;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
/** /**
* Describes a relational table. * Describes a relational table.
@ -53,7 +55,16 @@ public Collection<ColumnDescriptor> getColumns() {
} }
public ColumnDescriptor getColumnByName(String columnName) { public ColumnDescriptor getColumnByName(String columnName) {
return columns.get(columnName); final ColumnDescriptor column = columns.get(columnName);
if (column == null) {
throw new DebeziumException(String.format(
"Failed to find column '%s' in table '%s'. " +
"If you have not enabled '%s', this could be related to column/field case differences.",
columnName,
id.getTableName(),
JdbcSinkConnectorConfig.QUOTE_IDENTIFIERS));
}
return column;
} }
public boolean hasColumn(String columnName) { public boolean hasColumn(String columnName) {