DBZ-7007 Use property based configuration in the emebdded tests

This commit is contained in:
Vojtech Juranek 2023-10-10 16:04:14 +02:00 committed by Jiri Pechanec
parent 9d26dd4cf4
commit 3295844cc3

View File

@ -191,18 +191,18 @@ public void verifyNonAsciiContentHandledCorrectly() throws Exception {
@Test
public void interruptedTaskShutsDown() throws Exception {
Configuration config = Configuration.create()
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, InterruptedConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(EmbeddedEngineConfig.OFFSET_STORAGE, InterruptingOffsetStore.class)
.build();
final Properties props = new Properties();
props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), InterruptedConnector.class.getName());
props.put(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS.name(), 0);
props.put(EmbeddedEngineConfig.OFFSET_STORAGE.name(), InterruptingOffsetStore.class.getName());
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.put(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, "0");
CountDownLatch firstLatch = new CountDownLatch(1);
engine = EmbeddedEngine.create()
.using(config)
.using(props)
.notifying((records, committer) -> {
})
.using(this.getClass().getClassLoader())
@ -227,18 +227,18 @@ public void interruptedTaskShutsDown() throws Exception {
@Test
public void interruptedOffsetCommitShutsDown() throws Exception {
Configuration config = Configuration.create()
.with(SimpleSourceConnector.BATCH_COUNT, 1)
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, SimpleSourceConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngineConfig.OFFSET_STORAGE, InterruptingOffsetStore.class)
.build();
final Properties props = new Properties();
props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), SimpleSourceConnector.class.getName());
props.put(EmbeddedEngineConfig.OFFSET_STORAGE.name(), InterruptingOffsetStore.class.getName());
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.put(SimpleSourceConnector.BATCH_COUNT, 1);
props.put(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, "0");
CountDownLatch firstLatch = new CountDownLatch(1);
engine = EmbeddedEngine.create()
.using(config)
.using(props)
.using(OffsetCommitPolicy.always())
.notifying((records, committer) -> {
@ -306,19 +306,21 @@ public void shouldWorkToUseCustomChangeConsumer() throws Exception {
// Add initial content to the file ...
appendLinesToSource(NUMBER_OF_LINES);
Configuration config = Configuration.copy(connectorConfig)
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, FileStreamSourceConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.build();
final Properties props = new Properties();
props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), FileStreamSourceConnector.class.getName());
props.put(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS.name(), 0);
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.put(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH.toAbsolutePath().toString());
props.put(FileStreamSourceConnector.TOPIC_CONFIG, "topicX");
props.put(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, "0");
CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6);
// create an engine with our custom class
engine = EmbeddedEngine.create()
.using(config)
.using(props)
.notifying((records, committer) -> {
assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
Integer groupCount = records.size() / NUMBER_OF_LINES;
@ -733,17 +735,17 @@ public void shouldRunDebeziumEngineWithMismatchedTypes() throws Exception {
@FixFor("DBZ-5583")
public void verifyBadCommitPolicyClassName() {
Configuration config = Configuration.create()
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, SimpleSourceConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY, "badclassname") // force ClassNotFoundException
.build();
final Properties props = new Properties();
props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), SimpleSourceConnector.class.getName());
props.put(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY.name(), "badclassname"); // force ClassNotFoundException
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
// props.put(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, "0");
final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
engine = EmbeddedEngine.create()
.using(config)
.using(props)
.notifying((records, committer) -> {
})
.using(this.getClass().getClassLoader())