From b00ae60373019c94719d6758e5b759659e13419f Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Thu, 29 Aug 2024 09:43:21 -0400 Subject: [PATCH] DBZ-8188 Use new connection on attach retries --- .../XstreamStreamingChangeEventSource.java | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java index 02e8b9cb4..bf18c4cf1 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java @@ -23,6 +23,7 @@ import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.SourceInfo; import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity; +import io.debezium.jdbc.JdbcConfiguration; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; @@ -107,23 +108,8 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition, TableNameCaseSensitivity.INSENSITIVE.equals(connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection)), this, streamingMetrics); - try (OracleConnection xsConnection = new OracleConnection(jdbcConnection.config())) { + try (OracleConnection xsConnection = connectAndAttachWithRetries(jdbcConnection.config(), getStartPosition(offsetContext))) { try { - // 1. connect - final byte[] startPosition; - String lcrPosition = offsetContext.getLcrPosition(); - if (lcrPosition != null) { - startPosition = LcrPosition.valueOf(lcrPosition).getRawPosition(); - } - else { - startPosition = convertScnToPosition(offsetContext.getScn()); - } - - xsOut = performAttachWithRetries(xsConnection, startPosition); - if (xsOut == null) { - throw new DebeziumException("Failed to attach to the Oracle XStream outbound server"); - } - // 2. receive events while running while (context.isRunning()) { LOGGER.trace("Receiving LCR"); @@ -174,13 +160,25 @@ public OracleOffsetContext getOffsetContext() { return effectiveOffset; } - private XStreamOut performAttachWithRetries(OracleConnection xsConnection, byte[] startPosition) throws Exception { - XStreamOut out = null; + private byte[] getStartPosition(OracleOffsetContext offsetContext) { + final String lcrPosition = offsetContext.getLcrPosition(); + if (lcrPosition != null) { + return LcrPosition.valueOf(lcrPosition).getRawPosition(); + } + return convertScnToPosition(offsetContext.getScn()); + } + + private OracleConnection connectAndAttachWithRetries(JdbcConfiguration jdbcConfig, byte[] startPosition) throws Exception { + OracleConnection connection = null; for (int attempt = 1; attempt <= DEFAULT_MAX_ATTACH_RETRIES; attempt++) { + XStreamOut out = null; try { - out = XStreamOut.attach((oracle.jdbc.OracleConnection) xsConnection.connection(), xStreamServerName, + connection = new OracleConnection(jdbcConfig); + out = XStreamOut.attach((oracle.jdbc.OracleConnection) connection.connection(), xStreamServerName, startPosition, 1, 1, XStreamOut.DEFAULT_MODE); - break; + + xsOut = out; + return connection; } catch (StreamsException e) { if (!isAttachExceptionRetriable(e) || attempt == DEFAULT_MAX_ATTACH_RETRIES) { @@ -189,10 +187,16 @@ private XStreamOut performAttachWithRetries(OracleConnection xsConnection, byte[ } throw e; } - LOGGER.warn("Failed to attach to outbound server, retrying: {}", e.getMessage()); + } + finally { + // If we failed to attach and connection isn't null, close and clear it + if (xsOut == null && connection != null) { + connection.close(); + connection = null; + } } } - return out; + throw new DebeziumException("Failed to attach to the Oracle XStream outbound server"); } private boolean isAttachExceptionRetriable(StreamsException e) {