diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java index 0c1e82bd5..167bbd880 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java @@ -191,18 +191,18 @@ public void verifyNonAsciiContentHandledCorrectly() throws Exception { @Test public void interruptedTaskShutsDown() throws Exception { - Configuration config = Configuration.create() - .with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector") - .with(EmbeddedEngineConfig.CONNECTOR_CLASS, InterruptedConnector.class) - .with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH) - .with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0) - .with(EmbeddedEngineConfig.OFFSET_STORAGE, InterruptingOffsetStore.class) - .build(); + final Properties props = new Properties(); + props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector"); + props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), InterruptedConnector.class.getName()); + props.put(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS.name(), 0); + props.put(EmbeddedEngineConfig.OFFSET_STORAGE.name(), InterruptingOffsetStore.class.getName()); + props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + props.put(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, "0"); CountDownLatch firstLatch = new CountDownLatch(1); engine = EmbeddedEngine.create() - .using(config) + .using(props) .notifying((records, committer) -> { }) .using(this.getClass().getClassLoader()) @@ -227,18 +227,18 @@ public void interruptedTaskShutsDown() throws Exception { @Test public void interruptedOffsetCommitShutsDown() throws Exception { - Configuration config = Configuration.create() - .with(SimpleSourceConnector.BATCH_COUNT, 1) - .with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector") - .with(EmbeddedEngineConfig.CONNECTOR_CLASS, SimpleSourceConnector.class) - .with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH) - .with(EmbeddedEngineConfig.OFFSET_STORAGE, InterruptingOffsetStore.class) - .build(); + final Properties props = new Properties(); + props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector"); + props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), SimpleSourceConnector.class.getName()); + props.put(EmbeddedEngineConfig.OFFSET_STORAGE.name(), InterruptingOffsetStore.class.getName()); + props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + props.put(SimpleSourceConnector.BATCH_COUNT, 1); + props.put(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, "0"); CountDownLatch firstLatch = new CountDownLatch(1); engine = EmbeddedEngine.create() - .using(config) + .using(props) .using(OffsetCommitPolicy.always()) .notifying((records, committer) -> { @@ -306,19 +306,21 @@ public void shouldWorkToUseCustomChangeConsumer() throws Exception { // Add initial content to the file ... appendLinesToSource(NUMBER_OF_LINES); - Configuration config = Configuration.copy(connectorConfig) - .with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector") - .with(EmbeddedEngineConfig.CONNECTOR_CLASS, FileStreamSourceConnector.class) - .with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH) - .with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0) - .build(); + final Properties props = new Properties(); + props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector"); + props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), FileStreamSourceConnector.class.getName()); + props.put(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS.name(), 0); + props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + props.put(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH.toAbsolutePath().toString()); + props.put(FileStreamSourceConnector.TOPIC_CONFIG, "topicX"); + props.put(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, "0"); CountDownLatch firstLatch = new CountDownLatch(1); CountDownLatch allLatch = new CountDownLatch(6); // create an engine with our custom class engine = EmbeddedEngine.create() - .using(config) + .using(props) .notifying((records, committer) -> { assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES); Integer groupCount = records.size() / NUMBER_OF_LINES; @@ -733,17 +735,17 @@ public void shouldRunDebeziumEngineWithMismatchedTypes() throws Exception { @FixFor("DBZ-5583") public void verifyBadCommitPolicyClassName() { - Configuration config = Configuration.create() - .with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector") - .with(EmbeddedEngineConfig.CONNECTOR_CLASS, SimpleSourceConnector.class) - .with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH) - .with(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY, "badclassname") // force ClassNotFoundException - .build(); + final Properties props = new Properties(); + props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector"); + props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), SimpleSourceConnector.class.getName()); + props.put(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY.name(), "badclassname"); // force ClassNotFoundException + props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + // props.put(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, "0"); final AtomicBoolean exceptionCaught = new AtomicBoolean(false); engine = EmbeddedEngine.create() - .using(config) + .using(props) .notifying((records, committer) -> { }) .using(this.getClass().getClassLoader())