DBZ-7024 Add more testing connectors

Add connector which runs mutiple tasks and connector whose some of
the tasks fail.
This commit is contained in:
Vojtech Juranek 2023-12-01 17:29:08 +01:00 committed by Jiri Pechanec
parent 7eaf0fc288
commit d7b7768071

View File

@ -11,10 +11,12 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@ -77,6 +79,68 @@ public List<SourceRecord> poll() throws InterruptedException {
}
}
class NoOpConnector extends SimpleSourceConnector {
@Override
public Class<? extends Task> taskClass() {
return NoOpTask.class;
}
}
class NoOpTask extends SimpleSourceConnector.SimpleConnectorTask {
@Override
public List<SourceRecord> poll() throws InterruptedException {
return new ArrayList<SourceRecord>();
}
}
class MultiTaskSimpleSourceConnector extends SimpleSourceConnector {
private Map<String, String> config;
@Override
public void start(Map<String, String> props) {
config = props;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
configs.add(config);
}
return configs;
}
}
class RandomlyFailingDuringStartConnector extends MultiTaskSimpleSourceConnector {
@Override
public Class<? extends Task> taskClass() {
return RandomlyFailingDuringStartTask.class;
}
}
class RandomlyFailingDuringStartTask extends SimpleSourceConnector.SimpleConnectorTask {
Random rand = new Random();
@Override
public void start(Map<String, String> props) {
if (rand.nextBoolean()) {
try {
// Give other tasks chance to start
Thread.sleep(100);
}
catch (InterruptedException e) {
throw new IllegalStateException("Unexpected interrupted exception");
}
throw new IllegalStateException("Exception during start of the task");
}
}
}
class InterruptingOffsetStore implements OffsetBackingStore {
@Override