DBZ-2461 LSN must not be flushed after connection close

This commit is contained in:
Jiri Pechanec 2020-09-16 11:40:52 +02:00 committed by GitHub
parent 6ca025b1bb
commit eed321d485
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 98 additions and 0 deletions

View File

@ -168,7 +168,9 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
replicationConnection.close();
}
catch (Exception e) {
LOGGER.debug("Exception while closing the connection", e);
}
replicationStream.set(null);
}
}
}

View File

@ -9,20 +9,30 @@
import static org.fest.assertions.Assertions.assertThat;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.fest.assertions.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.doc.FixFor;
import io.debezium.document.Document;
@ -47,6 +57,8 @@
*/
public class DebeziumEngineIT {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumEngineIT.class);
protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("connector-offsets.txt").toAbsolutePath();
@Rule
@ -197,4 +209,88 @@ public void shouldSerializeToCloudEvents() throws Exception {
}
}
private static final AtomicInteger offsetStoreSetCalls = new AtomicInteger();
public static class TestOffsetStore extends FileOffsetBackingStore {
@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
LOGGER.info("Get offsets called");
return super.get(keys);
}
@Override
public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
LOGGER.info("Set offsets called");
offsetStoreSetCalls.incrementAndGet();
return super.set(values, callback);
}
}
@Test
@FixFor("DBZ-2461")
public void testOffsetsCommitAfterStop() throws Exception {
final AtomicReference<Throwable> exception = new AtomicReference<>();
DebeziumEngine<ChangeEvent<String, String>> engine;
TestHelper.execute("DROP TABLE IF EXISTS tests;", "CREATE TABLE tests (id SERIAL PRIMARY KEY);");
final Properties props = new Properties();
props.putAll(TestHelper.defaultConfig().build().asMap());
props.setProperty("name", "debezium-engine");
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString());
props.setProperty("offset.flush.interval.ms", "3000");
props.setProperty("converter.schemas.enable", "false");
props.setProperty("offset.storage",
TestOffsetStore.class.getName());
engine = DebeziumEngine.create(Json.class).using(props).using(new DebeziumEngine.ConnectorCallback() {
@Override
public void connectorStarted() {
}
@Override
public void connectorStopped() {
}
}).using((success, message, error) -> {
exception.compareAndSet(null, error);
}).notifying((records, committer) -> {
try {
for (ChangeEvent<String, String> record : records) {
committer.markProcessed(record);
}
committer.markBatchFinished();
}
catch (Exception e) {
Testing.printError(e);
}
}).build();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
while (offsetStoreSetCalls.get() < 1) {
TestHelper.execute("INSERT INTO tests VALUES(default)");
}
engine.close();
Assertions.assertThat(offsetStoreSetCalls.get()).isGreaterThanOrEqualTo(1);
offsetStoreSetCalls.set(0);
for (int i = 0; i < 100; i++) {
TestHelper.execute("INSERT INTO tests VALUES(default)");
}
executor.execute(engine);
while (offsetStoreSetCalls.get() < 1) {
TestHelper.execute("INSERT INTO tests VALUES(default)");
}
engine.close();
Assertions.assertThat(offsetStoreSetCalls.get()).isGreaterThanOrEqualTo(1);
Assertions.assertThat(exception.get()).isNull();
}
}