DBZ-2149 Starting/stopping Kafka just once in KafkaDatabaseHistoryTest;
By using distinct topic names in the tests, the same Kafka instance can be used for all test methods. This brings down execution time of this test from ~40sec to ~8sec.
This commit is contained in:
parent
c3b36c2417
commit
bfaadcf2dd
@ -20,7 +20,9 @@
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
@ -38,21 +40,16 @@
|
||||
*/
|
||||
public class KafkaDatabaseHistoryTest {
|
||||
|
||||
private static KafkaCluster kafka;
|
||||
|
||||
private KafkaDatabaseHistory history;
|
||||
private KafkaCluster kafka;
|
||||
private Map<String, String> source;
|
||||
private Map<String, Object> position;
|
||||
private String topicName;
|
||||
private String ddl;
|
||||
|
||||
private static final int PARTITION_NO = 0;
|
||||
|
||||
@Before
|
||||
public void beforeEach() throws Exception {
|
||||
source = Collect.hashMapOf("server", "my-server");
|
||||
setLogPosition(0);
|
||||
topicName = "schema-changes-topic";
|
||||
|
||||
@BeforeClass
|
||||
public static void startKafka() throws Exception {
|
||||
File dataDir = Testing.Files.createTestingDirectory("history_cluster");
|
||||
Testing.Files.delete(dataDir);
|
||||
|
||||
@ -65,6 +62,19 @@ public void beforeEach() throws Exception {
|
||||
"auto.create.topics.enable", "false",
|
||||
"zookeeper.session.timeout.ms", "20000"))
|
||||
.startup();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopKafka() {
|
||||
if (kafka != null) {
|
||||
kafka.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void beforeEach() throws Exception {
|
||||
source = Collect.hashMapOf("server", "my-server");
|
||||
setLogPosition(0);
|
||||
history = new KafkaDatabaseHistory();
|
||||
}
|
||||
|
||||
@ -77,25 +87,19 @@ public void afterEach() {
|
||||
}
|
||||
finally {
|
||||
history = null;
|
||||
try {
|
||||
if (kafka != null) {
|
||||
kafka.shutdown();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
kafka = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception {
|
||||
String topicName = "empty-and-recovery-schema-changes";
|
||||
|
||||
// Create the empty topic ...
|
||||
kafka.createTopic(topicName, 1, 1);
|
||||
testHistoryTopicContent(false);
|
||||
testHistoryTopicContent(topicName, false);
|
||||
}
|
||||
|
||||
private void testHistoryTopicContent(boolean skipUnparseableDDL) {
|
||||
private void testHistoryTopicContent(String topicName, boolean skipUnparseableDDL) {
|
||||
// Start up the history ...
|
||||
Configuration config = Configuration.create()
|
||||
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
|
||||
@ -142,7 +146,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
|
||||
|
||||
// Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
|
||||
setLogPosition(10);
|
||||
ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
|
||||
String ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
|
||||
"CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
|
||||
"CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
|
||||
history.record(source, position, "db1", ddl);
|
||||
@ -209,6 +213,8 @@ protected void setLogPosition(int index) {
|
||||
|
||||
@Test
|
||||
public void shouldIgnoreUnparseableMessages() throws Exception {
|
||||
String topicName = "ignore-unparseable-schema-changes";
|
||||
|
||||
// Create the empty topic ...
|
||||
kafka.createTopic(topicName, 1, 1);
|
||||
|
||||
@ -242,11 +248,13 @@ public void shouldIgnoreUnparseableMessages() throws Exception {
|
||||
producer.send(invalidSQL).get();
|
||||
}
|
||||
|
||||
testHistoryTopicContent(true);
|
||||
testHistoryTopicContent(topicName, true);
|
||||
}
|
||||
|
||||
@Test(expected = ParsingException.class)
|
||||
public void shouldStopOnUnparseableSQL() throws Exception {
|
||||
String topicName = "stop-on-unparseable-schema-changes";
|
||||
|
||||
// Create the empty topic ...
|
||||
kafka.createTopic(topicName, 1, 1);
|
||||
|
||||
@ -264,13 +272,15 @@ public void shouldStopOnUnparseableSQL() throws Exception {
|
||||
producer.send(invalidSQL).get();
|
||||
}
|
||||
|
||||
testHistoryTopicContent(false);
|
||||
testHistoryTopicContent(topicName, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExists() {
|
||||
String topicName = "exists-schema-changes";
|
||||
|
||||
// happy path
|
||||
testHistoryTopicContent(true);
|
||||
testHistoryTopicContent(topicName, true);
|
||||
assertTrue(history.exists());
|
||||
|
||||
// Set history to use dummy topic
|
||||
@ -303,6 +313,8 @@ public void testExists() {
|
||||
@Test
|
||||
@FixFor("DBZ-1886")
|
||||
public void differentiateStorageExistsFromHistoryExists() {
|
||||
String topicName = "differentiate-storage-exists-schema-changes";
|
||||
|
||||
Configuration config = Configuration.create()
|
||||
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
|
||||
.with(KafkaDatabaseHistory.TOPIC, topicName)
|
||||
@ -325,7 +337,7 @@ public void differentiateStorageExistsFromHistoryExists() {
|
||||
assertFalse(history.exists());
|
||||
history.start();
|
||||
setLogPosition(0);
|
||||
ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
|
||||
String ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
|
||||
"CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
|
||||
"CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
|
||||
history.record(source, position, "db1", ddl);
|
||||
|
Loading…
Reference in New Issue
Block a user