diff --git a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogBlockingSnapshotIT.java b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogBlockingSnapshotIT.java index 1bf60055f..902466593 100644 --- a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogBlockingSnapshotIT.java +++ b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogBlockingSnapshotIT.java @@ -30,7 +30,7 @@ * @author Mario Fiore Vitale */ public abstract class BinlogBlockingSnapshotIT - extends AbstractBlockingSnapshotTest + extends AbstractBlockingSnapshotTest implements BinlogConnectorTest { protected static final String SERVER_NAME = "is_test"; diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java index 0737c65e7..e5e1dbbe8 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java @@ -23,7 +23,7 @@ import io.debezium.relational.history.SchemaHistory; import io.debezium.util.Testing; -public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest { +public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest { private OracleConnection connection; diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java index ef8f388cd..b5b6c98df 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java @@ -99,7 +99,7 @@ public void testEngineBasicLifecycle() throws Exception { CountDownLatch snapshotLatch = new CountDownLatch(1); CountDownLatch allLatch = new CountDownLatch(6); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -149,7 +149,7 @@ public void testRunMultipleTasks() throws Exception { props.put(SimpleSourceConnector.BATCH_COUNT, 1); final AtomicInteger recordsRead = new AtomicInteger(0); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .notifying((records, committer) -> { @@ -188,7 +188,7 @@ public void testTasksAreStoppedIfSomeFailsToStart() { props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10"); final AtomicInteger recordsRead = new AtomicInteger(0); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -239,7 +239,7 @@ public void testHeaderConverter() throws Exception { appendLinesToSource(1); CountDownLatch recordsLatch = new CountDownLatch(1); // 1 count down for headers - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder( + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>( KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class)); DebeziumEngine embeddedEngine = builder .using(props) @@ -281,7 +281,7 @@ public void testCompletionCallbackCalledUponSuccess() throws Exception { CountDownLatch callbackLatch = new CountDownLatch(1); AtomicInteger recordsSent = new AtomicInteger(); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -326,7 +326,7 @@ public void testCompletionCallbackCalledUponFailure() throws Exception { CountDownLatch callbackLatch = new CountDownLatch(1); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -363,7 +363,7 @@ public void testCannotStopWhileTasksAreStarting() throws Exception { CountDownLatch taskStartingLatch = new CountDownLatch(1); CountDownLatch enginStopLatch = new CountDownLatch(1); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -404,7 +404,7 @@ public void testCannotStopAlreadyStoppedEngine() throws Exception { props.put(SimpleSourceConnector.BATCH_COUNT, 1); props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10"); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -457,7 +457,7 @@ public void testExecuteSmt() throws Exception { // We have only 5 groups as the first one is filtered out (first records is filtered out and therefore group not counted) CountDownLatch allLatch = new CountDownLatch(5); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -514,7 +514,7 @@ public void testCloseSmt() throws Exception { CountDownLatch callbackLatch = new CountDownLatch(1); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -550,7 +550,7 @@ public void testPollingIsRetriedUponFailure() throws Exception { CountDownLatch recordsLatch = new CountDownLatch(SimpleSourceConnector.DEFAULT_BATCH_COUNT); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -586,7 +586,7 @@ public void testConnectorFailsIfMaxRetriesExceeded() throws Exception { CountDownLatch recordsLatch = new CountDownLatch(SimpleSourceConnector.DEFAULT_BATCH_COUNT); final LogInterceptor interceptor = new LogInterceptor(AsyncEmbeddedEngine.class); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) @@ -664,7 +664,7 @@ public void testCompletionCallbackCalledAfterConnectorStop() throws Exception { AtomicInteger recordsSent = new AtomicInteger(); AtomicBoolean connectorCallbackCalled = new AtomicBoolean(false); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using((success, message, error) -> { @@ -721,7 +721,7 @@ private void runEngineBasicLifecycleWithConsumer(final Properties props) throws appendLinesToSource(NUMBER_OF_LINES); CountDownLatch allLatch = new CountDownLatch(6 * NUMBER_OF_LINES); - DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(); + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder<>(); engine = builder .using(props) .using(new TestEngineConnectorCallback()) diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java index bbd1c64c4..d0d2f72cb 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java @@ -30,6 +30,7 @@ import javax.management.openmbean.TabularDataSupport; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceConnector; import org.awaitility.Awaitility; import org.junit.Test; @@ -42,7 +43,7 @@ import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest; -public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest { +public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest { private int signalingRecords; protected static final int ROW_COUNT = 1000;