From 733ef0d8b46f6b6393f88762c39b651c5f9a527a Mon Sep 17 00:00:00 2001
From: Rajendra Dangwal <31967338+rajdangwal@users.noreply.github.com>
Date: Wed, 7 Aug 2024 13:04:41 +0530
Subject: [PATCH] DBZ-8073: Added timeout for replication slot creation command

---
 .../postgresql/PostgresConnectorConfig.java   | 12 +++
 .../PostgresReplicationConnection.java        | 36 +++++++-
 .../connector/postgresql/TestHelper.java      | 33 ++++++++
 .../connection/ReplicationConnectionIT.java   | 83 +++++++++++++++++++
 4 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
index 9e1c1e972..8f9b654f4 100755
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
@@ -594,6 +594,14 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
                     "Whether or not to seek to the last known offset on the replication slot."
                             + "Enabling this option results in startup failure if the slot is re-created instead of data loss.");
 
+    public static final Field CREATE_SLOT_COMMAND_TIMEOUT = Field.createInternal("create.slot.command.timeout")
+            .withDisplayName("Replication slot creation timeout")
+            .withType(Type.LONG)
+            .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 4))
+            .withDefault(90L)
+            .withImportance(Importance.LOW)
+            .withDescription("The timeout in seconds for the creation of the replication slot.");
+
     public static final Field PUBLICATION_NAME = Field.create("publication.name")
             .withDisplayName("Publication")
             .withType(Type.STRING)
@@ -1038,6 +1046,10 @@ public boolean slotSeekToKnownOffsetOnStart() {
         return getConfig().getBoolean(SLOT_SEEK_TO_KNOWN_OFFSET);
     }
 
+    public long createSlotCommandTimeout() {
+        return getConfig().getLong(CREATE_SLOT_COMMAND_TIMEOUT);
+    }
+
     public String publicationName() {
         return getConfig().getString(PUBLICATION_NAME);
     }
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
index 6e8cf63f0..3c4a0c769 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -521,13 +521,47 @@ public Optional createReplicationSlot() throws SQLException
         initPublication();
 
         try (Statement stmt = pgConnection().createStatement()) {
+            stmt.setQueryTimeout(toIntExact(connectorConfig.createSlotCommandTimeout()));
             String createCommand = String.format(
                     "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
                     slotName,
                     tempPart,
                     plugin.getPostgresPluginName());
             LOGGER.info("Creating replication slot with command {}", createCommand);
-            stmt.execute(createCommand);
+
+            final int maxRetries = connectorConfig.maxRetries();
+            final Duration delay = connectorConfig.retryDelay();
+            int tryCount = 0;
+            while (true) {
+                try {
+                    stmt.execute(createCommand);
+                    break;
+                }
+                catch (SQLException ex) {
+                    // Retry only when the statement was cancelled, i.e. the query timeout set above expired. SQLSTATE 57014
+                    // (query_canceled) is checked first: the message text can be localized by the server and may be null.
+                    final String error = ex.getMessage();
+                    if (!"57014".equals(ex.getSQLState())
+                            && (error == null || !error.contains("canceling statement due to user request"))) {
+                        throw ex;
+                    }
+                    String message = "Creation of replication slot failed; "
+                            + "query to create replication slot timed out, please make sure that there are no long running queries on the database.";
+                    if (++tryCount > maxRetries) {
+                        throw new DebeziumException(message, ex);
+                    }
+                    LOGGER.warn("{} Waiting for {} and retrying, attempt number {} over {}", message, delay, tryCount, maxRetries, ex);
+                    final Metronome metronome = Metronome.parker(delay, Clock.SYSTEM);
+                    try {
+                        metronome.pause();
+                    }
+                    catch (InterruptedException e) {
+                        // Restore the interrupt flag and give up: retrying would ignore the request to stop this thread.
+                        Thread.currentThread().interrupt();
+                        throw new DebeziumException("Interrupted while waiting to retry the creation of the replication slot", e);
+                    }
+                }
+            }
             // when we are in Postgres 9.4+, we can parse the slot creation info,
             // otherwise, it returns nothing
             if (canExportSnapshot) {
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java
index 546b18702..783374d25 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java
@@ -208,6 +208,39 @@ public static void execute(String statement, String... furtherStatements) {
         }
     }
 
+    /**
+     * Executes a JDBC statement using the default jdbc config without committing the connection.
+     *
+     * @param statement A SQL statement
+     * @param furtherStatements Further SQL statement(s)
+     * @return the still-open PostgresConnection instance; never null. It is closed here only if execution fails.
+     */
+    public static PostgresConnection executeWithoutCommit(String statement, String... furtherStatements) {
+        if (furtherStatements != null) {
+            for (String further : furtherStatements) {
+                statement = statement + further;
+            }
+        }
+        final PostgresConnection connection = create();
+        try {
+            connection.setAutoCommit(false);
+            connection.executeWithoutCommitting(statement);
+            if (statement.endsWith("ROLLBACK;")) {
+                connection.connection().rollback();
+            }
+            return connection;
+        }
+        catch (Exception e) {
+            try {
+                connection.close();
+            }
+            catch (Exception closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
+        }
+    }
+
     /**
      * Drops all the public non system schemas from the DB.
      *
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java
index bb796ca01..fa1e6f684 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java
@@ -122,6 +122,89 @@ public void shouldNotAllowRetryWhenConfigured() throws Exception {
         }
     }
 
+    @Test(expected = SQLException.class)
+    public void shouldNotRetryIfSlotCreationFailsWithoutTimeoutError() throws Exception {
+        LogInterceptor interceptor = new LogInterceptor(PostgresReplicationConnection.class);
+        try (ReplicationConnection conn1 = TestHelper.createForReplication("test1", false)) {
+            conn1.createReplicationSlot();
+            // try to create the replication slot with same name again
+            try (ReplicationConnection conn2 = TestHelper.createForReplication("test1", false)) {
+                conn2.createReplicationSlot();
+                fail("Should not be able to create 2 replication slots on same db and plugin");
+            }
+            catch (Exception e) {
+                assertFalse(interceptor.containsWarnMessage("and retrying, attempt number"));
+                assertTrue(e.getMessage().contains("ERROR: replication slot \"test1\" already exists"));
+                throw e;
+            }
+        }
+    }
+
+    @Test(expected = DebeziumException.class)
+    public void shouldRetryAndFailIfSlotCreationFailsWithTimeoutErrorOnLimitedRetries() throws Exception {
+        LogInterceptor interceptor = new LogInterceptor(PostgresReplicationConnection.class);
+        // open a transaction and don't commit it, so the slot creation will fail with timeout error
+        String statement = "DROP TABLE IF EXISTS table_with_pk;"
+                + "CREATE TABLE table_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));"
+                + "INSERT INTO table_with_pk (b, c) VALUES('val1', now()); ";
+        // try-with-resources closes the blocking connection even when an assertion below fails
+        try (PostgresConnection connection = TestHelper.executeWithoutCommit(statement)) {
+            try (ReplicationConnection conn1 = TestHelper.createForReplication("test1", false,
+                    new PostgresConnectorConfig(TestHelper.defaultConfig()
+                            .with(PostgresConnectorConfig.MAX_RETRIES, 1)
+                            .with(PostgresConnectorConfig.RETRY_DELAY_MS, 10)
+                            .with(PostgresConnectorConfig.CREATE_SLOT_COMMAND_TIMEOUT, 2)
+                            .build()))) {
+                conn1.createReplicationSlot();
+            }
+            catch (Exception e) {
+                assertTrue(interceptor.containsWarnMessage("and retrying, attempt number"));
+                assertTrue(e.getCause().getMessage().contains("ERROR: canceling statement due to user request"));
+                assertTrue(e.getMessage().contains("query to create replication slot timed out"));
+                throw e;
+            }
+            finally {
+                connection.commit();
+            }
+        }
+    }
+
+    @Test
+    public void shouldSucceedIfSlotCreationSucceedsAfterTimeoutErrors() throws Exception {
+        LogInterceptor interceptor = new LogInterceptor(PostgresReplicationConnection.class);
+        // both attempts share one config: a single retry, 10 ms retry delay and a 2 s slot creation timeout
+        final PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
+                .with(PostgresConnectorConfig.MAX_RETRIES, 1)
+                .with(PostgresConnectorConfig.RETRY_DELAY_MS, 10)
+                .with(PostgresConnectorConfig.CREATE_SLOT_COMMAND_TIMEOUT, 2)
+                .build());
+        // open a transaction and don't commit it, so the slot creation will fail with timeout
+        String statement = "DROP TABLE IF EXISTS table_with_pk;"
+                + "CREATE TABLE table_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));"
+                + "INSERT INTO table_with_pk (b, c) VALUES('val1', now()); ";
+        try (PostgresConnection connection = TestHelper.executeWithoutCommit(statement)) {
+            try (ReplicationConnection conn1 = TestHelper.createForReplication("test1", false, config)) {
+                conn1.createReplicationSlot();
+                fail("Slot creation should time out while the blocking transaction is still open");
+            }
+            catch (Exception e) {
+                assertTrue(interceptor.containsWarnMessage("and retrying, attempt number"));
+                assertTrue(e.getCause().getMessage().contains("ERROR: canceling statement due to user request"));
+                assertTrue(e.getMessage().contains("query to create replication slot timed out"));
+            }
+            finally {
+                connection.commit();
+            }
+        }
+        // slot creation should be successful as there are no open transactions now
+        try (ReplicationConnection conn2 = TestHelper.createForReplication("test1", false, config)) {
+            conn2.createReplicationSlot();
+        }
+        catch (Exception e) {
+            throw new AssertionError("Should be able to create replication slot after no active transactions are present.", e);
+        }
+    }
+
     @Test
     public void shouldCloseConnectionOnInvalidSlotName() throws Exception {
         final int closeRetries = 60;