DBZ-3473 Enable per-dialect query limit

This commit is contained in:
Jiri Pechanec 2021-05-03 13:50:16 +02:00
parent 260c32cf0e
commit 3371e6ccb3
3 changed files with 42 additions and 25 deletions

View File

@ -36,6 +36,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@ -1405,4 +1406,24 @@ public static <T> T querySingleValue(Connection connection, String queryString,
throw new IllegalStateException("Exactly one result expected.");
}
}
public String buildSelectWithRowLimits(TableId tableId, int limit, String projection, Optional<String> condition, String orderBy) {
final StringBuilder sql = new StringBuilder("SELECT ");
sql
.append(projection)
.append(" FROM ");
// TODO Provide database based quoted format
sql.append(tableId.toString());
if (condition.isPresent()) {
sql
.append(" WHERE ")
.append(condition.get());
}
sql
.append(" ORDER BY ")
.append(orderBy)
.append(" LIMIT ")
.append(limit);
return sql.toString();
}
}

View File

@ -12,6 +12,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
@ -80,12 +81,6 @@ public void closeWindow(String id, EventDispatcher<T> dispatcher, OffsetContext
return;
}
LOGGER.debug("Sending {} events from window buffer", window.size());
// TODO There is an issue here
// Events are emitted with tx log coordinates of the CloseIncrementalSnapshotWindow
// These means that if the connector is restarted in the middle of emptying the buffer
// then the rest of the buffer might not be resent or even the snapshotting restarted
// as there is no more of events.
// Most probably could be solved by injecting a sequence of windowOpen/Closed upon the start
offsetContext.incrementalSnapshotEvents();
for (Object[] row : window.values()) {
sendEvent(dispatcher, offsetContext, row);
@ -147,13 +142,11 @@ private void emitWindowClose() throws SQLException {
}
protected String buildChunkQuery(Table table) {
final StringBuilder sql = new StringBuilder("SELECT * FROM ");
sql.append(table.id().toString());
String condition = null;
// Add condition when this is not the first query
if (context.isNonInitialChunk()) {
final StringBuilder sql = new StringBuilder();
// Window boundaries
sql.append(" WHERE ");
addKeyColumnsToCondition(table, sql, " >= ?");
sql.append(" AND NOT (");
addKeyColumnsToCondition(table, sql, " = ?");
@ -161,23 +154,23 @@ protected String buildChunkQuery(Table table) {
// Table boundaries
sql.append(" AND ");
addKeyColumnsToCondition(table, sql, " <= ?");
condition = sql.toString();
}
// TODO limiting is db dialect based
sql.append(" ORDER BY ")
.append(table.primaryKeyColumns().stream().map(Column::name).collect(Collectors.joining(", ")))
.append(" LIMIT ").append(connectorConfig.getIncrementalSnashotChunkSize());
return sql.toString();
final String orderBy = table.primaryKeyColumns().stream()
.map(Column::name)
.collect(Collectors.joining(", "));
return jdbcConnection.buildSelectWithRowLimits(table.id(),
connectorConfig.getIncrementalSnashotChunkSize(),
"*",
Optional.ofNullable(condition),
orderBy);
}
protected String buildMaxPrimaryKeyQuery(Table table) {
final StringBuilder sql = new StringBuilder("SELECT * FROM ");
sql.append(table.id().toString());
// TODO limiting is db dialect based
sql.append(" ORDER BY ")
.append(table.primaryKeyColumns().stream().map(Column::name).collect(Collectors.joining(" DESC, ")))
.append(" DESC LIMIT ").append(1);
return sql.toString();
final String orderBy = table.primaryKeyColumns().stream()
.map(Column::name)
.collect(Collectors.joining(" DESC, ")) + " DESC";
return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, "*", Optional.empty(), orderBy.toString());
}
@SuppressWarnings("unchecked")

View File

@ -11,6 +11,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@ -40,7 +41,8 @@ public String getConnectorName() {
@Test
public void testBuildQuery() {
final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(config(), null, null);
final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null), null);
final IncrementalSnapshotContext<TableId> context = new IncrementalSnapshotContext<>();
source.setContext(context);
final Column pk1 = Column.editor().name("pk1").create();
@ -58,7 +60,8 @@ public void testBuildQuery() {
@Test
public void testMaxQuery() {
final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(config(), null, null);
final IncrementalSnapshotChangeEventSource<TableId> source = new IncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null), null);
final Column pk1 = Column.editor().name("pk1").create();
final Column pk2 = Column.editor().name("pk2").create();
final Column val1 = Column.editor().name("val1").create();