DBZ-7007 Switch to DebeziuEngine in EmbeddedEngineTest

This commit is contained in:
Vojtech Juranek 2023-10-11 22:27:59 +02:00 committed by Jiri Pechanec
parent 3295844cc3
commit d0c2658c43

View File

@ -201,7 +201,7 @@ public void interruptedTaskShutsDown() throws Exception {
CountDownLatch firstLatch = new CountDownLatch(1);
engine = EmbeddedEngine.create()
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying((records, committer) -> {
})
@ -237,12 +237,12 @@ public void interruptedOffsetCommitShutsDown() throws Exception {
CountDownLatch firstLatch = new CountDownLatch(1);
engine = EmbeddedEngine.create()
final DebeziumEngine engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(props)
.using(OffsetCommitPolicy.always())
.notifying((records, committer) -> {
for (SourceRecord record : records) {
for (RecordChangeEvent<SourceRecord> record : records) {
committer.markProcessed(record);
}
committer.markBatchFinished();
@ -319,13 +319,13 @@ public void shouldWorkToUseCustomChangeConsumer() throws Exception {
CountDownLatch allLatch = new CountDownLatch(6);
// create an engine with our custom class
engine = EmbeddedEngine.create()
final DebeziumEngine engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(props)
.notifying((records, committer) -> {
assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
Integer groupCount = records.size() / NUMBER_OF_LINES;
for (SourceRecord r : records) {
for (RecordChangeEvent<SourceRecord> r : records) {
committer.markProcessed(r);
}
@ -744,7 +744,7 @@ public void verifyBadCommitPolicyClassName() {
final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
engine = EmbeddedEngine.create()
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying((records, committer) -> {
})