From f63fc9135ad7225b2e01b79bab0933e52e328639 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Thu, 15 Aug 2019 17:07:09 -0700 Subject: [PATCH] [DBZ-1406] Replace Yaml Dependency With Property File. --- debezium-connector-cassandra/pom.xml | 5 ---- .../cassandra/CassandraConnectorConfig.java | 20 ++++++------- .../cassandra/CassandraConnectorTask.java | 6 ++-- .../cassandra/network/SslConfig.java | 6 ++-- .../cassandra/network/SslContextFactory.java | 10 ++++--- .../CassandraConnectorConfigTest.java | 11 +++---- .../EmbeddedCassandraConnectorTestBase.java | 30 +++++++++---------- 7 files changed, 44 insertions(+), 44 deletions(-) diff --git a/debezium-connector-cassandra/pom.xml b/debezium-connector-cassandra/pom.xml index 7e859cedb..361de875d 100644 --- a/debezium-connector-cassandra/pom.xml +++ b/debezium-connector-cassandra/pom.xml @@ -51,11 +51,6 @@ metrics-servlets ${version.dropwizard} - - org.yaml - snakeyaml - 1.20 - org.eclipse.jetty jetty-servlet diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java index 749a5c012..aafd2bd21 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraConnectorConfig.java @@ -237,6 +237,12 @@ public static Optional fromText(String text) { public static final String LATEST_COMMIT_LOG_ONLY = "latest.commit.log.only"; public static final boolean DEFAULT_LATEST_COMMIT_LOG_ONLY = false; + private Properties configs; + + public CassandraConnectorConfig(Properties configs) { + this.configs = configs; + } + public String connectorName() { return (String) configs.get(CONNECTOR_NAME); } @@ -253,9 +259,9 @@ public Properties getKafkaConfigs() { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); configs.entrySet().stream() - .filter(entry -> entry.getKey().startsWith(KAFKA_PRODUCER_CONFIG_PREFIX)) + .filter(entry -> entry.getKey().toString().startsWith(KAFKA_PRODUCER_CONFIG_PREFIX)) .forEach(entry -> { - String k = entry.getKey().replace(KAFKA_PRODUCER_CONFIG_PREFIX, ""); + String k = entry.getKey().toString().replace(KAFKA_PRODUCER_CONFIG_PREFIX, ""); Object v = entry.getValue(); props.put(k, v); }); @@ -266,7 +272,7 @@ public Properties getKafkaConfigs() { public Properties commitLogTransferConfigs() { Properties props = new Properties(); configs.entrySet().stream() - .filter(entry -> entry.getKey().startsWith(COMMIT_LOG_TRANSFER_CONFIG_PREFIX)) + .filter(entry -> entry.getKey().toString().startsWith(COMMIT_LOG_TRANSFER_CONFIG_PREFIX)) .forEach(entry -> props.put(entry.getKey(), entry.getValue())); return props; } @@ -275,12 +281,6 @@ public boolean latestCommitLogOnly() { return (boolean) configs.getOrDefault(LATEST_COMMIT_LOG_ONLY, DEFAULT_LATEST_COMMIT_LOG_ONLY); } - private Map configs; - - public CassandraConnectorConfig(Map configs) { - this.configs = configs; - } - public SnapshotMode snapshotMode() { String mode = (String) configs.getOrDefault(SNAPSHOT_MODE, DEFAULT_SNAPSHOT_MODE); Optional snapshotModeOpt = SnapshotMode.fromText(mode); @@ -400,7 +400,7 @@ public boolean tombstonesOnDelete() { @Override public String toString() { return configs.entrySet().stream() - .filter(e -> !e.getKey().toLowerCase().contains("username") && !e.getKey().toLowerCase().contains("password")) + .filter(e -> !e.getKey().toString().toLowerCase().contains("username") && !e.getKey().toString().toLowerCase().contains("password")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) .toString(); } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraConnectorTask.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraConnectorTask.java index 561bbbc64..d8535d9f6 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraConnectorTask.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CassandraConnectorTask.java @@ -18,7 +18,6 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; import java.io.FileInputStream; import java.io.IOException; @@ -29,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.Properties; /** * A task that reads Cassandra commit log in CDC directory and generate corresponding data @@ -54,7 +54,9 @@ public static void main(String[] args) throws Exception { String configPath = args[0]; try (FileInputStream fis = new FileInputStream(configPath)) { - Map props = new Yaml().load(fis); + Properties props = new Properties(); + props.load(fis); + fis.close(); CassandraConnectorConfig config = new CassandraConnectorConfig(props); CassandraConnectorTask task = new CassandraConnectorTask(config); task.run(); diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/network/SslConfig.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/network/SslConfig.java index cbf8f32b2..edb05ee30 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/network/SslConfig.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/network/SslConfig.java @@ -5,7 +5,7 @@ */ package io.debezium.connector.cassandra.network; -import java.util.Map; +import java.util.Properties; public class SslConfig { @@ -25,9 +25,9 @@ public class SslConfig { public static final String TRUST_MANAGER_ALGORITHM = "trustManager.algorithm"; public static final String DEFAULT_TRUST_MANAGER_ALGORITHM = "SunX509"; - private Map configs; + private Properties configs; - public SslConfig(Map configs) { + public SslConfig(Properties configs) { this.configs = configs; } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/network/SslContextFactory.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/network/SslContextFactory.java index 989a6bc35..65126f659 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/network/SslContextFactory.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/network/SslContextFactory.java @@ -12,7 +12,6 @@ import io.netty.handler.ssl.util.SelfSignedCertificate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; @@ -20,6 +19,7 @@ import java.io.IOException; import java.security.GeneralSecurityException; import java.security.KeyStore; +import java.util.Properties; public class SslContextFactory { private static final Logger LOGGER = LoggerFactory.getLogger(SslContextFactory.class); @@ -27,7 +27,7 @@ private SslContextFactory() { } /** * Return an {@link SslContext} containing all SSL configurations parsed - * from the YAML file path + * from the Properties file path *

* See {@link SslConfig} class for a list of valid config names * @@ -38,9 +38,11 @@ public static SslContext createSslContext(String sslConfigPath) throws GeneralSe if (sslConfigPath == null) { throw new CassandraConnectorConfigException("Please specify SSL config path in cdc.yml"); } - Yaml yaml = new Yaml(); + Properties props = new Properties(); try (FileInputStream fis = new FileInputStream(sslConfigPath)) { - SslConfig sslConfig = new SslConfig(yaml.load(fis)); + props.load(fis); + fis.close(); + SslConfig sslConfig = new SslConfig(props); return createSslContext(sslConfig); } } diff --git a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/CassandraConnectorConfigTest.java b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/CassandraConnectorConfigTest.java index 77051cb1c..45ac2ed1c 100644 --- a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/CassandraConnectorConfigTest.java +++ b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/CassandraConnectorConfigTest.java @@ -7,8 +7,7 @@ import org.junit.Test; -import java.util.Collections; -import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -131,13 +130,15 @@ public void testConfigs() { } private CassandraConnectorConfig buildTaskConfig(String key, Object value) { - Map map = Collections.singletonMap(key, value); - return new CassandraConnectorConfig(map); + Properties props = new Properties(); + props.put(key, value); + return new CassandraConnectorConfig(props); } @Test public void testDefaultConfigs() { - CassandraConnectorConfig config = new CassandraConnectorConfig(Collections.emptyMap()); + Properties props = new Properties(); + CassandraConnectorConfig config = new CassandraConnectorConfig(props); assertEquals(CassandraConnectorConfig.DEFAULT_SNAPSHOT_CONSISTENCY, config.snapshotConsistencyLevel().name().toUpperCase()); assertEquals(CassandraConnectorConfig.DEFAULT_HTTP_PORT, config.httpPort()); assertArrayEquals(CassandraConnectorConfig.DEFAULT_CASSANDRA_HOST.split(","), config.cassandraHosts()); diff --git a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/EmbeddedCassandraConnectorTestBase.java b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/EmbeddedCassandraConnectorTestBase.java index 6991daaa8..f3e45a51d 100644 --- a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/EmbeddedCassandraConnectorTestBase.java +++ b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/EmbeddedCassandraConnectorTestBase.java @@ -23,8 +23,8 @@ import java.nio.file.Paths; import java.security.GeneralSecurityException; import java.time.Duration; -import java.util.HashMap; import java.util.Map; +import java.util.Properties; /** * Base class used to automatically spin up a single-node embedded Cassandra cluster before tests @@ -75,7 +75,7 @@ protected static void deleteTestKeyspaceTables() { * Generate a task context with default test configs */ protected static CassandraConnectorContext generateTaskContext() throws GeneralSecurityException, IOException { - Map defaults = generateDefaultConfigMap(); + Properties defaults = generateDefaultConfigMap(); return new CassandraConnectorContext(new CassandraConnectorConfig(defaults)); } @@ -83,7 +83,7 @@ protected static CassandraConnectorContext generateTaskContext() throws GeneralS * General a task context with default and custom test configs */ protected static CassandraConnectorContext generateTaskContext(Map configs) throws GeneralSecurityException, IOException { - Map defaults = generateDefaultConfigMap(); + Properties defaults = generateDefaultConfigMap(); defaults.putAll(configs); return new CassandraConnectorContext(new CassandraConnectorConfig(defaults)); } @@ -165,18 +165,18 @@ protected static void clearCommitLogFromDirectory(File directory, boolean recurs } } - protected static Map generateDefaultConfigMap() throws IOException { - Map configs = new HashMap<>(); - configs.put(CassandraConnectorConfig.CONNECTOR_NAME, TEST_CONNECTOR_NAME); - configs.put(CassandraConnectorConfig.CASSANDRA_CONFIG, TEST_CASSANDRA_YAML_CONFIG); - configs.put(CassandraConnectorConfig.KAFKA_TOPIC_PREFIX, TEST_KAFKA_TOPIC_PREFIX); - configs.put(CassandraConnectorConfig.CASSANDRA_HOSTS, TEST_CASSANDRA_HOSTS); - configs.put(CassandraConnectorConfig.CASSANDRA_PORT, TEST_CASSANDRA_PORT); - configs.put(CassandraConnectorConfig.OFFSET_BACKING_STORE_DIR, Files.createTempDirectory("offset").toString()); - configs.put(CassandraConnectorConfig.KAFKA_PRODUCER_CONFIG_PREFIX + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TEST_KAFKA_SERVERS); - configs.put(CassandraConnectorConfig.KAFKA_PRODUCER_CONFIG_PREFIX + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_SCHEMA_REGISTRY_URL); - configs.put(CassandraConnectorConfig.COMMIT_LOG_RELOCATION_DIR, Files.createTempDirectory("cdc_raw_relocation").toString()); - return configs; + protected static Properties generateDefaultConfigMap() throws IOException { + Properties props = new Properties(); + props.put(CassandraConnectorConfig.CONNECTOR_NAME, TEST_CONNECTOR_NAME); + props.put(CassandraConnectorConfig.CASSANDRA_CONFIG, TEST_CASSANDRA_YAML_CONFIG); + props.put(CassandraConnectorConfig.KAFKA_TOPIC_PREFIX, TEST_KAFKA_TOPIC_PREFIX); + props.put(CassandraConnectorConfig.CASSANDRA_HOSTS, TEST_CASSANDRA_HOSTS); + props.put(CassandraConnectorConfig.CASSANDRA_PORT, TEST_CASSANDRA_PORT); + props.put(CassandraConnectorConfig.OFFSET_BACKING_STORE_DIR, Files.createTempDirectory("offset").toString()); + props.put(CassandraConnectorConfig.KAFKA_PRODUCER_CONFIG_PREFIX + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TEST_KAFKA_SERVERS); + props.put(CassandraConnectorConfig.KAFKA_PRODUCER_CONFIG_PREFIX + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_SCHEMA_REGISTRY_URL); + props.put(CassandraConnectorConfig.COMMIT_LOG_RELOCATION_DIR, Files.createTempDirectory("cdc_raw_relocation").toString()); + return props; } private static void startEmbeddedCassandra() throws Exception {