DBZ-8135 Basic Signaler tests for PG

This commit is contained in:
Jakub Cechacek 2024-08-12 16:10:01 +02:00 committed by Jiri Pechanec
parent dd5e6e1a16
commit 9965835e15
2 changed files with 32 additions and 7 deletions

View File

@ -32,6 +32,7 @@
import io.debezium.connector.postgresql.spi.CustomActionProvider;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.actions.Log;
public class SignalsIT extends AbstractAsyncEngineConnectorTest {
@ -62,12 +63,21 @@ public void signalLog() throws InterruptedException {
signalLog(false);
}
@Test
public void signalLogViaInProcessChannel() throws InterruptedException {
signalLog(false, false);
}
@Test
public void signalLogWithEscapedCharacter() throws InterruptedException {
signalLog(true);
}
private void signalLog(boolean includingEscapedCharacter) throws InterruptedException {
signalLog(includingEscapedCharacter, true);
}
private void signalLog(boolean includingEscapedCharacter, boolean useSource) throws InterruptedException {
// Testing.Print.enable();
final LogInterceptor logInterceptor = new LogInterceptor(Log.class);
@ -81,6 +91,7 @@ private void signalLog(boolean includingEscapedCharacter) throws InterruptedExce
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, includingEscapedCharacter ? signalTableWithEscapedCharacter : signalTable)
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500")
.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,in-process")
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
@ -93,15 +104,23 @@ private void signalLog(boolean includingEscapedCharacter) throws InterruptedExce
// insert and verify a new record
TestHelper.execute(INSERT_STMT);
// Insert the signal record
String insertSql = String.format("INSERT INTO %s VALUES('1', 'log', '{\"message\": \"Signal message at offset ''{}''\"}')",
includingEscapedCharacter ? signalTableWithEscapedCharacter : signalTable);
TestHelper.execute(insertSql);
int expectedNumRecords;
if (useSource) {
expectedNumRecords = 2;
// Insert the signal record
String insertSql = String.format("INSERT INTO %s VALUES('1', 'log', '{\"message\": \"Signal message at offset ''{}''\"}')",
includingEscapedCharacter ? signalTableWithEscapedCharacter : signalTable);
TestHelper.execute(insertSql);
}
else {
expectedNumRecords = 1;
signaler.signal(new SignalRecord("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", null));
}
waitForAvailableRecords(800, TimeUnit.MILLISECONDS);
final SourceRecords records = consumeRecordsByTopic(2);
assertThat(records.allRecordsInOrder()).hasSize(2);
final SourceRecords records = consumeRecordsByTopic(expectedNumRecords);
assertThat(records.allRecordsInOrder()).hasSize(expectedNumRecords);
assertThat(logInterceptor.containsMessage("Signal message at offset")).isTrue();
}

View File

@ -8,6 +8,7 @@
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.TestingDebeziumEngine;
import io.debezium.engine.DebeziumEngine;
import io.debezium.pipeline.signal.SignalRecord;
/**
* Base class for testing connectors using {@link AsyncEmbeddedEngine}.
@ -16,9 +17,14 @@
*/
public class AbstractAsyncEngineConnectorTest extends AbstractConnectorTest {
protected DebeziumEngine.Signaler<SignalRecord> signaler;
@Override
protected DebeziumEngine.Builder createEngineBuilder() {
return new AsyncEmbeddedEngine.AsyncEngineBuilder();
this.signaler = new AsyncEngineSignaler();
return new AsyncEmbeddedEngine.AsyncEngineBuilder()
.using(signaler);
}
@Override