diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index d2b6acae3..b0b78f209 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -36,6 +36,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; @@ -1405,4 +1406,24 @@ public static T querySingleValue(Connection connection, String queryString, throw new IllegalStateException("Exactly one result expected."); } } + + public String buildSelectWithRowLimits(TableId tableId, int limit, String projection, Optional condition, String orderBy) { + final StringBuilder sql = new StringBuilder("SELECT "); + sql + .append(projection) + .append(" FROM "); + // TODO Provide database based quoted format + sql.append(tableId.toString()); + if (condition.isPresent()) { + sql + .append(" WHERE ") + .append(condition.get()); + } + sql + .append(" ORDER BY ") + .append(orderBy) + .append(" LIMIT ") + .append(limit); + return sql.toString(); + } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java index 94d4a4607..fa5de518d 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java @@ -12,6 +12,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Struct; @@ -80,12 +81,6 @@ public void closeWindow(String id, EventDispatcher dispatcher, OffsetContext return; } LOGGER.debug("Sending {} events from window buffer", window.size()); - // TODO There is an issue here - // Events are emitted with tx log coordinates of the CloseIncrementalSnapshotWindow - // These means that if the connector is restarted in the middle of emptying the buffer - // then the rest of the buffer might not be resent or even the snapshotting restarted - // as there is no more of events. - // Most probably could be solved by injecting a sequence of windowOpen/Closed upon the start offsetContext.incrementalSnapshotEvents(); for (Object[] row : window.values()) { sendEvent(dispatcher, offsetContext, row); @@ -147,13 +142,11 @@ private void emitWindowClose() throws SQLException { } protected String buildChunkQuery(Table table) { - final StringBuilder sql = new StringBuilder("SELECT * FROM "); - sql.append(table.id().toString()); - + String condition = null; // Add condition when this is not the first query if (context.isNonInitialChunk()) { + final StringBuilder sql = new StringBuilder(); // Window boundaries - sql.append(" WHERE "); addKeyColumnsToCondition(table, sql, " >= ?"); sql.append(" AND NOT ("); addKeyColumnsToCondition(table, sql, " = ?"); @@ -161,23 +154,23 @@ protected String buildChunkQuery(Table table) { // Table boundaries sql.append(" AND "); addKeyColumnsToCondition(table, sql, " <= ?"); + condition = sql.toString(); } - // TODO limiting is db dialect based - sql.append(" ORDER BY ") - .append(table.primaryKeyColumns().stream().map(Column::name).collect(Collectors.joining(", "))) - .append(" LIMIT ").append(connectorConfig.getIncrementalSnashotChunkSize()); - return sql.toString(); + final String orderBy = table.primaryKeyColumns().stream() + .map(Column::name) + .collect(Collectors.joining(", ")); + return jdbcConnection.buildSelectWithRowLimits(table.id(), + connectorConfig.getIncrementalSnashotChunkSize(), + "*", + Optional.ofNullable(condition), + orderBy); } protected String buildMaxPrimaryKeyQuery(Table table) { - final StringBuilder sql = new StringBuilder("SELECT * FROM "); - sql.append(table.id().toString()); - - // TODO limiting is db dialect based - sql.append(" ORDER BY ") - .append(table.primaryKeyColumns().stream().map(Column::name).collect(Collectors.joining(" DESC, "))) - .append(" DESC LIMIT ").append(1); - return sql.toString(); + final String orderBy = table.primaryKeyColumns().stream() + .map(Column::name) + .collect(Collectors.joining(" DESC, ")) + " DESC"; + return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, "*", Optional.empty(), orderBy.toString()); } @SuppressWarnings("unchecked") diff --git a/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSourceTest.java b/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSourceTest.java index f4fe19e18..c9fe6683e 100644 --- a/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSourceTest.java +++ b/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSourceTest.java @@ -11,6 +11,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.TableId; @@ -40,7 +41,8 @@ public String getConnectorName() { @Test public void testBuildQuery() { - final IncrementalSnapshotChangeEventSource source = new IncrementalSnapshotChangeEventSource<>(config(), null, null); + final IncrementalSnapshotChangeEventSource source = new IncrementalSnapshotChangeEventSource<>( + config(), new JdbcConnection(config().getConfig(), config -> null), null); final IncrementalSnapshotContext context = new IncrementalSnapshotContext<>(); source.setContext(context); final Column pk1 = Column.editor().name("pk1").create(); @@ -58,7 +60,8 @@ public void testBuildQuery() { @Test public void testMaxQuery() { - final IncrementalSnapshotChangeEventSource source = new IncrementalSnapshotChangeEventSource<>(config(), null, null); + final IncrementalSnapshotChangeEventSource source = new IncrementalSnapshotChangeEventSource<>( + config(), new JdbcConnection(config().getConfig(), config -> null), null); final Column pk1 = Column.editor().name("pk1").create(); final Column pk2 = Column.editor().name("pk2").create(); final Column val1 = Column.editor().name("val1").create();