DBZ-8188 Use new connection on attach retries

This commit is contained in:
Chris Cranford 2024-08-29 09:43:21 -04:00 committed by Chris Cranford
parent 7423ee0294
commit b00ae60373

View File

@ -23,6 +23,7 @@
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
@ -107,23 +108,8 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
TableNameCaseSensitivity.INSENSITIVE.equals(connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection)),
this, streamingMetrics);
try (OracleConnection xsConnection = new OracleConnection(jdbcConnection.config())) {
try (OracleConnection xsConnection = connectAndAttachWithRetries(jdbcConnection.config(), getStartPosition(offsetContext))) {
try {
// 1. connect
final byte[] startPosition;
String lcrPosition = offsetContext.getLcrPosition();
if (lcrPosition != null) {
startPosition = LcrPosition.valueOf(lcrPosition).getRawPosition();
}
else {
startPosition = convertScnToPosition(offsetContext.getScn());
}
xsOut = performAttachWithRetries(xsConnection, startPosition);
if (xsOut == null) {
throw new DebeziumException("Failed to attach to the Oracle XStream outbound server");
}
// 2. receive events while running
while (context.isRunning()) {
LOGGER.trace("Receiving LCR");
@ -174,13 +160,25 @@ public OracleOffsetContext getOffsetContext() {
return effectiveOffset;
}
private XStreamOut performAttachWithRetries(OracleConnection xsConnection, byte[] startPosition) throws Exception {
XStreamOut out = null;
private byte[] getStartPosition(OracleOffsetContext offsetContext) {
final String lcrPosition = offsetContext.getLcrPosition();
if (lcrPosition != null) {
return LcrPosition.valueOf(lcrPosition).getRawPosition();
}
return convertScnToPosition(offsetContext.getScn());
}
private OracleConnection connectAndAttachWithRetries(JdbcConfiguration jdbcConfig, byte[] startPosition) throws Exception {
OracleConnection connection = null;
for (int attempt = 1; attempt <= DEFAULT_MAX_ATTACH_RETRIES; attempt++) {
XStreamOut out = null;
try {
out = XStreamOut.attach((oracle.jdbc.OracleConnection) xsConnection.connection(), xStreamServerName,
connection = new OracleConnection(jdbcConfig);
out = XStreamOut.attach((oracle.jdbc.OracleConnection) connection.connection(), xStreamServerName,
startPosition, 1, 1, XStreamOut.DEFAULT_MODE);
break;
xsOut = out;
return connection;
}
catch (StreamsException e) {
if (!isAttachExceptionRetriable(e) || attempt == DEFAULT_MAX_ATTACH_RETRIES) {
@ -189,10 +187,16 @@ private XStreamOut performAttachWithRetries(OracleConnection xsConnection, byte[
}
throw e;
}
LOGGER.warn("Failed to attach to outbound server, retrying: {}", e.getMessage());
}
finally {
// If we failed to attach and connection isn't null, close and clear it
if (xsOut == null && connection != null) {
connection.close();
connection = null;
}
}
return out;
}
throw new DebeziumException("Failed to attach to the Oracle XStream outbound server");
}
private boolean isAttachExceptionRetriable(StreamsException e) {