diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index 97c21005a..ebd846e6d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -8,10 +8,12 @@ import java.nio.charset.Charset; import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.time.Duration; import java.util.Objects; import java.util.Optional; @@ -780,6 +782,20 @@ public Set getAllTableIds(String catalogName) throws SQLException { new String[]{ "TABLE", "PARTITIONED TABLE" }); } + @Override + public void setQueryColumnValue(PreparedStatement statement, Column column, int pos, Object value) + throws SQLException { + final PostgresType resolvedType = typeRegistry.get(column.nativeType()); + + if (resolvedType != null && resolvedType.isEnumType()) { + // ENUMs require explicit casting so the comparison operators can correctly work + statement.setObject(pos, value, Types.OTHER); + } + else { + super.setQueryColumnValue(statement, column, pos, value); + } + } + @FunctionalInterface public interface PostgresValueConverterBuilder { PostgresValueConverter build(TypeRegistry registry); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java index e9182fcd2..244f2804d 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java @@ -48,7 +48,9 @@ public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest queryColumns = getQueryColumns(currentTable); for (int i = 0; i < chunkEndPosition.length; i++) { for (int j = 0; j < i + 1; j++) { - statement.setObject(++pos, chunkEndPosition[j]); + jdbcConnection.setQueryColumnValue(statement, queryColumns.get(j), ++pos, chunkEndPosition[j]); } } // Fill maximum key placeholders for (int i = 0; i < chunkEndPosition.length; i++) { for (int j = 0; j < i + 1; j++) { - statement.setObject(++pos, maximumKey[j]); + jdbcConnection.setQueryColumnValue(statement, queryColumns.get(j), ++pos, maximumKey[j]); } } }