DBZ-6481 Support Postgres enum as key in incremental snapshot

This commit is contained in:
Jiri Pechanec 2023-09-08 14:51:04 +02:00
parent 5538ce82fa
commit f1e7999431
4 changed files with 56 additions and 3 deletions

View File

@ -8,10 +8,12 @@
import java.nio.charset.Charset;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
@ -780,6 +782,20 @@ public Set<TableId> getAllTableIds(String catalogName) throws SQLException {
new String[]{ "TABLE", "PARTITIONED TABLE" });
}
@Override
public void setQueryColumnValue(PreparedStatement statement, Column column, int pos, Object value)
throws SQLException {
final PostgresType resolvedType = typeRegistry.get(column.nativeType());
if (resolvedType != null && resolvedType.isEnumType()) {
// ENUMs require explicit casting so the comparison operators can correctly work
statement.setObject(pos, value, Types.OTHER);
}
else {
super.setQueryColumnValue(statement, column, pos, value);
}
}
@FunctionalInterface
public interface PostgresValueConverterBuilder {
PostgresValueConverter build(TypeRegistry registry);

View File

@ -48,7 +48,9 @@ public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<Postg
+ "CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));"
+ "CREATE TABLE s1.a42 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer);"
+ "CREATE TABLE s1.anumeric (pk numeric, aa integer, PRIMARY KEY(pk));"
+ "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";
+ "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));"
+ "CREATE TYPE enum_type AS ENUM ('UP', 'DOWN', 'LEFT', 'RIGHT', 'STORY');"
+ "CREATE TABLE s1.enumpk (pk enum_type, aa integer, PRIMARY KEY(pk));";
@Before
public void before() throws SQLException {
@ -177,6 +179,33 @@ protected String server() {
return TestHelper.TEST_SERVER;
}
@Test
@FixFor("DBZ-6481")
public void insertsEnumPk() throws Exception {
Testing.Print.enable();
final var enumValues = List.of("UP", "DOWN", "LEFT", "RIGHT", "STORY");
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
for (int i = 0; i < enumValues.size(); i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
"s1.enumpk", connection.quotedColumnIdString(pkFieldName()), "'" + enumValues.get(i) + "'", i));
}
connection.commit();
}
startConnector();
sendAdHocSnapshotSignal("s1.enumpk");
// SNAPSHOT signal, OPEN WINDOW signal + data + CLOSE WINDOW signal
final var records = consumeRecordsByTopic(enumValues.size() + 3).allRecordsInOrder();
for (int i = 0; i < enumValues.size(); i++) {
var record = records.get(i + 2);
assertThat(((Struct) record.key()).getString("pk")).isEqualTo(enumValues.get(i));
assertThat(((Struct) record.value()).getStruct("after").getInt32("aa")).isEqualTo(i);
}
}
@Test
public void inserts4Pks() throws Exception {
// Testing.Print.enable();

View File

@ -1537,6 +1537,13 @@ public Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table
return rs.getObject(columnIndex);
}
/**
* Sets value on {@link PreparedStatement} and set explicit SQL type for it if necessary
*/
public void setQueryColumnValue(PreparedStatement statement, Column column, int pos, Object value) throws SQLException {
statement.setObject(pos, value);
}
/**
* Converts a {@link ResultSet} row to an array of Objects
*/

View File

@ -740,15 +740,16 @@ protected PreparedStatement readTableChunkStatement(String sql) throws SQLExcept
final Object[] chunkEndPosition = context.chunkEndPosititon();
// Fill boundaries placeholders
int pos = 0;
final List<Column> queryColumns = getQueryColumns(currentTable);
for (int i = 0; i < chunkEndPosition.length; i++) {
for (int j = 0; j < i + 1; j++) {
statement.setObject(++pos, chunkEndPosition[j]);
jdbcConnection.setQueryColumnValue(statement, queryColumns.get(j), ++pos, chunkEndPosition[j]);
}
}
// Fill maximum key placeholders
for (int i = 0; i < chunkEndPosition.length; i++) {
for (int j = 0; j < i + 1; j++) {
statement.setObject(++pos, maximumKey[j]);
jdbcConnection.setQueryColumnValue(statement, queryColumns.get(j), ++pos, maximumKey[j]);
}
}
}