DBZ-11 Build can skip long-running unit and integration tests

This commit is contained in:
Randall Hauch 2016-02-04 15:35:27 -06:00
parent 64187eb390
commit 1a59f9b07c
8 changed files with 212 additions and 7 deletions

View File

@ -20,4 +20,4 @@ after_install:
- docker ps -a
script:
- mvn clean install
- mvn clean install -DskipLongRunningTests=false

View File

@ -171,6 +171,7 @@
<database.port>${database.port}</database.port>
<database.user>${database.user}</database.user>
<database.password>${database.password}</database.password>
<skipLongRunningTests>${skipLongRunningTests}</skipLongRunningTests>
</systemPropertyVariables>
</configuration>
</plugin>

View File

@ -0,0 +1,30 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation used together with the {@link SkipTestRule} JUnit rule, that allows long running tests to be excluded
* from the build, using the {@code skipLongRunningTests} system property.
*
* @author Horia Chiorean
*/
@Retention( RetentionPolicy.RUNTIME)
@Target( {ElementType.METHOD, ElementType.TYPE})
public @interface SkipLongRunning {
String SKIP_LONG_RUNNING_PROPERTY = "skipLongRunningTests";
/**
* The optional reason why the test is skipped.
* @return the reason why the test is skipped
*/
String value() default "long-running";
}

View File

@ -0,0 +1,54 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation used together with the {@link SkipTestRule} JUnit rule, that allows tests to be excluded
* from the build if they are run on certain platforms.
*
* @author Horia Chiorean
*/
@Retention( RetentionPolicy.RUNTIME)
@Target( { ElementType.METHOD, ElementType.TYPE})
public @interface SkipOnOS {
/**
* Symbolic constant used to determine the Windows operating system, from the "os.name" system property.
*/
String WINDOWS = "windows";
/**
* Symbolic constant used to determine the OS X operating system, from the "os.name" system property.
*/
String MAC = "mac";
/**
* Symbolic constant used to determine the Linux operating system, from the "os.name" system property.
*/
String LINUX = "linux";
/**
* The list of OS names on which the test should be skipped.
*
* @return a list of "symbolic" OS names.
* @see #WINDOWS
* @see #MAC
* @see #LINUX
*/
String[] value();
/**
* An optional description which explains why the test should be skipped.
*
* @return a string which explains the reasons for skipping the test.
*/
String description() default "";
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
import java.lang.annotation.Annotation;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
/**
* JUnit rule that inspects the presence of the {@link SkipLongRunning} annotation either on a test method or on a test suite. If
* it finds the annotation, it will only run the test method/suite if the system property {@code skipLongRunningTests} has the
* value {@code true}
*
* @author Horia Chiorean
*/
public class SkipTestRule implements TestRule {
@Override
public Statement apply( Statement base,
Description description ) {
SkipLongRunning skipLongRunningAnnotation = hasAnnotation(description, SkipLongRunning.class);
if (skipLongRunningAnnotation != null) {
boolean skipLongRunning = Boolean.valueOf(System.getProperty(SkipLongRunning.SKIP_LONG_RUNNING_PROPERTY));
if (skipLongRunning) {
return emptyStatement(skipLongRunningAnnotation.value(), description);
}
}
SkipOnOS skipOnOSAnnotation = hasAnnotation(description, SkipOnOS.class);
if (skipOnOSAnnotation != null) {
String[] oses = skipOnOSAnnotation.value();
String osName = System.getProperty("os.name");
if (osName != null && !osName.trim().isEmpty()) {
for (String os : oses) {
if (osName.toLowerCase().startsWith(os.toLowerCase())) {
return emptyStatement(skipOnOSAnnotation.description(), description);
}
}
}
}
return base;
}
private <T extends Annotation> T hasAnnotation( Description description, Class<T> annotationClass ) {
T annotation = description.getAnnotation(annotationClass);
if (annotation != null) {
return annotation;
} else if (description.isTest() && description.getTestClass().isAnnotationPresent(annotationClass)) {
return description.getTestClass().getAnnotation(annotationClass);
}
return null;
}
private static Statement emptyStatement( final String reason, final Description description ) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
StringBuilder messageBuilder = new StringBuilder(description.testCount());
messageBuilder.append("Skipped ").append(description.toString());
if (reason != null && !reason.trim().isEmpty()) {
messageBuilder.append(" because: ").append(reason);
}
System.out.println(messageBuilder.toString());
}
};
}
}

View File

@ -34,6 +34,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -497,6 +498,7 @@ public Properties getConsumerProperties(String groupId, String clientId, OffsetR
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
if (autoOffsetReset != null) {
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset.toString().toLowerCase());
}
@ -592,8 +594,9 @@ public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String cl
BlockingQueue<ConsumerRecord<K, V>> consumed = new LinkedBlockingQueue<>();
List<ConsumerRecord<K, V>> allMessages = new LinkedList<>();
AtomicBoolean keepReading = new AtomicBoolean();
OffsetCommitCallback offsetCommitCallback = null;
consume(groupId, clientId, OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, () -> keepReading.get(),
completion, topicNames, record -> {
offsetCommitCallback, completion, topicNames, record -> {
consumed.add(record);
allMessages.add(record);
});
@ -847,13 +850,16 @@ public void produceDocuments(String topic, int messageCount,
* @param keyDeserializer the deserializer for the keys; may not be null
* @param valueDeserializer the deserializer for the values; may not be null
* @param continuation the function that determines if the consumer should continue; may not be null
* @param offsetCommitCallback the callback that should be used after committing offsets; may be null if offsets are
* not to be committed
* @param completion the function to call when the consumer terminates; may be null
* @param topics the set of topics to consume; may not be null or empty
* @param consumerFunction the function to consume the messages; may not be null
*/
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
BooleanSupplier continuation, Runnable completion, Collection<String> topics,
BooleanSupplier continuation, OffsetCommitCallback offsetCommitCallback, Runnable completion,
Collection<String> topics,
java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);
Thread t = new Thread(() -> {
@ -864,7 +870,9 @@ public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy
consumer.poll(10).forEach(record -> {
LOGGER.debug("Consumer {}: consuming message {}", clientId, record);
consumerFunction.accept(record);
consumer.commitAsync();
if (offsetCommitCallback != null) {
consumer.commitAsync(offsetCommitCallback);
}
});
}
} finally {
@ -889,7 +897,9 @@ public void consumeDocuments(BooleanSupplier continuation, Runnable completion,
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<Document> valDes = new DocumentSerdes();
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
OffsetCommitCallback offsetCommitCallback = null;
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, offsetCommitCallback,
completion, topics, consumerFunction);
}
/**
@ -905,7 +915,9 @@ public void consumeStrings(BooleanSupplier continuation, Runnable completion, Co
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<String> valDes = keyDes;
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
OffsetCommitCallback offsetCommitCallback = null;
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, offsetCommitCallback,
completion, topics, consumerFunction);
}
/**
@ -921,7 +933,9 @@ public void consumeIntegers(BooleanSupplier continuation, Runnable completion, C
Deserializer<String> keyDes = new StringDeserializer();
Deserializer<Integer> valDes = new IntegerDeserializer();
String randomId = UUID.randomUUID().toString();
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, completion, topics, consumerFunction);
OffsetCommitCallback offsetCommitCallback = null;
consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, offsetCommitCallback,
completion, topics, consumerFunction);
}
/**

View File

@ -14,10 +14,14 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.junit.SkipLongRunning;
import io.debezium.junit.SkipTestRule;
import io.debezium.util.Stopwatch;
import io.debezium.util.Testing;
@ -25,6 +29,9 @@
* @author Randall Hauch
*/
public class KafkaClusterTest {
@Rule
public TestRule skipTestRule = new SkipTestRule();
private KafkaCluster cluster;
private File dataDir;
@ -43,6 +50,7 @@ public void afterEach() {
}
@Test
@SkipLongRunning
public void shouldStartClusterWithOneBrokerAndRemoveData() throws Exception {
cluster.deleteDataUponShutdown(true).addBrokers(1).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
@ -51,6 +59,7 @@ public void shouldStartClusterWithOneBrokerAndRemoveData() throws Exception {
}
@Test
@SkipLongRunning
public void shouldStartClusterWithMultipleBrokerAndRemoveData() throws Exception {
cluster.deleteDataUponShutdown(true).addBrokers(3).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
@ -59,6 +68,7 @@ public void shouldStartClusterWithMultipleBrokerAndRemoveData() throws Exception
}
@Test
@SkipLongRunning
public void shouldStartClusterWithOneBrokerAndLeaveData() throws Exception {
cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
@ -67,6 +77,7 @@ public void shouldStartClusterWithOneBrokerAndLeaveData() throws Exception {
}
@Test
@SkipLongRunning
public void shouldStartClusterWithMultipleBrokerAndLeaveData() throws Exception {
cluster.deleteDataUponShutdown(false).addBrokers(3).startup();
cluster.onEachDirectory(this::assertValidDataDirectory);
@ -75,6 +86,7 @@ public void shouldStartClusterWithMultipleBrokerAndLeaveData() throws Exception
}
@Test
@SkipLongRunning
public void shouldStartClusterAndAllowProducersAndConsumersToUseIt() throws Exception {
Testing.Debug.enable();
final String topicName = "topicA";
@ -146,6 +158,7 @@ public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumers
}
@Test
@SkipLongRunning
public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception {
Testing.Debug.enable();
final String topicName = "topicA";

20
pom.xml
View File

@ -85,6 +85,9 @@
<version.scala>2.11.7</version.scala>
<version.curator>2.4.0</version.curator>
<version.zookeeper>3.4.6</version.zookeeper>
<!--Skip long running tests by default-->
<skipLongRunningTests>true</skipLongRunningTests>
</properties>
<modules>
<module>support/checkstyle</module>
@ -435,6 +438,10 @@
<name>java.io.tmpdir</name>
<value>${basedir}/target</value>
</property>
<property>
<name>skipLongRunningTests</name>
<value>${skipLongRunningTests}</value>
</property>
</systemProperties>
<argLine>-Djava.awt.headless=true</argLine>
<!--runOrder>alphabetical</runOrder-->
@ -473,6 +480,19 @@
</plugins>
</build>
<profiles>
<profile>
<id>assembly</id>
<properties>
<skipLongRunningTests>false</skipLongRunningTests>
</properties>
</profile>
<profile>
<id>performance</id>
<properties>
<skipLongRunningTests>false</skipLongRunningTests>
</properties>
</profile>
<profile>
<id>docs</id>
<activation>