DBZ-778 Make the tests resilient against race condition
This commit is contained in:
parent
a7be3847e3
commit
0182eecd06
@ -18,11 +18,12 @@
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -77,15 +78,27 @@ public void shouldNotAllowMultipleReplicationSlotsOnTheSameDBSlotAndPlugin() thr
|
||||
|
||||
@Test
|
||||
public void shouldCloseConnectionOnInvalidSlotName() throws Exception {
|
||||
final int closeRetries = 60;
|
||||
final int waitPeriod = 2_000;
|
||||
try (ReplicationConnection conn1 = TestHelper.createForReplication("test1-", true)) {
|
||||
conn1.startStreaming();
|
||||
fail("Invalid slot name should fail");
|
||||
}
|
||||
catch (Exception e) {
|
||||
PostgresConnection connection = TestHelper.create();
|
||||
connection.execute(x -> {
|
||||
Assertions.assertThat(x.executeQuery("select * from pg_stat_replication where state = 'startup'").next()).as("Connection should not be active").isFalse();
|
||||
});
|
||||
final AtomicBoolean disconnected = new AtomicBoolean();
|
||||
for (int retry = 1; retry <= closeRetries; retry++) {
|
||||
PostgresConnection connection = TestHelper.create();
|
||||
connection.execute(x -> {
|
||||
disconnected.set(!x.executeQuery("select * from pg_stat_replication where state = 'startup'").next());
|
||||
});
|
||||
if (disconnected.get()) {
|
||||
break;
|
||||
}
|
||||
if (retry == closeRetries) {
|
||||
Assert.fail("Connection should not be active");
|
||||
}
|
||||
Thread.sleep(waitPeriod);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user