diff --git a/CHANGELOG.md b/CHANGELOG.md index e9b9362a8..f1da6a704 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ -## Change log +# Change log All notable changes are documented in this file. Release numbers follow [Semantic Versioning](http://semver.org) -### Unreleased +## Unreleased + +### Added + +* MySQL connector for ingesting change events from MySQL databases ([DBZ-1](https://issues.jboss.org/projects/DBZ/issues/DBZ-1)) +* Simple DDL parsing framework that can be extended and used by various connectors (as part of [DBZ-1](https://issues.jboss.org/projects/DBZ/issues/DBZ-1)) + +### Changed + +### Fixed diff --git a/debezium-core/src/main/java/io/debezium/config/Configuration.java b/debezium-core/src/main/java/io/debezium/config/Configuration.java index 7b6b91f69..6c295da59 100644 --- a/debezium-core/src/main/java/io/debezium/config/Configuration.java +++ b/debezium-core/src/main/java/io/debezium/config/Configuration.java @@ -36,7 +36,9 @@ /** * An immutable representation of a Debezium configuration. A {@link Configuration} instance can be obtained * {@link #from(Properties) from Properties} or loaded from a {@link #load(File) file}, {@link #load(InputStream) stream}, - * {@link #load(Reader) reader}, {@link #load(URL) URL}, or from a {@link #load(String, ClassLoader) resource on the classpath}. + * {@link #load(Reader) reader}, {@link #load(URL) URL}, or {@link #load(String, ClassLoader) classpath resource}. They can + * also be built by first {@link #create() creating a builder} and then using that builder to populate and + * {@link Builder#build() return} the immutable Configuration instance. *

* A Configuration object is basically a decorator around a {@link Properties} object. It has methods to get and convert * individual property values to numeric, boolean and String types, optionally using a default value if the given property value @@ -48,51 +50,6 @@ @Immutable public interface Configuration { - public static Field field(String name, String description) { - return new Field(name, description, null); - } - - public static Field field(String name, String description, String defaultValue) { - return new Field(name, description, defaultValue); - } - - public static Field field(String name, String description, int defaultValue) { - return new Field(name, description, Integer.toString(defaultValue)); - } - - public static Field field(String name, String description, long defaultValue) { - return new Field(name, description, Long.toString(defaultValue)); - } - - public static Field field(String name, String description, boolean defaultValue) { - return new Field(name, description, Boolean.toString(defaultValue)); - } - - public static class Field { - private final String name; - private final String desc; - private final String defaultValue; - - public Field(String name, String description, String defaultValue) { - this.name = name; - this.desc = description; - this.defaultValue = defaultValue; - assert this.name != null; - } - - public String name() { - return name; - } - - public String defaultValue() { - return defaultValue; - } - - public String description() { - return desc; - } - } - /** * The basic interface for configuration builders. * @@ -164,6 +121,70 @@ default B with(String key, boolean value) { return with(key, Boolean.toString(value)); } + /** + * If there is no field with the specified key, then associate the given value with the specified key. + * + * @param key the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + B withDefault(String key, String value); + + /** + * If there is no field with the specified key, then associate the given value with the specified key. + * + * @param key the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(String key, int value) { + return withDefault(key, Integer.toString(value)); + } + + /** + * If there is no field with the specified key, then associate the given value with the specified key. + * + * @param key the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(String key, float value) { + return withDefault(key, Float.toString(value)); + } + + /** + * If there is no field with the specified key, then associate the given value with the specified key. + * + * @param key the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(String key, double value) { + return withDefault(key, Double.toString(value)); + } + + /** + * If there is no field with the specified key, then associate the given value with the specified key. + * + * @param key the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(String key, long value) { + return withDefault(key, Long.toString(value)); + } + + /** + * If there is no field with the specified key, then associate the given value with the specified key. + * + * @param key the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(String key, boolean value) { + return withDefault(key, Boolean.toString(value)); + } + /** * Associate the given value with the key of the specified field. * @@ -172,7 +193,7 @@ default B with(String key, boolean value) { * @return this builder object so methods can be chained together; never null */ default B with(Field field, String value) { - return with(field.name(),value); + return with(field.name(), value); } /** @@ -183,7 +204,7 @@ default B with(Field field, String value) { * @return this builder object so methods can be chained together; never null */ default B with(Field field, int value) { - return with(field.name(),value); + return with(field.name(), value); } /** @@ -194,7 +215,7 @@ default B with(Field field, int value) { * @return this builder object so methods can be chained together; never null */ default B with(Field field, float value) { - return with(field.name(),value); + return with(field.name(), value); } /** @@ -205,7 +226,7 @@ default B with(Field field, float value) { * @return this builder object so methods can be chained together; never null */ default B with(Field field, double value) { - return with(field.name(),value); + return with(field.name(), value); } /** @@ -216,7 +237,7 @@ default B with(Field field, double value) { * @return this builder object so methods can be chained together; never null */ default B with(Field field, long value) { - return with(field.name(),value); + return with(field.name(), value); } /** @@ -227,9 +248,83 @@ default B with(Field field, long value) { * @return this builder object so methods can be chained together; never null */ default B with(Field field, boolean value) { - return with(field.name(),value); + return with(field.name(), value); } - + + /** + * If the field does not have a value, then associate the given value with the key of the specified field. + * + * @param field the predefined field for the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(Field field, String value) { + return withDefault(field.name(), value); + } + + /** + * If the field does not have a value, then associate the given value with the key of the specified field. + * + * @param field the predefined field for the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(Field field, int value) { + return withDefault(field.name(), value); + } + + /** + * If the field does not have a value, then associate the given value with the key of the specified field. + * + * @param field the predefined field for the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(Field field, float value) { + return withDefault(field.name(), value); + } + + /** + * If the field does not have a value, then associate the given value with the key of the specified field. + * + * @param field the predefined field for the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(Field field, double value) { + return withDefault(field.name(), value); + } + + /** + * If the field does not have a value, then associate the given value with the key of the specified field. + * + * @param field the predefined field for the key + * @param value the value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(Field field, long value) { + return withDefault(field.name(), value); + } + + /** + * If the field does not have a value, then associate the given value with the key of the specified field. + * + * @param field the predefined field for the key + * @param value the default value + * @return this builder object so methods can be chained together; never null + */ + default B withDefault(Field field, boolean value) { + return withDefault(field.name(), value); + } + + /** + * Apply the function to this builder. + * + * @param function the predefined field for the key + * @return this builder object so methods can be chained together; never null + */ + B apply(Consumer function); + /** * Build and return the immutable configuration. * @@ -256,6 +351,20 @@ public Builder with(String key, String value) { props.setProperty(key, value); return this; } + + @Override + public Builder withDefault(String key, String value) { + if ( !props.containsKey(key)) { + props.setProperty(key, value); + } + return this; + } + + @Override + public Builder apply(Consumer function) { + function.accept(this); + return this; + } @Override public Configuration build() { @@ -281,7 +390,7 @@ public static Builder create() { public static Builder copy(Configuration config) { return new Builder(config.asProperties()); } - + /** * Create a Configuration object that is populated by system properties, per {@link #withSystemProperties(String)}. * @@ -473,7 +582,15 @@ public static Configuration load(String path, ClassLoader classLoader, Consumer< return from(props); } } - + + /** + * Obtain an editor for a copy of this configuration. + * @return a builder that is populated with this configuration's key-value pairs; never null + */ + default Builder edit() { + return copy(this); + } + /** * Determine whether this configuration contains a key-value pair with the given key and the value is non-null * @@ -536,6 +653,19 @@ default String getString(Field field) { return getString(field.name(), field.defaultValue()); } + /** + * Get the string value associated with the given field, returning the field's default value if there is no such key-value + * pair in this configuration. + * + * @param field the field; may not be null + * @param defaultValue the default value + * @return the configuration's value for the field, or the field's {@link Field#defaultValue() default value} if there is no + * such key-value pair in the configuration + */ + default String getString(Field field, String defaultValue) { + return getString(field.name(), ()->field.defaultValue()); + } + /** * Get the string value(s) associated with the given key, where the supplied regular expression is used to parse the single * string value into multiple values. @@ -693,9 +823,10 @@ default Boolean getBoolean(String key, BooleanSupplier defaultValueSupplier) { * @return the integer value, or null if the key is null, there is no such key-value pair in the configuration and there is * no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in * the configuration but the value could not be parsed as an integer value + * @throws NumberFormatException if there is no name-value pair and the field has no default value */ default int getInteger(Field field) { - return getInteger(field.name(), Integer.valueOf(field.defaultValue())); + return getInteger(field.name(), ()->Integer.valueOf(field.defaultValue())).intValue(); } /** @@ -706,9 +837,52 @@ default int getInteger(Field field) { * @return the integer value, or null if the key is null, there is no such key-value pair in the configuration and there is * no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in * the configuration but the value could not be parsed as a long value + * @throws NumberFormatException if there is no name-value pair and the field has no default value */ default long getLong(Field field) { - return getLong(field.name(), Long.valueOf(field.defaultValue())); + return getLong(field.name(), ()->Long.valueOf(field.defaultValue())).longValue(); + } + + /** + * Get the boolean value associated with the given field when that field has a default value. If the configuration does + * not have a name-value pair with the same name as the field, then the field's default value. + * + * @param field the field + * @return the boolean value, or null if the key is null, there is no such key-value pair in the configuration and there is + * no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in + * the configuration but the value could not be parsed as a boolean value + * @throws NumberFormatException if there is no name-value pair and the field has no default value + */ + default boolean getBoolean(Field field) { + return getBoolean(field.name(), ()->Boolean.valueOf(field.defaultValue())).booleanValue(); + } + + /** + * Get the integer value associated with the given field, returning the field's default value if there is no such + * key-value pair. + * + * @param field the field + * @param defaultValue the default value + * @return the integer value, or null if the key is null, there is no such key-value pair in the configuration and there is + * no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in + * the configuration but the value could not be parsed as an integer value + */ + default int getInteger(Field field, int defaultValue) { + return getInteger(field.name(), defaultValue); + } + + /** + * Get the long value associated with the given field, returning the field's default value if there is no such + * key-value pair. + * + * @param field the field + * @param defaultValue the default value + * @return the integer value, or null if the key is null, there is no such key-value pair in the configuration and there is + * no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in + * the configuration but the value could not be parsed as a long value + */ + default long getLong(Field field, long defaultValue) { + return getLong(field.name(), defaultValue); } /** @@ -716,12 +890,13 @@ default long getLong(Field field) { * key-value pair. * * @param field the field + * @param defaultValue the default value * @return the boolean value, or null if the key is null, there is no such key-value pair in the configuration and there is * no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in * the configuration but the value could not be parsed as a boolean value */ - default boolean getBoolean(Field field) { - return getBoolean(field.name(), Boolean.valueOf(field.defaultValue())); + default boolean getBoolean(Field field, boolean defaultValue) { + return getBoolean(field.name(), defaultValue); } /** diff --git a/debezium-core/src/main/java/io/debezium/config/Field.java b/debezium-core/src/main/java/io/debezium/config/Field.java new file mode 100644 index 000000000..f413ff01e --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/config/Field.java @@ -0,0 +1,180 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.config; + +import java.util.Objects; + +import io.debezium.annotation.Immutable; + +/** + * An immutable definition of a field that make appear within a {@link Configuration} instance. + * + * @author Randall Hauch + */ +@Immutable +public final class Field { + /** + * Create an immutable {@link Field} instance with the given property name and description. + * + * @param name the name of the field; may not be null + * @param description the description + * @return the field; never null + */ + public static Field create(String name, String description) { + return new Field(name, description, null); + } + + /** + * Create an immutable {@link Field} instance with the given property name, description, and default value. + * + * @param name the name of the field; may not be null + * @param description the description + * @param defaultValue the default value for the field + * @return the field; never null + */ + public static Field create(String name, String description, String defaultValue) { + return new Field(name, description, defaultValue); + } + + /** + * Create an immutable {@link Field} instance with the given property name, description, and default value. + * + * @param name the name of the field; may not be null + * @param description the description + * @param defaultValue the default value for the field + * @return the field; never null + */ + public static Field create(String name, String description, int defaultValue) { + return new Field(name, description, Integer.toString(defaultValue)); + } + + /** + * Create an immutable {@link Field} instance with the given property name, description, and default value. + * + * @param name the name of the field; may not be null + * @param description the description + * @param defaultValue the default value for the field + * @return the field; never null + */ + public static Field create(String name, String description, long defaultValue) { + return new Field(name, description, Long.toString(defaultValue)); + } + + /** + * Create an immutable {@link Field} instance with the given property name, description, and default value. + * + * @param name the name of the field; may not be null + * @param description the description + * @param defaultValue the default value for the field + * @return the field; never null + */ + public static Field create(String name, String description, boolean defaultValue) { + return new Field(name, description, Boolean.toString(defaultValue)); + } + + private final String name; + private final String desc; + private final String defaultValue; + + protected Field(String name, String description, String defaultValue) { + Objects.requireNonNull(name, "The field name is required"); + this.name = name; + this.desc = description; + this.defaultValue = defaultValue; + assert this.name != null; + } + + /** + * Get the name of the field. + * + * @return the name; never null + */ + public String name() { + return name; + } + + /** + * Get the default value of the field. + * + * @return the default value as a string; never null + */ + public String defaultValue() { + return defaultValue; + } + + /** + * Get the description of the field. + * + * @return the description; never null + */ + public String description() { + return desc; + } + + /** + * Create and return a new Field instance that has the same name and description as this instance, but with the given + * default value. + * + * @param defaultValue the new default value for the new field + * @return the new field; never null + */ + public Field withDefault(String defaultValue) { + return Field.create(name(), description(), defaultValue); + } + + /** + * Create and return a new Field instance that has the same name and description as this instance, but with the given + * default value. + * + * @param defaultValue the new default value for the new field + * @return the new field; never null + */ + public Field withDefault(boolean defaultValue) { + return Field.create(name(), description(), defaultValue); + } + + /** + * Create and return a new Field instance that has the same name and description as this instance, but with the given + * default value. + * + * @param defaultValue the new default value for the new field + * @return the new field; never null + */ + public Field withDefault(int defaultValue) { + return Field.create(name(), description(), defaultValue); + } + + /** + * Create and return a new Field instance that has the same name and description as this instance, but with the given + * default value. + * + * @param defaultValue the new default value for the new field + * @return the new field; never null + */ + public Field withDefault(long defaultValue) { + return Field.create(name(), description(), defaultValue); + } + + @Override + public int hashCode() { + return name.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if ( obj == this ) return true; + if ( obj instanceof Field ) { + Field that = (Field)obj; + return this.name().equals(that.name()); + } + return false; + } + + @Override + public String toString() { + return name(); + } +} \ No newline at end of file diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConfiguration.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConfiguration.java index e05663621..b5a0a3038 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConfiguration.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConfiguration.java @@ -7,35 +7,53 @@ import java.util.Properties; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Predicate; import io.debezium.annotation.Immutable; import io.debezium.config.Configuration; +import io.debezium.config.Field; import io.debezium.util.Collect; /** - * A specialized configuration for the Debezium driver. + * A specialized configuration for the Debezium driver. This defines several known {@link io.debezium.config.Field + * fields} that are common to all JDBC configurations. * * @author Randall Hauch */ @Immutable public interface JdbcConfiguration extends Configuration { - public static final Field DATABASE = Configuration.field("dbname", - "Name of the database"); - public static final Field USER = Configuration.field("user", - "Name of the database user to be used when connecting to the database"); - public static final Field PASSWORD = Configuration.field("password", - "Password to be used when connecting to the database"); - public static final Field HOSTNAME = Configuration.field("hostname", "IP address of the database"); - public static final Field PORT = Configuration.field("port", "Port of the database", 5432); + /** + * A field for the name of the database. This field has no default value. + */ + public static final Field DATABASE = Field.create("dbname", + "Name of the database"); + /** + * A field for the user of the database. This field has no default value. + */ + public static final Field USER = Field.create("user", + "Name of the database user to be used when connecting to the database"); + /** + * A field for the password of the database. This field has no default value. + */ + public static final Field PASSWORD = Field.create("password", + "Password to be used when connecting to the database"); + /** + * A field for the hostname of the database server. This field has no default value. + */ + public static final Field HOSTNAME = Field.create("hostname", "IP address of the database"); + /** + * A field for the port of the database server. There is no default value. + */ + public static final Field PORT = Field.create("port", "Port of the database"); /** - * The set of pre-defined fields for JDBC configurations. + * The set of names of the pre-defined JDBC configuration fields, including {@link #DATABASE}, {@link #USER}, + * {@link #PASSWORD}, {@link #HOSTNAME}, and {@link #PORT}. */ public static Set ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT); - /** * Obtain a {@link JdbcConfiguration} adapter for the given {@link Configuration}. * @@ -136,6 +154,20 @@ public Builder with(String key, String value) { return this; } + @Override + public Builder withDefault(String key, String value) { + if (!props.containsKey(key)) { + props.setProperty(key, value); + } + return this; + } + + @Override + public Builder apply(Consumer function) { + function.accept(this); + return this; + } + @Override public JdbcConfiguration build() { return JdbcConfiguration.adapt(Configuration.from(props)); @@ -163,6 +195,20 @@ public Builder with(String key, String value) { return this; } + @Override + public Builder withDefault(String key, String value) { + if (!props.containsKey(key)) { + props.setProperty(key, value); + } + return this; + } + + @Override + public Builder apply(Consumer function) { + function.accept(this); + return this; + } + @Override public JdbcConfiguration build() { return JdbcConfiguration.adapt(Configuration.from(props)); diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index 582806c36..9cab7802d 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -14,7 +14,9 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -25,6 +27,7 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.config.Configuration; +import io.debezium.config.Field; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; import io.debezium.relational.TableEditor; @@ -85,18 +88,20 @@ public static interface Operations { * * * @param urlPattern the URL pattern string; may not be null + * @param variables any custom or overridden configuration variables * @return the connection factory */ - protected static ConnectionFactory patternBasedFactory(String urlPattern) { + protected static ConnectionFactory patternBasedFactory(String urlPattern, Field... variables) { return (config) -> { LOGGER.trace("Config: {}", config.asProperties()); Properties props = config.asProperties(); - String url = findAndReplace(urlPattern, props, - JdbcConfiguration.HOSTNAME, - JdbcConfiguration.PORT, - JdbcConfiguration.USER, - JdbcConfiguration.PASSWORD, - JdbcConfiguration.DATABASE); + Field[] varsWithDefaults = combineVariables(variables, + JdbcConfiguration.HOSTNAME, + JdbcConfiguration.PORT, + JdbcConfiguration.USER, + JdbcConfiguration.PASSWORD, + JdbcConfiguration.DATABASE); + String url = findAndReplace(urlPattern, props, varsWithDefaults); LOGGER.trace("Props: {}", props); LOGGER.trace("URL: {}", url); Connection conn = DriverManager.getConnection(url, props); @@ -105,13 +110,29 @@ protected static ConnectionFactory patternBasedFactory(String urlPattern) { }; } - private static String findAndReplace(String url, Properties props, Configuration.Field... variables) { - for (Configuration.Field field : variables ) { + private static Field[] combineVariables(Field[] overriddenVariables, + Field... defaultVariables) { + Map fields = new HashMap<>(); + if (defaultVariables != null) { + for (Field variable : defaultVariables) { + fields.put(variable.name(), variable); + } + } + if (overriddenVariables != null) { + for (Field variable : overriddenVariables) { + fields.put(variable.name(), variable); + } + } + return fields.values().toArray(new Field[fields.size()]); + } + + private static String findAndReplace(String url, Properties props, Field... variables) { + for (Field field : variables) { String variable = field.name(); if (variable != null && url.contains("${" + variable + "}")) { // Otherwise, we have to remove it from the properties ... String value = props.getProperty(variable); - if ( value != null ) { + if (value != null) { props.remove(variable); // And replace the variable ... url = url.replaceAll("\\$\\{" + variable + "\\}", value); @@ -145,12 +166,34 @@ public JdbcConnection(Configuration config, ConnectionFactory connectionFactory) * @param initialOperations the initial operations that should be run on each new connection; may be null */ public JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Operations initialOperations) { - this.config = config; + this(config,connectionFactory,initialOperations,null); + } + + /** + * Create a new instance with the given configuration and connection factory, and specify the operations that should be + * run against each newly-established connection. + * + * @param config the configuration; may not be null + * @param connectionFactory the connection factory; may not be null + * @param initialOperations the initial operations that should be run on each new connection; may be null + * @param adapter the function that can be called to update the configuration with defaults + */ + protected JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Operations initialOperations, Consumer adapter) { + this.config = adapter == null ? config : config.edit().apply(adapter).build(); this.factory = connectionFactory; this.initialOps = initialOperations; this.conn = null; } + /** + * Obtain the configuration for this connection. + * + * @return the JDBC configuration; never null + */ + public JdbcConfiguration config() { + return JdbcConfiguration.adapt(config); + } + /** * Ensure a connection to the database is established. * @@ -214,23 +257,23 @@ public JdbcConnection query(String query, Consumer resultConsumer) th } return this; } - - public void print(ResultSet resultSet ) { + + public void print(ResultSet resultSet) { // CHECKSTYLE:OFF - print(resultSet,System.out::println); + print(resultSet, System.out::println); // CHECKSTYLE:ON } - - public void print(ResultSet resultSet, Consumer lines ) { + + public void print(ResultSet resultSet, Consumer lines) { try { ResultSetMetaData rsmd = resultSet.getMetaData(); int columnCount = rsmd.getColumnCount(); int[] columnSizes = findMaxLength(resultSet); lines.accept(delimiter(columnCount, columnSizes)); StringBuilder sb = new StringBuilder(); - for ( int i=1; i<=columnCount; i++ ) { + for (int i = 1; i <= columnCount; i++) { if (i > 1) sb.append(" | "); - sb.append(Strings.setLength(rsmd.getColumnLabel(i),columnSizes[i],' ')); + sb.append(Strings.setLength(rsmd.getColumnLabel(i), columnSizes[i], ' ')); } lines.accept(sb.toString()); sb.setLength(0); @@ -239,7 +282,7 @@ public void print(ResultSet resultSet, Consumer lines ) { sb.setLength(0); for (int i = 1; i <= columnCount; i++) { if (i > 1) sb.append(" | "); - sb.append(Strings.setLength(resultSet.getString(i),columnSizes[i],' ')); + sb.append(Strings.setLength(resultSet.getString(i), columnSizes[i], ' ')); } lines.accept(sb.toString()); sb.setLength(0); @@ -249,27 +292,27 @@ public void print(ResultSet resultSet, Consumer lines ) { throw new RuntimeException(e); } } - - private String delimiter( int columnCount, int[] columnSizes ) { + + private String delimiter(int columnCount, int[] columnSizes) { StringBuilder sb = new StringBuilder(); - for ( int i=1; i<=columnCount; i++ ) { + for (int i = 1; i <= columnCount; i++) { if (i > 1) sb.append("---"); - sb.append(Strings.createString('-',columnSizes[i])); + sb.append(Strings.createString('-', columnSizes[i])); } return sb.toString(); } - - private int[] findMaxLength( ResultSet resultSet ) throws SQLException { + + private int[] findMaxLength(ResultSet resultSet) throws SQLException { ResultSetMetaData rsmd = resultSet.getMetaData(); int columnCount = rsmd.getColumnCount(); - int[] columnSizes = new int[columnCount+1]; - for ( int i=1; i<=columnCount; i++ ) { + int[] columnSizes = new int[columnCount + 1]; + for (int i = 1; i <= columnCount; i++) { columnSizes[i] = Math.max(columnSizes[i], rsmd.getColumnLabel(i).length()); } while (resultSet.next()) { for (int i = 1; i <= columnCount; i++) { String value = resultSet.getString(i); - if ( value != null ) columnSizes[i] = Math.max(columnSizes[i], value.length()); + if (value != null) columnSizes[i] = Math.max(columnSizes[i], value.length()); } } resultSet.beforeFirst(); @@ -299,7 +342,7 @@ public synchronized void close() throws SQLException { } } } - + /** * Create definitions for each tables in the database, given the catalog name, schema pattern, table filter, and * column filter. @@ -375,7 +418,7 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP */ public static void columnsFor(ResultSet resultSet, TableEditor editor) throws SQLException { List columns = new ArrayList<>(); - columnsFor(resultSet,columns::add); + columnsFor(resultSet, columns::add); editor.setColumns(columns); } @@ -407,6 +450,4 @@ private static boolean isNullable(int jdbcNullable) { return jdbcNullable == ResultSetMetaData.columnNullable || jdbcNullable == ResultSetMetaData.columnNullableUnknown; } - - } diff --git a/debezium-core/src/main/java/io/debezium/relational/TableImpl.java b/debezium-core/src/main/java/io/debezium/relational/TableImpl.java index d88b996a2..666b2574c 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableImpl.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableImpl.java @@ -61,6 +61,23 @@ public List columns() { public Column columnWithName(String name) { return columnsByLowercaseName.get(name.toLowerCase()); } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if ( obj== this) return true; + if ( obj instanceof Table ) { + Table that = (Table)obj; + return this.id().equals(that.id()) + && this.columns().equals(that.columns()) + && this.primaryKeyColumnNames().equals(that.primaryKeyColumnNames()); + } + return false; + } @Override public String toString() { @@ -68,7 +85,7 @@ public String toString() { toString(sb, ""); return sb.toString(); } - + protected void toString(StringBuilder sb, String prefix) { if (prefix == null) prefix = ""; sb.append(prefix).append("columns: {").append(System.lineSeparator()); diff --git a/debezium-core/src/main/java/io/debezium/relational/Tables.java b/debezium-core/src/main/java/io/debezium/relational/Tables.java index 82dae76ee..6d0926d75 100644 --- a/debezium-core/src/main/java/io/debezium/relational/Tables.java +++ b/debezium-core/src/main/java/io/debezium/relational/Tables.java @@ -6,11 +6,11 @@ package io.debezium.relational; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.apache.kafka.connect.data.Schema; @@ -65,7 +65,7 @@ public static interface ColumnFilter { } private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant(); - private final Map tablesByTableId = new HashMap<>(); + private final Map tablesByTableId = new ConcurrentHashMap<>(); private final Set changes = new HashSet<>(); /** @@ -298,7 +298,22 @@ public TableEditor editOrCreateTable(TableId tableId) { public TableEditor editOrCreateTable(String catalogName, String schemaName, String tableName) { return editOrCreateTable(new TableId(catalogName, schemaName, tableName)); } + + @Override + public int hashCode() { + return tablesByTableId.hashCode(); + } + @Override + public boolean equals(Object obj) { + if ( obj == this ) return true; + if ( obj instanceof Tables ) { + Tables that = (Tables)obj; + return this.tablesByTableId.equals(that.tablesByTableId); + } + return false; + } + @Override public String toString() { return lock.read(() -> { diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserSql2003.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserSql2003.java index be84168a0..8d9f3f9db 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserSql2003.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserSql2003.java @@ -575,7 +575,7 @@ protected void parseAlterTable(Marker start) { // Update the table ... Column newColumnDefn = column.create(); - table.setColumns(newColumnDefn); + table.addColumn(newColumnDefn); if (isPrimaryKey.get()) { table.setPrimaryKeyNames(newColumnDefn.name()); } diff --git a/debezium-core/src/main/java/io/debezium/text/TokenStream.java b/debezium-core/src/main/java/io/debezium/text/TokenStream.java index ee9faca78..b85a78bd9 100644 --- a/debezium-core/src/main/java/io/debezium/text/TokenStream.java +++ b/debezium-core/src/main/java/io/debezium/text/TokenStream.java @@ -337,6 +337,7 @@ * {@link Tokenizer}s with exactly this behavior can actually be created using the {@link #basicTokenizer(boolean)} method. So * while this very basic implementation is not meant to be used in all situations, it may be useful in some situations. *

+ * * @author Randall Hauch * @author Horia Chiorean * @author Daniel Kelleher diff --git a/debezium-core/src/main/java/io/debezium/util/Collect.java b/debezium-core/src/main/java/io/debezium/util/Collect.java index 2abfddcc9..070f443a0 100644 --- a/debezium-core/src/main/java/io/debezium/util/Collect.java +++ b/debezium-core/src/main/java/io/debezium/util/Collect.java @@ -140,6 +140,37 @@ public static Map hashMapOf(K key1, V value1, K key2, V value2, K k return map; } + + public static Map linkMapOf(K key, V value) { + Map map = new LinkedHashMap<>(); + map.put(key, value); + return map; + } + + public static Map linkMapOf(K key1, V value1, K key2, V value2) { + Map map = new LinkedHashMap<>(); + map.put(key1, value1); + map.put(key2, value2); + return map; + } + + public static Map linkMapOf(K key1, V value1, K key2, V value2, K key3, V value3) { + Map map = new LinkedHashMap<>(); + map.put(key1, value1); + map.put(key2, value2); + map.put(key3, value3); + return map; + } + + public static Map linkMapOf(K key1, V value1, K key2, V value2, K key3, V value3, K key4, V value4) { + Map map = new LinkedHashMap<>(); + map.put(key1, value1); + map.put(key2, value2); + map.put(key3, value3); + map.put(key4, value4); + return map; + } + /** * Set the value at the given position in the list, expanding the list as required to accommodate the new position. *

diff --git a/debezium-core/src/test/java/io/debezium/jdbc/TestDatabase.java b/debezium-core/src/test/java/io/debezium/jdbc/TestDatabase.java deleted file mode 100644 index b1998d6fb..000000000 --- a/debezium-core/src/test/java/io/debezium/jdbc/TestDatabase.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.jdbc; - -import io.debezium.config.Configuration; - -public class TestDatabase { - - public static JdbcConfiguration testConfig( String databaseName ) { - return buildTestConfig().withDatabase(databaseName).build(); - } - - public static JdbcConfiguration.Builder buildTestConfig() { - return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")); - } -} diff --git a/debezium-ingest-mysql/README.md b/debezium-ingest-mysql/README.md index 1b376b79f..98e630daa 100644 --- a/debezium-ingest-mysql/README.md +++ b/debezium-ingest-mysql/README.md @@ -1,15 +1,30 @@ -## Ingesting MySQL +# Ingesting MySQL change events + +This module defines the connector that ingests change events from MySQL databases. + +## Using the MySQL connector with Kafka Connect + +The MySQL connector is designed to work with [Kafka Connect](http://kafka.apache.org/documentation.html#connect) and to be deployed to a Kafka Connect runtime service. The deployed connector will monitor one or more databases and write all change events to Kafka topics, which can be independently consumed by one or more clients. Kafka Connect can be distributed to provide fault tolerance to ensure the connectors are running and continually keeping up with changes in the database. + +Kafka Connect can also be run standalone as a single process, although doing so is not tolerant of failures. + +## Embedding the MySQL connector + +The MySQL connector can also be used as a library without Kafka or Kafka Connect, enabling applications and services to directly connect to a MySQL database and obtain the ordered change events. This approach requires the application to record the progress of the connector so that upon restart the connect can continue where it left off. Therefore, this may be a useful approach for less critical use cases. For production use cases, we highly recommend using this connector with Kafka and Kafka Connect. -## Unit and integration tests + + + +## Testing This module contains both unit tests and integration tests. A *unit test* is a JUnit test class named `*Test.java` or `Test*.java` that never requires or uses external services, though it can use the file system and can run any components within the same JVM process. They should run very quickly, be independent of each other, and clean up after itself. -An *integration test* is a JUnit test class named `*IT.java` or `IT*.java` that uses one or more MySQL databases running in a custom Docker container automatically started before the integration tests are run and automatically stopped and removed after all of the integration tests complete (regardless of whether they suceed or fail). All databases used in the integration tests are defined and populated using `*.sql` files and `*.sh` scripts in the `src/test/docker` directory, which are copied into the Docker image and run (in lexicographical order) by MySQL upon startup. Multiple test methods within a single integration test class can reuse the same database, but generally each integration test class should use its own dedicated database(s). +An *integration test* is a JUnit test class named `*IT.java` or `IT*.java` that uses a MySQL database server running in a custom Docker container. The build will automatically start the MySQL container before the integration tests are run and automatically stop and remove it after all of the integration tests complete (regardless of whether they suceed or fail). All databases used in the integration tests are defined and populated using `*.sql` files and `*.sh` scripts in the `src/test/docker/init` directory, which are copied into the Docker image and run in lexicographical order by MySQL upon startup. Multiple test methods within a single integration test class can reuse the same database, but generally each integration test class should use its own dedicated database(s). -Running `mvn install` will compile all code and run the unit tests. If there are any problems, such as failing unit tests, the build will stop immediately. Otherwise, the build will create the module's artifacts, create the Docker image with MySQL, start the Docker container, run the integration tests, and stop the container even if there are integration test failures. If there are no problems, the build will end by installing the artifacts into the local Maven repository. +Running `mvn install` will compile all code and run the unit tests. If there are any compile problems or any of the unit tests fail, the build will stop immediately. Otherwise, the command will continue to create the module's artifacts, create the Docker image with MySQL and custom scripts, start the Docker container, run the integration tests, stop the container (even if there are integration test failures), and run checkstyle on the code. If there are still no problems, the build will then install the module's artifacts into the local Maven repository. You should always default to using `mvn install`, especially prior to committing changes to Git. However, there are a few situations where you may want to run a different Maven command. @@ -23,26 +38,38 @@ Of course, wildcards also work: $ mvn -Dit.test=Connect*IT install +These commands will automatically manage the MySQL Docker container. + ### Debugging tests -Normally, the MySQL Docker container is stopped and removed after the integration tests are run. One way to debug tests is to configure the build to wait for a remote debugging client, but then you also have to set up your IDE to connect. It's often far easier to debug a single test directly from within your IDE. To do that, you want to start the MySQL Docker container and keep it running: +If you want to debug integration tests by stepping through them in your IDE, using the `mvn install` command will be problematic since it will not wait for your IDE's breakpoints. There are ways of doing this, but it is typically far easier to simply start the Docker container and leave it running so that it is available when you run the integration test(s). To create and start the Docker container, simply run: $ mvn docker:start -Then use your IDE to run one or more unit tests, optionally debugging them as needed. Just be sure that the unit tests clean up their database before (and after) each test. +Again, the container and database server will be initialized as usual but will continue to run. Now you can use your IDE to run/debug one or more integration tests. Just be sure that the integration tests clean up their database before (and after) each test, and that you run the tests with VM arguments that define the required system properties, including: -To stop the container, simply use Docker to stop and remove the MySQL Docker container named `database`: +* `database.dbname` - the name of the database that your integration test will use; there is no default +* `database.hostname` - the IP address or name of the host where the Docker container is running; defaults to `localhost` which is likely for Linux, but on OS X and Windows Docker it will have to be set to the IP address of the VM that runs Docker (which you can find by looking at the `DOCKER_HOST` environment variable). +* `database.port` - the port on which MySQL is listening; defaults to `3306` and is what this module's Docker container uses +* `database.user` - the name of the database user; defaults to `mysql` and is correct unless your database script uses something different +* `database.password` - the password of the database user; defaults to `mysqlpw` and is correct unless your database script uses something different + +For example, you can define these properties by passing these arguments to the VM: + + -Ddatabase.dbname= -Ddatabase.hostname= -Ddatabase.port=3306 -Ddatabase.user=mysqluser -Ddatabase.password=mysqlpw + +When you are finished running the integration tests from your IDE, you have to stop and remove the Docker container (conveniently named "database") before you can run the next build: $ docker stop database $ docker rm database ### Analyzing the database -Sometimes you may want to inspect the state of the database(s) after one or more integration tests are run. The `mvn install` command runs the tests but shuts down and removes the container after the tests complete. To keep the container running after the tests complete, use this Maven command: +Sometimes you may want to inspect the state of the database(s) after one or more integration tests are run. The `mvn install` command runs the tests but shuts down and removes the container after the integration tests complete. To keep the container running after the integration tests complete, use this Maven command: $ mvn integration-test -This instructs Maven to run the normal Maven lifecycle through `integration-test`, and to stop before the `post-integration-test` phase when the Docker container is normally shut down and removed. Be aware that you will need to manually stop and remove the container before running the build again, and to make this more convenient we give the MySQL container the alias `database`: +This instructs Maven to run the normal Maven lifecycle through `integration-test`, and to stop before the `post-integration-test` phase when the Docker container is normally shut down and removed. Be aware that you will need to manually stop and remove the container (conveniently named "database") before running the build again: $ docker stop database $ docker rm database diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/Module.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/Module.java similarity index 93% rename from debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/Module.java rename to debezium-ingest-mysql/src/main/java/io/debezium/mysql/Module.java index 80e47372b..d7da70d1f 100644 --- a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/Module.java +++ b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/Module.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.mysql.ingest; +package io.debezium.mysql; import java.util.Properties; diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/MySqlConfiguration.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/MySqlConfiguration.java deleted file mode 100644 index 4427cf743..000000000 --- a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/MySqlConfiguration.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.mysql; - -import io.debezium.config.Configuration; -import io.debezium.config.Configuration.Field; - -/** - * The configuration properties. - */ -public class MySqlConfiguration { - - public static final Field USER = Configuration.field("database.user", - "Name of the database user to be used when connecting to the database"); - public static final Field PASSWORD = Configuration.field("database.password", - "Password to be used when connecting to the database"); - public static final Field HOSTNAME = Configuration.field("database.hostname", "IP address of the database"); - public static final Field PORT = Configuration.field("database.port", "Port of the database", 5432); - public static final Field SERVER_ID = Configuration.field("connect.id", - "ID of this database client, which must be unique across all database processes in the cluster."); - public static final Field CONNECTION_TIMEOUT_MS = Configuration.field("connect.timeout.ms", - "Maximum time in milliseconds to wait after trying to connect to the database before timing out.", - 30 * 1000); - public static final Field KEEP_ALIVE = Configuration.field("connect.keep.alive", - "Whether a separate thread should be used to ensure the connection is kept alive.", - true); - public static final Field MAX_QUEUE_SIZE = Configuration.field("max.queue.size", - "Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Should be larger than the maximum batch size.", - 2048); - public static final Field MAX_BATCH_SIZE = Configuration.field("max.batch.size", "Maximum size of each batch of source records.", - 1024); - public static final Field POLL_INTERVAL_MS = Configuration.field("poll.interval.ms", - "Frequency in milliseconds to poll for new change events", 1 * 1000); - public static final Field LOGICAL_ID = Configuration.field("database.logical.id", - "Logical unique identifier for this database. Defaults to host:port"); -} diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/TopicSelector.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/TopicSelector.java deleted file mode 100644 index e8f5c3f90..000000000 --- a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/TopicSelector.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.mysql.ingest; - -/** - * A function that determines the name of a topic given the table name and database name. - * - * @author Randall Hauch - */ -@FunctionalInterface -public interface TopicSelector { - /** - * Get the default topic selector logic, which simply concatenates the database name and topic name using a '.' delimiter - * character. - * - * @return the topic selector; never null - */ - static TopicSelector defaultSelector() { - return defaultSelector("."); - } - - /** - * Get the default topic selector logic, which simply concatenates the database name and topic name using the supplied - * delimiter. - * - * @param delimiter the string delineating the database name and table name; may not be null - * @return the topic selector; never null - */ - static TopicSelector defaultSelector(String delimiter) { - return (databaseName, tableName) -> databaseName + delimiter + tableName; - } - - /** - * Get the name of the topic given the database and table names. - * @param databaseName the name of the database; may not be null - * @param tableName the name of the table; may not be null - * @return the topic name; never null - */ - String getTopic(String databaseName, String tableName); -} diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/MySqlConnector.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/Connector.java similarity index 83% rename from debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/MySqlConnector.java rename to debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/Connector.java index b5211723a..65359d585 100644 --- a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/MySqlConnector.java +++ b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/Connector.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.mysql.ingest; +package io.debezium.mysql.source; import java.util.List; import java.util.Map; @@ -11,15 +11,17 @@ import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; +import io.debezium.mysql.Module; + /** * A Kafka Connect source connector that creates tasks that read the MySQL binary log and generate the corresponding * data change events. * * @author Randall Hauch */ -public class MySqlConnector extends SourceConnector { +public class Connector extends SourceConnector { - public MySqlConnector() { + public Connector() { } @Override @@ -33,7 +35,7 @@ public void start(Map props) { @Override public Class taskClass() { - return null; + return LogReader.class; } @Override diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/ConnectorConfig.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/ConnectorConfig.java new file mode 100644 index 000000000..479e1f5fc --- /dev/null +++ b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/ConnectorConfig.java @@ -0,0 +1,38 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.mysql.source; + +import io.debezium.config.Field; + +/** + * The configuration properties. + */ +public class ConnectorConfig { + + public static final Field USER = Field.create("database.user", + "Name of the database user to be used when connecting to the database."); + public static final Field PASSWORD = Field.create("database.password", + "Password to be used when connecting to the database."); + public static final Field HOSTNAME = Field.create("database.hostname", "IP address of the MySQL database server."); + public static final Field PORT = Field.create("database.port", "Port of the MySQL database server.", 3306); + public static final Field SERVER_ID = Field.create("database.server.id", + "A numeric ID of this database client, which must be unique across all currently-running database processes in the cluster. This is required because this connector essentially joins the MySQL database cluster as another server (with this unique ID) so it can read the binlog."); + public static final Field SERVER_NAME = Field.create("database.server.name", + "A unique name that identifies the database server that this connector monitors. Each database server should be monitored by at most one Debezium connector, since this server name delineates all persisted data eminating from this server. Defaults to 'host:port'"); + public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms", + "Maximum time in milliseconds to wait after trying to connect to the database before timing out.", + 30 * 1000); + public static final Field KEEP_ALIVE = Field.create("connect.keep.alive", + "Whether a separate thread should be used to ensure the connection is kept alive.", + true); + public static final Field MAX_QUEUE_SIZE = Field.create("max.queue.size", + "Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 2048, and should always be larger than the maximum batch size.", + 2048); + public static final Field MAX_BATCH_SIZE = Field.create("max.batch.size", "Maximum size of each batch of source records. Defaults to 1024.", + 1024); + public static final Field POLL_INTERVAL_MS = Field.create("poll.interval.ms", + "Frequency in milliseconds to poll for new change events.", 1 * 1000); +} diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/MySqlChangeDetector.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/LogReader.java similarity index 75% rename from debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/MySqlChangeDetector.java rename to debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/LogReader.java index 7a354b5b9..a436c26c3 100644 --- a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/MySqlChangeDetector.java +++ b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/LogReader.java @@ -3,14 +3,13 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.mysql.ingest; +package io.debezium.mysql.source; import java.io.IOException; import java.util.ArrayList; import java.util.EnumMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; @@ -35,24 +34,24 @@ import com.github.shyiko.mysql.binlog.network.AuthenticationException; import io.debezium.config.Configuration; -import io.debezium.mysql.MySqlConfiguration; -import io.debezium.relational.TableId; +import io.debezium.mysql.Module; import io.debezium.relational.Tables; /** * A Kafka Connect source task reads the MySQL binary log and generate the corresponding data change events. * - * @see MySqlConnector + * @see Connector * @author Randall Hauch */ -public class MySqlChangeDetector extends SourceTask { +public class LogReader extends SourceTask { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final EnumMap eventHandlers = new EnumMap<>(EventType.class); - private final Tables tables; - private final TableConverters tableConverters; + private final TopicSelector topicSelector; // These are all effectively constants between start(...) and stop(...) + private EnumMap eventHandlers = new EnumMap<>(EventType.class); + private Tables tables; + private TableConverters tableConverters; private BinaryLogClient client; private BlockingQueue events; private List batchEvents; @@ -62,50 +61,63 @@ public class MySqlChangeDetector extends SourceTask { // Used in the methods that process events ... private final SourceInfo sourceInfo = new SourceInfo(); - public MySqlChangeDetector() { + /** + * Create an instance of the log reader that uses the {@link TopicSelector#defaultSelector() default topic selector} of + * "{@code ..}" for data and "{@code }" for metadata. + */ + public LogReader() { this(null); } - public MySqlChangeDetector( TopicSelector topicSelector ) { - topicSelector = topicSelector != null ? topicSelector : TopicSelector.defaultSelector(); - tables = new Tables(); - tableConverters = new TableConverters(topicSelector, tables, this::signalTablesChanged); - eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata); - eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand); - eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert); - eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate); - eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete); + /** + * Create an instance of the log reader that uses the supplied {@link TopicSelector}. + * + * @param dataTopicSelector the selector for topics where data and metadata changes are to be written; if null the + * {@link TopicSelector#defaultSelector() default topic selector} will be used + */ + public LogReader(TopicSelector dataTopicSelector) { + this.topicSelector = dataTopicSelector != null ? dataTopicSelector : TopicSelector.defaultSelector(); } @Override public String version() { return Module.version(); } - - protected void signalTablesChanged( Set changedTables ) { - // TODO: do something - } @Override public void start(Map props) { // Read and verify the configuration ... final Configuration config = Configuration.from(props); - final String user = config.getString(MySqlConfiguration.USER); - final String password = config.getString(MySqlConfiguration.PASSWORD); - final String host = config.getString(MySqlConfiguration.HOSTNAME); - final int port = config.getInteger(MySqlConfiguration.PORT); - final Long serverId = config.getLong(MySqlConfiguration.SERVER_ID); - final String logicalId = config.getString(MySqlConfiguration.LOGICAL_ID.name(), "" + host + ":" + port); - final boolean keepAlive = config.getBoolean(MySqlConfiguration.KEEP_ALIVE); - final int maxQueueSize = config.getInteger(MySqlConfiguration.MAX_QUEUE_SIZE); - final long timeoutInMilliseconds = config.getLong(MySqlConfiguration.CONNECTION_TIMEOUT_MS); - maxBatchSize = config.getInteger(MySqlConfiguration.MAX_BATCH_SIZE); - pollIntervalMs = config.getLong(MySqlConfiguration.POLL_INTERVAL_MS); - + final String user = config.getString(ConnectorConfig.USER); + final String password = config.getString(ConnectorConfig.PASSWORD); + final String host = config.getString(ConnectorConfig.HOSTNAME); + final int port = config.getInteger(ConnectorConfig.PORT); + final Long serverId = config.getLong(ConnectorConfig.SERVER_ID); + final String serverName = config.getString(ConnectorConfig.SERVER_NAME.name(), host + ":" + port); + final boolean keepAlive = config.getBoolean(ConnectorConfig.KEEP_ALIVE); + final int maxQueueSize = config.getInteger(ConnectorConfig.MAX_QUEUE_SIZE); + final long timeoutInMilliseconds = config.getLong(ConnectorConfig.CONNECTION_TIMEOUT_MS); + maxBatchSize = config.getInteger(ConnectorConfig.MAX_BATCH_SIZE); + pollIntervalMs = config.getLong(ConnectorConfig.POLL_INTERVAL_MS); + if (maxQueueSize <= maxBatchSize) { + maxBatchSize = maxQueueSize / 2; + logger.error("The {} value must be larger than {}, so changing {} to {}", ConnectorConfig.MAX_QUEUE_SIZE, + ConnectorConfig.MAX_BATCH_SIZE, ConnectorConfig.MAX_QUEUE_SIZE, maxBatchSize); + } + // Create the queue ... events = new LinkedBlockingDeque<>(maxQueueSize); batchEvents = new ArrayList<>(maxBatchSize); + // Set up our handlers ... + tables = new Tables(); + tableConverters = new TableConverters(topicSelector, tables); + eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata); + eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand); + eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert); + eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate); + eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete); + // Set up the log reader ... client = new BinaryLogClient(host, port, user, password); client.setServerId(serverId); @@ -115,17 +127,23 @@ public void start(Map props) { client.registerLifecycleListener(traceLifecycleListener()); // Check if we've already processed some of the log for this database ... - sourceInfo.setDatabase(logicalId); + sourceInfo.setServerName(serverName); if (context != null) { - // TODO: Figure out how to load the table definitions from previous runs. Can it be read from each of the output - // topics? Does it need to be serialized locally? - // Get the offsets for our partition ... sourceInfo.setOffset(context.offsetStorageReader().offset(sourceInfo.partition())); // And set the client to start from that point ... client.setBinlogFilename(sourceInfo.binlogFilename()); client.setBinlogPosition(sourceInfo.binlogPosition()); // The event row number will be used when processing the first event ... + + // We have to make our Tables reflect the state of the database at the above source partition (e.g., the location + // in the MySQL log where we last stopped reading. Since the TableConverts writes out all DDL statements to the + // TopicSelector.getTopic(serverName) topic, we can consume that topic and apply each of the DDL statements + // to our Tables object. Each of those DDL messages is keyed by the database name, and contains a single string + // of DDL. However, we should consume no further than offset we recovered above. + + // TODO: implement this + } else { // initializes this position, though it will be reset when we see the first event (should be a rotate event) ... sourceInfo.setBinlogPosition(client.getBinlogPosition()); @@ -146,7 +164,7 @@ public void start(Map props) { "Unable to connect to the MySQL database at " + host + ":" + port + " with user '" + user + "': " + e.getMessage(), e); } } - + @Override public List poll() throws InterruptedException { while (events.drainTo(batchEvents, maxBatchSize - batchEvents.size()) == 0 || batchEvents.isEmpty()) { diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/SourceInfo.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/SourceInfo.java similarity index 88% rename from debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/SourceInfo.java rename to debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/SourceInfo.java index 1c3cc510a..56999d89c 100644 --- a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/SourceInfo.java +++ b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/SourceInfo.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.mysql.ingest; +package io.debezium.mysql.source; import java.util.Map; @@ -29,9 +29,9 @@ * *

  * {
- *         "file" = "mysql-bin.000003",
- *         "pos" = 105586,
- *         "row" = 0
+ *     "file" = "mysql-bin.000003",
+ *     "pos" = 105586,
+ *     "row" = 0
  * }
  * 
* @@ -40,7 +40,7 @@ @NotThreadSafe final class SourceInfo { - public static final String DATABASE_PARTITION_KEY = "db"; + public static final String SERVER_PARTITION_KEY = "server"; public static final String BINLOG_FILENAME_OFFSET_KEY = "file"; public static final String BINLOG_POSITION_OFFSET_KEY = "pos"; public static final String BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY = "row"; @@ -48,7 +48,7 @@ final class SourceInfo { private String binlogFilename; private long binlogPosition = 4; private int eventRowNumber = 0; - private String databaseId; + private String serverName; private Map sourcePartition; public SourceInfo() { @@ -59,15 +59,15 @@ public SourceInfo() { * * @param logicalId the logical identifier for the database; may not be null */ - public void setDatabase(String logicalId) { - this.databaseId = logicalId; - sourcePartition = Collect.hashMapOf(DATABASE_PARTITION_KEY, databaseId); + public void setServerName(String logicalId) { + this.serverName = logicalId; + sourcePartition = Collect.hashMapOf(SERVER_PARTITION_KEY, serverName); } /** * Get the Kafka Connect detail about the source "partition", which describes the portion of the source that we are * consuming. Since we're reading the binary log for a single database, the source partition specifies the - * {@link #setDatabase database server}. + * {@link #setServerName(String) database server}. *

* The resulting map is mutable for efficiency reasons (this information rarely changes), but should not be mutated. * @@ -79,7 +79,7 @@ public void setDatabase(String logicalId) { /** * Get the Kafka Connect detail about the source "offset", which describes the position within the source where we last - * stopped reading. + * have last read. * * @return a copy of the current offset; never null */ @@ -91,9 +91,9 @@ public Map offset() { /** * Set the current row number within a given event, and then get the Kafka Connect detail about the source "offset", which - * describes the position within the source where we last stopped reading. + * describes the position within the source where we have last read. * - * @param eventRowNumber the row number within the last event that was successfully processed + * @param eventRowNumber the 0-based row number within the last event that was successfully processed * @return a copy of the current offset; never null */ public Map offset(int eventRowNumber) { @@ -174,9 +174,9 @@ public int eventRowNumber() { /** * Get the logical identifier of the database that is the source of the events. - * @return the database name; null if it has not been {@link #setDatabase(String) set} + * @return the database name; null if it has not been {@link #setServerName(String) set} */ - public String database() { - return databaseId; + public String serverName() { + return serverName; } } diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/TableConverters.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/TableConverters.java similarity index 82% rename from debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/TableConverters.java rename to debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/TableConverters.java index 4e68e294c..eb2e9503a 100644 --- a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/ingest/TableConverters.java +++ b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/TableConverters.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.mysql.ingest; +package io.debezium.mysql.source; import java.io.Serializable; import java.util.BitSet; @@ -46,39 +46,45 @@ final class TableConverters { private final MySqlDdlParser ddlParser; private final Tables tables; private final TableSchemaBuilder schemaBuilder = new TableSchemaBuilder(); - private final Consumer> tablesChangedHandler; private final Map tableSchemaByTableName = new HashMap<>(); private final Map convertersByTableId = new HashMap<>(); private final Map tableNumbersByTableName = new HashMap<>(); - public TableConverters( TopicSelector topicSelector, Tables tables, Consumer> tablesChangedHandler ) { + public TableConverters(TopicSelector topicSelector, Tables tables) { this.topicSelector = topicSelector; - this.tablesChangedHandler = tablesChangedHandler != null ? tablesChangedHandler : (ids)->{}; this.tables = tables != null ? tables : new Tables(); this.ddlParser = new MySqlDdlParser(false); // don't include views } - + public void updateTableCommand(Event event, SourceInfo source, Consumer recorder) { QueryEventData command = event.getData(); String ddlStatements = command.getSql(); try { this.ddlParser.parse(ddlStatements, tables); - } catch ( ParsingException e) { + } catch (ParsingException e) { logger.error("Error parsing DDL statement and updating tables", e); } finally { - // Figure out what changed ... - Set changes = tables.drainChanges(); - changes.forEach(tableId->{ - Table table = tables.forTable(tableId); - if ( table == null ) { // removed - tableSchemaByTableName.remove(tableId.table()); - } else { - TableSchema schema = schemaBuilder.create(table, false); - tableSchemaByTableName.put(tableId.table(), schema); - } - }); - tablesChangedHandler.accept(changes); // notify + // Write the DDL statements to a source record on the topic named for the server. This way we'll record + // all DDL statements for all databases hosted by that server ... + String databaseName = command.getDatabase(); + String topic = topicSelector.getTopic(source.serverName()); + Integer partition = null; + SourceRecord record = new SourceRecord(source.partition(), source.offset(), topic, partition, + Schema.STRING_SCHEMA, databaseName, Schema.STRING_SCHEMA, ddlStatements); + recorder.accept(record); } + + // Figure out what changed ... + Set changes = tables.drainChanges(); + changes.forEach(tableId -> { + Table table = tables.forTable(tableId); + if (table == null) { // removed + tableSchemaByTableName.remove(tableId.table()); + } else { + TableSchema schema = schemaBuilder.create(table, false); + tableSchemaByTableName.put(tableId.table(), schema); + } + }); } /** @@ -102,9 +108,10 @@ public void updateTableMetadata(Event event, SourceInfo source, Consumer for (int row = 0; row <= source.eventRowNumber(); ++row) { Serializable[] values = write.getRows().get(row); Schema keySchema = converter.keySchema(); - Object key = converter.createKey(values,includedColumns); + Object key = converter.createKey(values, includedColumns); Schema valueSchema = converter.valueSchema(); - Struct value = converter.inserted(values,includedColumns); + Struct value = converter.inserted(values, includedColumns); SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, keySchema, key, valueSchema, value); recorder.accept(record); @@ -197,9 +210,9 @@ public void handleUpdate(Event event, SourceInfo source, Consumer Serializable[] before = changes.getKey(); Serializable[] after = changes.getValue(); Schema keySchema = converter.keySchema(); - Object key = converter.createKey(after,includedColumns); + Object key = converter.createKey(after, includedColumns); Schema valueSchema = converter.valueSchema(); - Struct value = converter.updated(before,includedColumnsBefore, after,includedColumns); + Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns); SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, keySchema, key, valueSchema, value); recorder.accept(record); @@ -216,9 +229,9 @@ public void handleDelete(Event event, SourceInfo source, Consumer for (int row = 0; row <= source.eventRowNumber(); ++row) { Serializable[] values = deleted.getRows().get(row); Schema keySchema = converter.keySchema(); - Object key = converter.createKey(values,includedColumns); + Object key = converter.createKey(values, includedColumns); Schema valueSchema = converter.valueSchema(); - Struct value = converter.inserted(values,includedColumns); + Struct value = converter.inserted(values, includedColumns); SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, keySchema, key, valueSchema, value); recorder.accept(record); @@ -227,7 +240,7 @@ public void handleDelete(Event event, SourceInfo source, Consumer protected static interface Converter { String topic(); - + Integer partition(); Schema keySchema(); @@ -238,7 +251,7 @@ protected static interface Converter { Struct inserted(Serializable[] row, BitSet includedColumns); - Struct updated(Serializable[] after, BitSet includedColumns, Serializable[] before, BitSet includedColumnsBeforeUpdate ); + Struct updated(Serializable[] after, BitSet includedColumns, Serializable[] before, BitSet includedColumnsBeforeUpdate); Struct deleted(Serializable[] deleted, BitSet includedColumns); } diff --git a/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/TopicSelector.java b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/TopicSelector.java new file mode 100644 index 000000000..32d766c36 --- /dev/null +++ b/debezium-ingest-mysql/src/main/java/io/debezium/mysql/source/TopicSelector.java @@ -0,0 +1,78 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.mysql.source; + +import io.debezium.annotation.ThreadSafe; + +/** + * A function that determines the name of topics for data and metadata. + * + * @author Randall Hauch + */ +@ThreadSafe +public interface TopicSelector { + /** + * Get the default topic selector logic, which uses a '.' delimiter character when needed. + * + * @return the topic selector; never null + */ + static TopicSelector defaultSelector() { + return defaultSelector("."); + } + + /** + * Get the default topic selector logic, which uses the supplied delimiter character when needed. + * + * @param delimiter the string delineating the server, database, and table names; may not be null + * @return the topic selector; never null + */ + static TopicSelector defaultSelector(String delimiter) { + return new TopicSelector() { + /** + * Get the name of the topic for the given server, database, and table names. This method returns + * "{@code }". + * + * @param serverName the name of the database server; may not be null + * @return the topic name; never null + */ + @Override + public String getTopic(String serverName) { + return serverName; + } + /** + * Get the name of the topic for the given server name. This method returns + * "{@code ..}". + * + * @param serverName the name of the database server; may not be null + * @param databaseName the name of the database; may not be null + * @param tableName the name of the table; may not be null + * @return the topic name; never null + */ + @Override + public String getTopic(String serverName, String databaseName, String tableName) { + return String.join(delimiter, serverName, databaseName, tableName); + } + }; + } + + /** + * Get the name of the topic for the given server name. + * + * @param serverName the name of the database server; may not be null + * @param databaseName the name of the database; may not be null + * @param tableName the name of the table; may not be null + * @return the topic name; never null + */ + String getTopic(String serverName, String databaseName, String tableName); + + /** + * Get the name of the topic for the given server, database, and table names. + * + * @param serverName the name of the database server; may not be null + * @return the topic name; never null + */ + String getTopic(String serverName); +} diff --git a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ConnectionIT.java b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ConnectionIT.java index 72a72904b..c573f1556 100644 --- a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ConnectionIT.java +++ b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ConnectionIT.java @@ -11,21 +11,19 @@ import org.junit.Ignore; import org.junit.Test; -import io.debezium.jdbc.TestDatabase; - public class ConnectionIT { @Ignore @Test public void shouldConnectToDefaulDatabase() throws SQLException { - try (MySQLConnection conn = new MySQLConnection(TestDatabase.testConfig("mysql"));) { + try (MySQLConnection conn = MySQLConnection.forTestDatabase("mysql");) { conn.connect(); } } @Test public void shouldDoStuffWithDatabase() throws SQLException { - try (MySQLConnection conn = new MySQLConnection(TestDatabase.testConfig("readbinlog_test"));) { + try (MySQLConnection conn = MySQLConnection.forTestDatabase("readbinlog_test");) { conn.connect(); // Set up the table as one transaction and wait to see the events ... conn.execute("DROP TABLE IF EXISTS person", @@ -46,7 +44,7 @@ public void shouldDoStuffWithDatabase() throws SQLException { @Ignore @Test public void shouldConnectToEmptyDatabase() throws SQLException { - try (MySQLConnection conn = new MySQLConnection(TestDatabase.testConfig("emptydb"));) { + try (MySQLConnection conn = MySQLConnection.forTestDatabase("emptydb");) { conn.connect(); } } diff --git a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/MySQLConnection.java b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/MySQLConnection.java index b8cadebdc..1f6f11f09 100644 --- a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/MySQLConnection.java +++ b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/MySQLConnection.java @@ -6,14 +6,36 @@ package io.debezium.mysql; import io.debezium.config.Configuration; +import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; /** - * A utility for working with MySQL connections. + * A utility for integration test cases to connect the MySQL server running in the Docker container created by this module's + * build. + * * @author Randall Hauch */ public class MySQLConnection extends JdbcConnection { - + + /** + * Obtain a connection instance to the named test database. + * + * @param databaseName the name of the test database + * @return the MySQLConnection instance; never null + */ + public static MySQLConnection forTestDatabase(String databaseName) { + return new MySQLConnection(JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) + .withDatabase(databaseName) + .build()); + } + + protected static void addDefaults(Configuration.Builder builder) { + builder.withDefault(JdbcConfiguration.HOSTNAME, "localhost") + .withDefault(JdbcConfiguration.PORT, 3306) + .withDefault(JdbcConfiguration.USER, "mysql") + .withDefault(JdbcConfiguration.PASSWORD, "mysqlpw"); + } + protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory("jdbc:mysql://${hostname}:${port}/${dbname}"); /** @@ -22,7 +44,7 @@ public class MySQLConnection extends JdbcConnection { * @param config the configuration; may not be null */ public MySQLConnection(Configuration config) { - super(config, FACTORY); + super(config, FACTORY, null, MySQLConnection::addDefaults); } /** @@ -33,6 +55,6 @@ public MySQLConnection(Configuration config) { * @param initialOperations the initial operations that should be run on each new connection; may be null */ public MySQLConnection(Configuration config, Operations initialOperations) { - super(config, FACTORY, initialOperations); + super(config, FACTORY, initialOperations, MySQLConnection::addDefaults); } } diff --git a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/MetadataIT.java b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/source/MetadataIT.java similarity index 98% rename from debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/MetadataIT.java rename to debezium-ingest-mysql/src/test/java/io/debezium/mysql/source/MetadataIT.java index c4541f5b0..c9ce1ebf3 100644 --- a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/MetadataIT.java +++ b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/source/MetadataIT.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.mysql.ingest; +package io.debezium.mysql.source; import java.sql.SQLException; import java.sql.Types; @@ -12,7 +12,6 @@ import static org.fest.assertions.Assertions.assertThat; -import io.debezium.jdbc.TestDatabase; import io.debezium.mysql.MySQLConnection; import io.debezium.relational.Column; import io.debezium.relational.Table; @@ -27,7 +26,7 @@ public class MetadataIT { */ @Test public void shouldLoadMetadataViaJdbc() throws SQLException { - try (MySQLConnection conn = new MySQLConnection(TestDatabase.testConfig("readbinlog_test"));) { + try (MySQLConnection conn = MySQLConnection.forTestDatabase("readbinlog_test");) { conn.connect(); // Set up the table as one transaction and wait to see the events ... conn.execute("DROP TABLE IF EXISTS person", diff --git a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/ReadBinLogIT.java b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/source/ReadBinLogIT.java similarity index 99% rename from debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/ReadBinLogIT.java rename to debezium-ingest-mysql/src/test/java/io/debezium/mysql/source/ReadBinLogIT.java index 024ec8e07..69f41871f 100644 --- a/debezium-ingest-mysql/src/test/java/io/debezium/mysql/ingest/ReadBinLogIT.java +++ b/debezium-ingest-mysql/src/test/java/io/debezium/mysql/source/ReadBinLogIT.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.mysql.ingest; +package io.debezium.mysql.source; import static org.junit.Assert.fail; @@ -45,7 +45,6 @@ import static org.fest.assertions.Assertions.assertThat; import io.debezium.jdbc.JdbcConfiguration; -import io.debezium.jdbc.TestDatabase; import io.debezium.mysql.MySQLConnection; public class ReadBinLogIT { @@ -59,7 +58,6 @@ private static final class AnyValue implements Serializable { private static final Serializable ANY_OBJECT = new AnyValue(); - private JdbcConfiguration config; private EventQueue counters; private BinaryLogClient client; private MySQLConnection conn; @@ -69,12 +67,13 @@ private static final class AnyValue implements Serializable { public void beforeEach() throws TimeoutException, IOException, SQLException, InterruptedException { events.clear(); - config = TestDatabase.buildTestConfig().withDatabase("readbinlog_test").build(); - // Connect the normal SQL client ... - conn = new MySQLConnection(config); + conn = MySQLConnection.forTestDatabase("readbinlog_test"); conn.connect(); + // Get the configuration that we used ... + JdbcConfiguration config = conn.config(); + // Connect the bin log client ... counters = new EventQueue(DEFAULT_TIMEOUT, this::logConsumedEvent, this::logIgnoredEvent); client = new BinaryLogClient(config.getHostname(), config.getPort(), "replicator", "replpass"); diff --git a/debezium-ingest-postgres/src/test/java/io/debezium/ingest/postgresql/ConnectionIT.java b/debezium-ingest-postgres/src/test/java/io/debezium/postgresql/ConnectionIT.java similarity index 58% rename from debezium-ingest-postgres/src/test/java/io/debezium/ingest/postgresql/ConnectionIT.java rename to debezium-ingest-postgres/src/test/java/io/debezium/postgresql/ConnectionIT.java index 4cd80a814..5563b8d8f 100644 --- a/debezium-ingest-postgres/src/test/java/io/debezium/ingest/postgresql/ConnectionIT.java +++ b/debezium-ingest-postgres/src/test/java/io/debezium/postgresql/ConnectionIT.java @@ -3,27 +3,24 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.ingest.postgresql; +package io.debezium.postgresql; import java.sql.SQLException; import org.junit.Test; -import io.debezium.ingest.postgres.PostgresConnection; -import io.debezium.jdbc.TestDatabase; - public class ConnectionIT { - + @Test public void shouldConnectToDefaulDatabase() throws SQLException { - try (PostgresConnection conn = new PostgresConnection( TestDatabase.testConfig("postgres") );) { + try (PostgresConnection conn = PostgresConnection.forTestDatabase("postgres");) { conn.connect(); } } - + @Test public void shouldConnectToEmptyDatabase() throws SQLException { - try (PostgresConnection conn = new PostgresConnection( TestDatabase.testConfig("emptydb") );) { + try (PostgresConnection conn = PostgresConnection.forTestDatabase("emptydb");) { conn.connect(); } } diff --git a/debezium-ingest-postgres/src/main/java/io/debezium/ingest/postgres/PostgresConnection.java b/debezium-ingest-postgres/src/test/java/io/debezium/postgresql/PostgresConnection.java similarity index 52% rename from debezium-ingest-postgres/src/main/java/io/debezium/ingest/postgres/PostgresConnection.java rename to debezium-ingest-postgres/src/test/java/io/debezium/postgresql/PostgresConnection.java index 577bd6e86..74c9c4b11 100644 --- a/debezium-ingest-postgres/src/main/java/io/debezium/ingest/postgres/PostgresConnection.java +++ b/debezium-ingest-postgres/src/test/java/io/debezium/postgresql/PostgresConnection.java @@ -3,17 +3,39 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.ingest.postgres; +package io.debezium.postgresql; import io.debezium.config.Configuration; +import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; /** - * A utility for working with MySQL connections. + * A utility for integration test cases to connect the PostgreSQL server running in the Docker container created by this + * module's build. + * * @author Randall Hauch */ public class PostgresConnection extends JdbcConnection { - + + /** + * Obtain a connection instance to the named test database. + * + * @param databaseName the name of the test database + * @return the PostgresConnection instance; never null + */ + public static PostgresConnection forTestDatabase(String databaseName) { + return new PostgresConnection(JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) + .withDatabase(databaseName) + .build()); + } + + protected static void addDefaults(Configuration.Builder builder) { + builder.withDefault(JdbcConfiguration.HOSTNAME, "localhost") + .withDefault(JdbcConfiguration.PORT, 5432) + .withDefault(JdbcConfiguration.USER, "postgres") + .withDefault(JdbcConfiguration.PASSWORD, "postgres"); + } + protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory("jdbc:postgresql://${hostname}:${port}/${dbname}"); /**