DBZ-5204 Do not cache connection details in TypeRegistry.

This commit is contained in:
Chris Cranford 2022-06-07 14:29:37 -04:00 committed by Jiri Pechanec
parent 4a48eb33df
commit 660e391851
2 changed files with 44 additions and 20 deletions

View File

@ -5,7 +5,6 @@
*/
package io.debezium.connector.postgresql;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -101,8 +100,7 @@ private static Map<String, String> getLongTypeNames() {
private final Map<String, PostgresType> nameToType = new HashMap<>();
private final Map<Integer, PostgresType> oidToType = new HashMap<>();
private final Connection connection;
private final TypeInfo typeInfo;
private final PostgresConnection connection;
private final SqlTypeMapper sqlTypeMapper;
private int geometryOid = Integer.MIN_VALUE;
@ -119,9 +117,8 @@ private static Map<String, String> getLongTypeNames() {
public TypeRegistry(PostgresConnection connection) {
try {
this.connection = connection.connection();
typeInfo = ((BaseConnection) this.connection).getTypeInfo();
sqlTypeMapper = new SqlTypeMapper(this.connection, typeInfo);
this.connection = connection;
sqlTypeMapper = new SqlTypeMapper(this.connection);
prime();
}
@ -317,7 +314,7 @@ public static String normalizeTypeName(String typeName) {
* Prime the {@link TypeRegistry} with all existing database types
*/
private void prime() throws SQLException {
try (final Statement statement = connection.createStatement();
try (final Statement statement = connection.connection().createStatement();
final ResultSet rs = statement.executeQuery(SQL_TYPES)) {
final List<PostgresType.Builder> delayResolvedBuilders = new ArrayList<>();
while (rs.next()) {
@ -355,7 +352,7 @@ private PostgresType.Builder createTypeBuilderFromResultSet(ResultSet rs) throws
oid,
sqlTypeMapper.getSqlType(typeName),
modifiers,
typeInfo);
getTypeInfo(connection));
if (CATEGORY_ENUM.equals(category)) {
String[] enumValues = (String[]) rs.getArray("enum_values").getArray();
@ -371,7 +368,7 @@ private PostgresType resolveUnknownType(String name) {
try {
LOGGER.trace("Type '{}' not cached, attempting to lookup from database.", name);
try (final PreparedStatement statement = connection.prepareStatement(SQL_NAME_LOOKUP)) {
try (final PreparedStatement statement = connection.connection().prepareStatement(SQL_NAME_LOOKUP)) {
statement.setString(1, name);
return loadType(statement);
}
@ -385,7 +382,7 @@ private PostgresType resolveUnknownType(int lookupOid) {
try {
LOGGER.trace("Type OID '{}' not cached, attempting to lookup from database.", lookupOid);
try (final PreparedStatement statement = connection.prepareStatement(SQL_OID_LOOKUP)) {
try (final PreparedStatement statement = connection.connection().prepareStatement(SQL_OID_LOOKUP)) {
statement.setInt(1, lookupOid);
return loadType(statement);
}
@ -434,7 +431,7 @@ private static class SqlTypeMapper {
+ " ON sp.nspoid = typnamespace "
+ " ORDER BY typname, sp.r, pg_type.oid;";
private final TypeInfo typeInfo;
private final PostgresConnection connection;
@Immutable
private final Set<String> preloadedSqlTypes;
@ -442,10 +439,10 @@ private static class SqlTypeMapper {
@Immutable
private final Map<String, Integer> sqlTypesByPgTypeNames;
private SqlTypeMapper(Connection db, TypeInfo typeInfo) throws SQLException {
this.typeInfo = typeInfo;
this.preloadedSqlTypes = Collect.unmodifiableSet(typeInfo.getPGTypeNamesWithSQLTypes());
this.sqlTypesByPgTypeNames = Collections.unmodifiableMap(getSqlTypes(db, typeInfo));
private SqlTypeMapper(PostgresConnection connection) throws SQLException {
this.connection = connection;
this.preloadedSqlTypes = Collect.unmodifiableSet(getTypeInfo(connection).getPGTypeNamesWithSQLTypes());
this.sqlTypesByPgTypeNames = Collections.unmodifiableMap(getSqlTypes(connection));
}
public int getSqlType(String typeName) throws SQLException {
@ -454,7 +451,7 @@ public int getSqlType(String typeName) throws SQLException {
// obtain core types such as bool, int2 etc. from the driver, as it correctly maps these types to the JDBC
// type codes. Also those values are cached in TypeInfoCache.
if (isCoreType) {
return typeInfo.getSQLType(typeName);
return getTypeInfo(connection).getSQLType(typeName);
}
if (typeName.endsWith("[]")) {
return Types.ARRAY;
@ -467,11 +464,11 @@ public int getSqlType(String typeName) throws SQLException {
return pgType;
}
LOGGER.info("Failed to obtain SQL type information for type {} via custom statement, falling back to TypeInfo#getSQLType()", typeName);
return typeInfo.getSQLType(typeName);
return getTypeInfo(connection).getSQLType(typeName);
}
catch (Exception e) {
LOGGER.warn("Failed to obtain SQL type information for type {} via custom statement, falling back to TypeInfo#getSQLType()", typeName, e);
return typeInfo.getSQLType(typeName);
return getTypeInfo(connection).getSQLType(typeName);
}
}
}
@ -479,10 +476,10 @@ public int getSqlType(String typeName) throws SQLException {
/**
* Builds up a map of SQL (JDBC) types by PG type name; contains only values for non-core types.
*/
private static Map<String, Integer> getSqlTypes(Connection db, TypeInfo typeInfo) throws SQLException {
private static Map<String, Integer> getSqlTypes(PostgresConnection connection) throws SQLException {
Map<String, Integer> sqlTypesByPgTypeNames = new HashMap<>();
try (final Statement statement = db.createStatement()) {
try (final Statement statement = connection.connection().createStatement()) {
try (final ResultSet rs = statement.executeQuery(SQL_TYPE_DETAILS)) {
while (rs.next()) {
int type;
@ -512,4 +509,8 @@ else if ("e".equals(typtype)) {
return sqlTypesByPgTypeNames;
}
}
private static TypeInfo getTypeInfo(PostgresConnection connection) throws SQLException {
return ((BaseConnection) connection.connection()).getTypeInfo();
}
}

View File

@ -2789,6 +2789,29 @@ public void testStreamingWithNumericReplicationSlotName() throws Exception {
assertInsert(recordsForTopic.get(3), PK_FIELD, 203);
}
@Test
@FixFor("DBZ-5204")
public void testShouldNotCloseConnectionFetchingMetadataWithNewDataTypes() throws Exception {
TestHelper.execute(CREATE_TABLES_STMT);
Configuration config = TestHelper.defaultConfig().build();
start(PostgresConnector.class, config);
waitForStreamingRunning();
assertConnectorIsRunning();
waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
TestHelper.execute("CREATE TYPE enum5204 as enum ('V1','V2')");
TestHelper.execute("CREATE TABLE s1.c (pk SERIAL, data enum5204, primary key (pk))");
TestHelper.execute("INSERT INTO s1.c (pk,data) values (1, 'V1'::enum5204)");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("s1.c"));
assertThat(recordsForTopic).hasSize(1);
assertInsert(recordsForTopic.get(0), PK_FIELD, 1);
System.out.println(recordsForTopic.get(0));
}
private Predicate<SourceRecord> stopOnPKPredicate(int pkValue) {
return record -> {
Struct key = (Struct) record.key();