DBZ-8073: Added timeout for replication slot creation command

This commit is contained in:
Rajendra Dangwal 2024-08-07 13:04:41 +05:30 committed by Jiri Pechanec
parent dbb5a477d1
commit 733ef0d8b4
4 changed files with 163 additions and 1 deletion

View File

@ -594,6 +594,14 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
"Whether or not to seek to the last known offset on the replication slot." +
"Enabling this option results in startup failure if the slot is re-created instead of data loss.");
/**
 * Internal connector option {@code create.slot.command.timeout}: how long the command that
 * creates the replication slot may run. Per the description below the value is expressed in
 * seconds; it is declared as a {@code LONG} and defaults to {@code 90}.
 */
public static final Field CREATE_SLOT_COMMAND_TIMEOUT = Field.createInternal("create.slot.command.timeout")
.withDisplayName("Replication slot creation timeout")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 4))
.withDefault(90L)
.withImportance(Importance.LOW)
.withDescription("The timeout in seconds for the creation of the replication slot.");
public static final Field PUBLICATION_NAME = Field.create("publication.name")
.withDisplayName("Publication")
.withType(Type.STRING)
@ -1038,6 +1046,10 @@ public boolean slotSeekToKnownOffsetOnStart() {
return getConfig().getBoolean(SLOT_SEEK_TO_KNOWN_OFFSET);
}
/**
 * Reads the timeout configured for the replication slot creation command.
 *
 * @return the value of {@link #CREATE_SLOT_COMMAND_TIMEOUT}, which that field describes as seconds
 */
public long createSlotCommandTimeout() {
    final long timeoutSeconds = getConfig().getLong(CREATE_SLOT_COMMAND_TIMEOUT);
    return timeoutSeconds;
}
public String publicationName() {
return getConfig().getString(PUBLICATION_NAME);
}

View File

@ -521,13 +521,47 @@ public Optional<SlotCreationResult> createReplicationSlot() throws SQLException
initPublication();
try (Statement stmt = pgConnection().createStatement()) {
stmt.setQueryTimeout(toIntExact(connectorConfig.createSlotCommandTimeout()));
String createCommand = String.format(
"CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
slotName,
tempPart,
plugin.getPostgresPluginName());
LOGGER.info("Creating replication slot with command {}", createCommand);
stmt.execute(createCommand);
final int maxRetries = connectorConfig.maxRetries();
final Duration delay = connectorConfig.retryDelay();
int tryCount = 0;
while (true) {
try {
stmt.execute(createCommand);
break;
}
catch (SQLException ex) {
// intercept the statement timeout error and retry
if (ex.getMessage().contains("canceling statement due to user request")) {
String message = "Creation of replication slot failed; " +
"query to create replication slot timed out, please make sure that there are no long running queries on the database.";
if (++tryCount > maxRetries) {
throw new DebeziumException(message, ex);
}
else {
LOGGER.warn("{} Waiting for {} and retrying, attempt number {} over {}", message, delay, tryCount, maxRetries, ex);
final Metronome metronome = Metronome.parker(delay, Clock.SYSTEM);
try {
metronome.pause();
}
catch (InterruptedException e) {
LOGGER.warn("Slot creation retry sleep interrupted by exception: {}", e.getMessage());
Thread.currentThread().interrupt();
}
}
}
else {
throw ex;
}
}
}
// when we are in Postgres 9.4+, we can parse the slot creation info,
// otherwise, it returns nothing
if (canExportSnapshot) {

View File

@ -208,6 +208,39 @@ public static void execute(String statement, String... furtherStatements) {
}
}
/**
 * Executes a JDBC statement using the default jdbc config without committing the connection.
 * <p>
 * All statements are concatenated and run on a newly created connection whose auto-commit is
 * switched off, so the work stays inside an open transaction. If the concatenated SQL ends with
 * {@code ROLLBACK;} the transaction is rolled back before returning.
 * <p>
 * The connection is handed back still open: the caller owns it and is responsible for
 * committing or rolling back and for closing it. If anything fails before the connection can be
 * returned, it is closed here so that it does not leak.
 *
 * @param statement A SQL statement
 * @param furtherStatements Further SQL statement(s), appended verbatim to {@code statement}
 *
 * @return the PostgresConnection instance; never null
 * @throws RuntimeException if the statement fails; checked exceptions are wrapped as the cause
 */
public static PostgresConnection executeWithoutCommit(String statement, String... furtherStatements) {
    String sql = statement;
    if (furtherStatements != null && furtherStatements.length > 0) {
        sql = sql + String.join("", furtherStatements);
    }
    PostgresConnection connection = null;
    try {
        connection = create();
        connection.setAutoCommit(false);
        connection.executeWithoutCommitting(sql);
        Connection jdbcConn = connection.connection();
        if (sql.endsWith("ROLLBACK;")) {
            jdbcConn.rollback();
        }
        return connection;
    }
    catch (Exception e) {
        // The caller never receives the connection on failure, so release it here.
        if (connection != null) {
            try {
                connection.close();
            }
            catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        throw new RuntimeException(e);
    }
}
/**
* Drops all the public non system schemas from the DB.
*

View File

@ -122,6 +122,89 @@ public void shouldNotAllowRetryWhenConfigured() throws Exception {
}
}
/**
 * Creating a slot whose name is already taken must surface the {@link SQLException} directly,
 * without the "retrying" warning being logged.
 */
@Test(expected = SQLException.class)
public void shouldNotRetryIfSlotCreationFailsWithoutTimeoutError() throws Exception {
    final LogInterceptor logInterceptor = new LogInterceptor(PostgresReplicationConnection.class);
    try (ReplicationConnection firstConnection = TestHelper.createForReplication("test1", false)) {
        firstConnection.createReplicationSlot();
        // a second slot with the very same name has to be rejected
        try (ReplicationConnection duplicateConnection = TestHelper.createForReplication("test1", false)) {
            duplicateConnection.createReplicationSlot();
            fail("Should not be able to create 2 replication slots on same db and plugin");
        }
        catch (Exception failure) {
            final boolean retryLogged = logInterceptor.containsWarnMessage("and retrying, attempt number");
            assertFalse(retryLogged);
            final String failureMessage = failure.getMessage();
            assertTrue(failureMessage.contains("ERROR: replication slot \"test1\" already exists"));
            // rethrow so the expected-exception check on the annotation can see it
            throw failure;
        }
    }
}
/**
 * With a transaction left open on another connection, slot creation is expected to time out,
 * log a retry warning, and finally fail with a {@link DebeziumException} wrapping the
 * "canceling statement" error once the single configured retry is used up.
 */
@Test(expected = DebeziumException.class)
public void shouldRetryAndFailIfSlotCreationFailsWithTimeoutErrorOnLimitedRetries() throws Exception {
    LogInterceptor interceptor = new LogInterceptor(PostgresReplicationConnection.class);
    // open a transaction and don't commit it, so the slot creation will fail with timeout error
    String statement = "DROP TABLE IF EXISTS table_with_pk;" +
            "CREATE TABLE table_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));" +
            "INSERT INTO table_with_pk (b, c) VALUES('val1', now()); ";
    PostgresConnection connection = TestHelper.executeWithoutCommit(statement);
    try (ReplicationConnection conn1 = TestHelper.createForReplication("test1", false,
            new PostgresConnectorConfig(TestHelper.defaultConfig()
                    .with(PostgresConnectorConfig.MAX_RETRIES, 1)
                    .with(PostgresConnectorConfig.RETRY_DELAY_MS, 10)
                    .with(PostgresConnectorConfig.CREATE_SLOT_COMMAND_TIMEOUT, 2)
                    .build()))) {
        conn1.createReplicationSlot();
    }
    catch (Exception e) {
        assertTrue(interceptor.containsWarnMessage("and retrying, attempt number"));
        assertTrue(e.getCause().getMessage().contains("ERROR: canceling statement due to user request"));
        assertTrue(e.getMessage().contains("query to create replication slot timed out"));
        throw e;
    }
    finally {
        // End the blocking transaction and release the helper connection so it does not leak.
        try {
            connection.commit();
        }
        finally {
            connection.close();
        }
    }
}
/**
 * First attempts slot creation while another connection holds an open transaction; if that
 * attempt throws, the retry warning and timeout messages are checked. The transaction is then
 * committed and a second slot creation is expected to succeed.
 */
@Test
public void shouldSucceedIfSlotCreationSucceedsAfterTimeoutErrors() throws Exception {
    LogInterceptor interceptor = new LogInterceptor(PostgresReplicationConnection.class);
    // open a transaction and don't commit it, so the slot creation will fail with timeout
    String statement = "DROP TABLE IF EXISTS table_with_pk;" +
            "CREATE TABLE table_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));" +
            "INSERT INTO table_with_pk (b, c) VALUES('val1', now()); ";
    PostgresConnection connection = TestHelper.executeWithoutCommit(statement);
    try (ReplicationConnection conn1 = TestHelper.createForReplication("test1", false,
            new PostgresConnectorConfig(TestHelper.defaultConfig()
                    .with(PostgresConnectorConfig.MAX_RETRIES, 1)
                    .with(PostgresConnectorConfig.RETRY_DELAY_MS, 10)
                    .with(PostgresConnectorConfig.CREATE_SLOT_COMMAND_TIMEOUT, 2)
                    .build()))) {
        conn1.createReplicationSlot();
    }
    catch (Exception e) {
        assertTrue(interceptor.containsWarnMessage("and retrying, attempt number"));
        assertTrue(e.getCause().getMessage().contains("ERROR: canceling statement due to user request"));
        assertTrue(e.getMessage().contains("query to create replication slot timed out"));
    }
    finally {
        // End the blocking transaction and release the helper connection so it does not leak.
        try {
            connection.commit();
        }
        finally {
            connection.close();
        }
    }
    // slot creation should be successful as there are no open transactions now
    try (ReplicationConnection conn2 = TestHelper.createForReplication("test1", false,
            new PostgresConnectorConfig(TestHelper.defaultConfig()
                    .with(PostgresConnectorConfig.MAX_RETRIES, 1)
                    .with(PostgresConnectorConfig.RETRY_DELAY_MS, 10)
                    .with(PostgresConnectorConfig.CREATE_SLOT_COMMAND_TIMEOUT, 2)
                    .build()))) {
        conn2.createReplicationSlot();
    }
    catch (Exception e) {
        // Same failure type as fail(...), but keeps the underlying exception as the cause.
        throw new AssertionError("Should be able to create replication slot after no active transactions are present.", e);
    }
}
@Test
public void shouldCloseConnectionOnInvalidSlotName() throws Exception {
final int closeRetries = 60;