DBZ-396 Formatting and typo fix;
Also adding Peter to COPYRIGHT.txt
This commit is contained in:
parent
ba7478ffd7
commit
79a2e4a6c2
@ -22,6 +22,7 @@ Mario Mueller
|
||||
Matteo Capitanio
|
||||
Omar Al-Safi
|
||||
Liu Hanlin
|
||||
Peter Goransson
|
||||
Prannoy Mittal
|
||||
Ramesh Reddy
|
||||
Randall Hauch
|
||||
|
@ -15,20 +15,19 @@
|
||||
|
||||
/**
|
||||
* A component that blocks doing nothing until the connector task is stopped
|
||||
*
|
||||
* @author Peter Goransson
|
||||
*
|
||||
* @author Peter Goransson
|
||||
*/
|
||||
public class BlockingReader implements Reader {
|
||||
|
||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
|
||||
private final AtomicReference<State> state = new AtomicReference<>();
|
||||
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
|
||||
private final String name;
|
||||
|
||||
|
||||
public BlockingReader(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
@ -37,30 +36,30 @@ public BlockingReader(String name) {
|
||||
* Waits indefinitely until the connector task is shut down
|
||||
*/
|
||||
@Override
|
||||
public List<SourceRecord> poll() throws InterruptedException {
|
||||
latch.await();
|
||||
public List<SourceRecord> poll() throws InterruptedException {
|
||||
latch.await();
|
||||
state.set(State.STOPPING);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public State state() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void uponCompletion(Runnable handler) {
|
||||
assert this.uponCompletion.get() == null;
|
||||
this.uponCompletion.set(handler);
|
||||
this.uponCompletion.set(handler);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
state.set(State.RUNNING);
|
||||
logger.info("Connector has completed all of its work but will continue in the running state. It can be shutdown at any time.");
|
||||
logger.info("Connector has completed all of its work but will continue in the running state. It can be shut down at any time.");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
try {
|
||||
@ -69,14 +68,14 @@ public void stop() {
|
||||
}
|
||||
finally {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Cleanup Resources
|
||||
Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
|
||||
if (completionHandler != null) {
|
||||
completionHandler.run();
|
||||
}
|
||||
|
||||
|
||||
} finally {
|
||||
logger.info("Blocking Reader has completed.");
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user