DBZ-3099 Snapshot buffering to handle config corner cases

This commit is contained in:
Jiri Pechanec 2021-02-15 10:17:19 +01:00 committed by Gunnar Morling
parent 7394ecfd62
commit 21b15facb2
6 changed files with 142 additions and 20 deletions

View File

@ -5,6 +5,12 @@
*/
package io.debezium.connector.mysql;
import java.util.function.Function;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
@ -24,10 +30,12 @@ public class MySqlChangeEventSourceFactory implements ChangeEventSourceFactory {
private final Clock clock;
private final MySqlTaskContext taskContext;
private final MySqlStreamingChangeEventSourceMetrics streamingMetrics;
private final ChangeEventQueue<DataChangeEvent> queue;
public MySqlChangeEventSourceFactory(MySqlConnectorConfig configuration, MySqlConnection connection,
ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock, MySqlDatabaseSchema schema,
MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics streamingMetrics) {
MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics streamingMetrics,
ChangeEventQueue<DataChangeEvent> queue) {
this.configuration = configuration;
this.connection = connection;
this.errorHandler = errorHandler;
@ -35,16 +43,23 @@ public MySqlChangeEventSourceFactory(MySqlConnectorConfig configuration, MySqlCo
this.clock = clock;
this.taskContext = taskContext;
this.streamingMetrics = streamingMetrics;
this.queue = queue;
}
@Override
public SnapshotChangeEventSource getSnapshotChangeEventSource(OffsetContext offsetContext, SnapshotProgressListener snapshotProgressListener) {
return new MySqlSnapshotChangeEventSource(configuration, (MySqlOffsetContext) offsetContext, connection, taskContext.getSchema(), dispatcher, clock,
(MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener);
(MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener, record -> modifyAndFlushLastRecord(record));
}
private void modifyAndFlushLastRecord(Function<SourceRecord, SourceRecord> modify) throws InterruptedException {
queue.flushBuffer(dataChange -> new DataChangeEvent(modify.apply(dataChange.getRecord())));
queue.disableBuffering();
}
@Override
public StreamingChangeEventSource getStreamingChangeEventSource(OffsetContext offsetContext) {
queue.disableBuffering();
return new MySqlStreamingChangeEventSource(
configuration,
(MySqlOffsetContext) offsetContext,

View File

@ -104,6 +104,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.buffering()
.build();
errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
@ -127,7 +128,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
errorHandler,
MySqlConnector.class,
connectorConfig,
new MySqlChangeEventSourceFactory(connectorConfig, connection, errorHandler, dispatcher, clock, schema, taskContext, streamingMetrics),
new MySqlChangeEventSourceFactory(connectorConfig, connection, errorHandler, dispatcher, clock, schema, taskContext, streamingMetrics, queue),
new MySqlChangeEventSourceMetricsFactory(streamingMetrics),
dispatcher,
schema);

View File

@ -24,13 +24,19 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mysql.legacy.MySqlJdbcContext.DatabaseLocales;
import io.debezium.data.Envelope;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
@ -58,10 +64,12 @@ public class MySqlSnapshotChangeEventSource extends RelationalSnapshotChangeEven
private final MySqlDatabaseSchema databaseSchema;
private final List<SchemaChangeEvent> schemaEvents = new ArrayList<>();
private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
private final BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor;
public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlOffsetContext previousOffset, MySqlConnection connection,
MySqlDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock,
MySqlSnapshotChangeEventSourceMetrics metrics) {
MySqlSnapshotChangeEventSourceMetrics metrics,
BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor) {
super(connectorConfig, previousOffset, connection, schema, dispatcher, clock, metrics);
this.connectorConfig = connectorConfig;
this.connection = connection;
@ -69,6 +77,7 @@ public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, MySq
this.metrics = metrics;
this.previousOffset = previousOffset;
this.databaseSchema = schema;
this.lastEventProcessor = lastEventProcessor;
}
@Override
@ -673,4 +682,17 @@ protected void lastSnapshotRecord(RelationalSnapshotContext snapshotContext) {
}
}
@Override
protected void postSnapshot() throws InterruptedException {
// We cannot be sure that the last event as the last one
// - last table could be empty
// - data snapshot was not executed
// - the last table schema snaphsotted is not monitored and storing of monitored is disabled
lastEventProcessor.accept(record -> {
record.sourceOffset().remove(SourceInfo.SNAPSHOT_KEY);
((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE).put(SourceInfo.SNAPSHOT_KEY, SnapshotRecord.LAST.toString().toLowerCase());
return record;
});
super.postSnapshot();
}
}

View File

@ -35,6 +35,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.data.KeyValueStore;
import io.debezium.data.KeyValueStore.Collection;
import io.debezium.data.SchemaChangeHistory;
@ -96,20 +97,35 @@ protected Configuration.Builder simpleConfig() {
@Test
public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
snapshotOfSingleDatabase(true, false);
snapshotOfSingleDatabase(true, false, true);
}
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLock() throws Exception {
snapshotOfSingleDatabase(false, false);
snapshotOfSingleDatabase(false, false, true);
}
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyMonitoredTables() throws Exception {
snapshotOfSingleDatabase(false, true);
snapshotOfSingleDatabase(false, true, true);
}
private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMonitoredTables) throws Exception {
@Test
public void shouldCreateSnapshotOfSingleDatabaseNoData() throws Exception {
snapshotOfSingleDatabase(true, false, false);
}
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockNoData() throws Exception {
snapshotOfSingleDatabase(false, false, false);
}
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyMonitoredTablesNoData() throws Exception {
snapshotOfSingleDatabase(false, true, false);
}
private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMonitoredTables, boolean data) throws Exception {
final LogInterceptor logInterceptor = new LogInterceptor();
final Builder builder = simpleConfig()
@ -122,6 +138,9 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo
.with(MySqlConnectorConfig.TEST_DISABLE_GLOBAL_LOCKING, "true")
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, storeOnlyMonitoredTables);
}
if (!data) {
builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY);
}
config = builder.build();
// Start the connector ...
@ -133,7 +152,7 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo
KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(DATABASE.getServerName() + ".");
SchemaChangeHistory schemaChanges = new SchemaChangeHistory(DATABASE.getServerName());
final int schemaEventsCount = !useGlobalLock ? (storeOnlyMonitoredTables ? 8 : 14) : 0;
SourceRecords sourceRecords = consumeRecordsByTopic(schemaEventsCount + 9 + 4);
SourceRecords sourceRecords = consumeRecordsByTopic(schemaEventsCount + (data ? 9 + 4 : 0));
for (Iterator<SourceRecord> i = sourceRecords.allRecordsInOrder().iterator(); i.hasNext();) {
final SourceRecord record = i.next();
VerifyRecord.isValid(record);
@ -161,6 +180,10 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo
assertThat(schemaChanges.recordCount()).isEqualTo(schemaEventsCount);
if (!data) {
return;
}
// Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(2);
Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName());
@ -601,15 +624,26 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
// 14 schema changes
SourceRecords sourceRecords = consumeRecordsByTopic(14 + 1);
final List<SourceRecord> allRecords = sourceRecords.allRecordsInOrder();
allRecords.forEach(record -> {
if (!record.topic().startsWith("__debezium-heartbeat")) {
assertThat(record.sourceOffset().get("snapshot")).isEqualTo(true);
VerifyRecord.isValid(record);
VerifyRecord.hasNoSourceQuery(record);
store.add(record);
schemaChanges.add(record);
for (Iterator<SourceRecord> i = allRecords.subList(0, allRecords.size() - 1).iterator(); i.hasNext();) {
final SourceRecord record = i.next();
VerifyRecord.isValid(record);
VerifyRecord.hasNoSourceQuery(record);
store.add(record);
schemaChanges.add(record);
if (record.topic().startsWith("__debezium-heartbeat")) {
continue;
}
});
final String snapshotSourceField = ((Struct) record.value()).getStruct("source").getString("snapshot");
if (i.hasNext()) {
final Object snapshotOffsetField = record.sourceOffset().get("snapshot");
assertThat(snapshotOffsetField).isEqualTo(true);
assertThat(snapshotSourceField).isEqualTo("true");
}
else {
assertThat(record.sourceOffset().get("snapshot")).isNull();
assertThat(snapshotSourceField).isEqualTo("last");
}
}
SourceRecord heartbeatRecord = allRecords.get(allRecords.size() - 1);
@ -622,7 +656,7 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
// Check that heartbeat has arrived
assertThat(heartbeatRecord.topic()).startsWith("__debezium-heartbeat");
assertThat(heartbeatRecord).isNotNull();
assertThat(heartbeatRecord.sourceOffset().get("snapshot")).isNotEqualTo(true);
assertThat(heartbeatRecord.sourceOffset().get("snapshot")).isNull();
}
private long toMicroSeconds(String duration) {

View File

@ -13,6 +13,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.connect.source.SourceRecord;
@ -67,11 +68,13 @@ public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {
private final Supplier<PreviousContext> loggingContextSupplier;
private AtomicLong currentQueueSizeInBytes = new AtomicLong(0);
private Map<T, Long> objectMap = new ConcurrentHashMap<>();
private boolean buffering;
private T bufferedEvent;
private volatile RuntimeException producerException;
private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier,
long maxQueueSizeInBytes) {
long maxQueueSizeInBytes, boolean buffering) {
this.pollInterval = pollInterval;
this.maxBatchSize = maxBatchSize;
this.maxQueueSize = maxQueueSize;
@ -79,6 +82,7 @@ private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSi
this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
this.loggingContextSupplier = loggingContextSupplier;
this.maxQueueSizeInBytes = maxQueueSizeInBytes;
this.buffering = buffering;
}
public static class Builder<T> {
@ -88,6 +92,7 @@ public static class Builder<T> {
private int maxBatchSize;
private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;
private long maxQueueSizeInBytes;
private boolean buffering;
public Builder<T> pollInterval(Duration pollInterval) {
this.pollInterval = pollInterval;
@ -114,8 +119,13 @@ public Builder<T> maxQueueSizeInBytes(long maxQueueSizeInBytes) {
return this;
}
public Builder<T> buffering() {
this.buffering = true;
return this;
}
public ChangeEventQueue<T> build() {
return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier, maxQueueSizeInBytes);
return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier, maxQueueSizeInBytes, buffering);
}
}
@ -138,6 +148,42 @@ public void enqueue(T record) throws InterruptedException {
throw new InterruptedException();
}
if (buffering) {
final T newEvent = record;
record = bufferedEvent;
bufferedEvent = newEvent;
if (record == null) {
// Can happen only for the first coming event
return;
}
}
doEnqueue(record);
}
/**
* Applies a function to the event and the buffer and adds it to the queue. Buffer is emptied.
*
* @param recordModifier
* @throws InterruptedException
*/
public void flushBuffer(Function<T, T> recordModifier) throws InterruptedException {
assert buffering : "Unsuported for queues with disabled buffering";
if (bufferedEvent != null) {
doEnqueue(recordModifier.apply(bufferedEvent));
bufferedEvent = null;
}
}
/**
* Disable buffering for the queue
*/
public void disableBuffering() {
assert bufferedEvent == null : "Buffer must be flushed";
buffering = false;
}
protected void doEnqueue(T record) throws InterruptedException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Enqueuing source record '{}'", record);
}

View File

@ -144,6 +144,7 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, SnapshotContex
ctx.offset.postSnapshotCompletion();
}
postSnapshot();
dispatcher.alwaysDispatchHeartbeatEvent(ctx.offset);
return SnapshotResult.completed(ctx.offset);
}
@ -509,4 +510,7 @@ public RelationalSnapshotContext(String catalogName) throws SQLException {
protected Clock getClock() {
return clock;
}
protected void postSnapshot() throws InterruptedException {
}
}