DBZ-8166 Avoid unchecked conversion warnings in the tests

This commit is contained in:
Vojtech Juranek 2024-08-27 13:45:46 +02:00 committed by Vojtěch Juránek
parent 17a3908403
commit c4fc9123b2
4 changed files with 18 additions and 17 deletions

View File

@ -30,7 +30,7 @@
* @author Mario Fiore Vitale
*/
public abstract class BinlogBlockingSnapshotIT<C extends SourceConnector>
extends AbstractBlockingSnapshotTest
extends AbstractBlockingSnapshotTest<C>
implements BinlogConnectorTest<C> {
protected static final String SERVER_NAME = "is_test";

View File

@ -23,7 +23,7 @@
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest<OracleConnector> {
private OracleConnection connection;

View File

@ -99,7 +99,7 @@ public void testEngineBasicLifecycle() throws Exception {
CountDownLatch snapshotLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -149,7 +149,7 @@ public void testRunMultipleTasks() throws Exception {
props.put(SimpleSourceConnector.BATCH_COUNT, 1);
final AtomicInteger recordsRead = new AtomicInteger(0);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.notifying((records, committer) -> {
@ -188,7 +188,7 @@ public void testTasksAreStoppedIfSomeFailsToStart() {
props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");
final AtomicInteger recordsRead = new AtomicInteger(0);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -239,7 +239,7 @@ public void testHeaderConverter() throws Exception {
appendLinesToSource(1);
CountDownLatch recordsLatch = new CountDownLatch(1); // 1 count down for headers
DebeziumEngine.Builder<EmbeddedEngineChangeEvent> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(
DebeziumEngine.Builder<EmbeddedEngineChangeEvent> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(
KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class));
DebeziumEngine<EmbeddedEngineChangeEvent> embeddedEngine = builder
.using(props)
@ -281,7 +281,7 @@ public void testCompletionCallbackCalledUponSuccess() throws Exception {
CountDownLatch callbackLatch = new CountDownLatch(1);
AtomicInteger recordsSent = new AtomicInteger();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -326,7 +326,7 @@ public void testCompletionCallbackCalledUponFailure() throws Exception {
CountDownLatch callbackLatch = new CountDownLatch(1);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -363,7 +363,7 @@ public void testCannotStopWhileTasksAreStarting() throws Exception {
CountDownLatch taskStartingLatch = new CountDownLatch(1);
CountDownLatch enginStopLatch = new CountDownLatch(1);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -404,7 +404,7 @@ public void testCannotStopAlreadyStoppedEngine() throws Exception {
props.put(SimpleSourceConnector.BATCH_COUNT, 1);
props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -457,7 +457,7 @@ public void testExecuteSmt() throws Exception {
// We have only 5 groups as the first one is filtered out (first records is filtered out and therefore group not counted)
CountDownLatch allLatch = new CountDownLatch(5);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -514,7 +514,7 @@ public void testCloseSmt() throws Exception {
CountDownLatch callbackLatch = new CountDownLatch(1);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -550,7 +550,7 @@ public void testPollingIsRetriedUponFailure() throws Exception {
CountDownLatch recordsLatch = new CountDownLatch(SimpleSourceConnector.DEFAULT_BATCH_COUNT);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -586,7 +586,7 @@ public void testConnectorFailsIfMaxRetriesExceeded() throws Exception {
CountDownLatch recordsLatch = new CountDownLatch(SimpleSourceConnector.DEFAULT_BATCH_COUNT);
final LogInterceptor interceptor = new LogInterceptor(AsyncEmbeddedEngine.class);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())
@ -664,7 +664,7 @@ public void testCompletionCallbackCalledAfterConnectorStop() throws Exception {
AtomicInteger recordsSent = new AtomicInteger();
AtomicBoolean connectorCallbackCalled = new AtomicBoolean(false);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using((success, message, error) -> {
@ -721,7 +721,7 @@ private void runEngineBasicLifecycleWithConsumer(final Properties props) throws
appendLinesToSource(NUMBER_OF_LINES);
CountDownLatch allLatch = new CountDownLatch(6 * NUMBER_OF_LINES);
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
DebeziumEngine.Builder<SourceRecord> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>();
engine = builder
.using(props)
.using(new TestEngineConnectorCallback())

View File

@ -30,6 +30,7 @@
import javax.management.openmbean.TabularDataSupport;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.awaitility.Awaitility;
import org.junit.Test;
@ -42,7 +43,7 @@
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest {
public abstract class AbstractBlockingSnapshotTest<T extends SourceConnector> extends AbstractSnapshotTest<T> {
private int signalingRecords;
protected static final int ROW_COUNT = 1000;