DBZ-1 Added the initial stages of a MySQL source connector

The connector is in a basic working state, although it is not well tested yet and upon restart does not recover the schema state from the previous run.
This commit is contained in:
Randall Hauch 2016-01-28 08:33:29 -06:00
parent 71cfbb16f8
commit 6796fe32be
27 changed files with 966 additions and 339 deletions

View File

@ -1,5 +1,14 @@
## Change log
# Change log
All notable changes are documented in this file. Release numbers follow [Semantic Versioning](http://semver.org)
### Unreleased
## Unreleased
### Added
* MySQL connector for ingesting change events from MySQL databases ([DBZ-1](https://issues.jboss.org/projects/DBZ/issues/DBZ-1))
* Simple DDL parsing framework that can be extended and used by various connectors (as part of [DBZ-1](https://issues.jboss.org/projects/DBZ/issues/DBZ-1))
### Changed
### Fixed

View File

@ -36,7 +36,9 @@
/**
* An immutable representation of a Debezium configuration. A {@link Configuration} instance can be obtained
* {@link #from(Properties) from Properties} or loaded from a {@link #load(File) file}, {@link #load(InputStream) stream},
* {@link #load(Reader) reader}, {@link #load(URL) URL}, or from a {@link #load(String, ClassLoader) resource on the classpath}.
* {@link #load(Reader) reader}, {@link #load(URL) URL}, or {@link #load(String, ClassLoader) classpath resource}. They can
* also be built by first {@link #create() creating a builder} and then using that builder to populate and
* {@link Builder#build() return} the immutable Configuration instance.
* <p>
* A Configuration object is basically a decorator around a {@link Properties} object. It has methods to get and convert
* individual property values to numeric, boolean and String types, optionally using a default value if the given property value
@ -48,51 +50,6 @@
@Immutable
public interface Configuration {
public static Field field(String name, String description) {
return new Field(name, description, null);
}
public static Field field(String name, String description, String defaultValue) {
return new Field(name, description, defaultValue);
}
public static Field field(String name, String description, int defaultValue) {
return new Field(name, description, Integer.toString(defaultValue));
}
public static Field field(String name, String description, long defaultValue) {
return new Field(name, description, Long.toString(defaultValue));
}
public static Field field(String name, String description, boolean defaultValue) {
return new Field(name, description, Boolean.toString(defaultValue));
}
public static class Field {
private final String name;
private final String desc;
private final String defaultValue;
public Field(String name, String description, String defaultValue) {
this.name = name;
this.desc = description;
this.defaultValue = defaultValue;
assert this.name != null;
}
public String name() {
return name;
}
public String defaultValue() {
return defaultValue;
}
public String description() {
return desc;
}
}
/**
* The basic interface for configuration builders.
*
@ -164,6 +121,70 @@ default B with(String key, boolean value) {
return with(key, Boolean.toString(value));
}
/**
* If there is no field with the specified key, then associate the given value with the specified key.
*
* @param key the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
B withDefault(String key, String value);
/**
* If there is no field with the specified key, then associate the given value with the specified key.
*
* @param key the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(String key, int value) {
return withDefault(key, Integer.toString(value));
}
/**
* If there is no field with the specified key, then associate the given value with the specified key.
*
* @param key the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(String key, float value) {
return withDefault(key, Float.toString(value));
}
/**
* If there is no field with the specified key, then associate the given value with the specified key.
*
* @param key the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(String key, double value) {
return withDefault(key, Double.toString(value));
}
/**
* If there is no field with the specified key, then associate the given value with the specified key.
*
* @param key the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(String key, long value) {
return withDefault(key, Long.toString(value));
}
/**
* If there is no field with the specified key, then associate the given value with the specified key.
*
* @param key the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(String key, boolean value) {
return withDefault(key, Boolean.toString(value));
}
/**
* Associate the given value with the key of the specified field.
*
@ -172,7 +193,7 @@ default B with(String key, boolean value) {
* @return this builder object so methods can be chained together; never null
*/
default B with(Field field, String value) {
return with(field.name(),value);
return with(field.name(), value);
}
/**
@ -183,7 +204,7 @@ default B with(Field field, String value) {
* @return this builder object so methods can be chained together; never null
*/
default B with(Field field, int value) {
return with(field.name(),value);
return with(field.name(), value);
}
/**
@ -194,7 +215,7 @@ default B with(Field field, int value) {
* @return this builder object so methods can be chained together; never null
*/
default B with(Field field, float value) {
return with(field.name(),value);
return with(field.name(), value);
}
/**
@ -205,7 +226,7 @@ default B with(Field field, float value) {
* @return this builder object so methods can be chained together; never null
*/
default B with(Field field, double value) {
return with(field.name(),value);
return with(field.name(), value);
}
/**
@ -216,7 +237,7 @@ default B with(Field field, double value) {
* @return this builder object so methods can be chained together; never null
*/
default B with(Field field, long value) {
return with(field.name(),value);
return with(field.name(), value);
}
/**
@ -227,9 +248,83 @@ default B with(Field field, long value) {
* @return this builder object so methods can be chained together; never null
*/
default B with(Field field, boolean value) {
return with(field.name(),value);
return with(field.name(), value);
}
/**
* If the field does not have a value, then associate the given value with the key of the specified field.
*
* @param field the predefined field for the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(Field field, String value) {
return withDefault(field.name(), value);
}
/**
* If the field does not have a value, then associate the given value with the key of the specified field.
*
* @param field the predefined field for the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(Field field, int value) {
return withDefault(field.name(), value);
}
/**
* If the field does not have a value, then associate the given value with the key of the specified field.
*
* @param field the predefined field for the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(Field field, float value) {
return withDefault(field.name(), value);
}
/**
* If the field does not have a value, then associate the given value with the key of the specified field.
*
* @param field the predefined field for the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(Field field, double value) {
return withDefault(field.name(), value);
}
/**
* If the field does not have a value, then associate the given value with the key of the specified field.
*
* @param field the predefined field for the key
* @param value the value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(Field field, long value) {
return withDefault(field.name(), value);
}
/**
* If the field does not have a value, then associate the given value with the key of the specified field.
*
* @param field the predefined field for the key
* @param value the default value
* @return this builder object so methods can be chained together; never null
*/
default B withDefault(Field field, boolean value) {
return withDefault(field.name(), value);
}
/**
* Apply the function to this builder.
*
 * @param function the function that should be called with this builder; may not be null
* @return this builder object so methods can be chained together; never null
*/
B apply(Consumer<B> function);
/**
* Build and return the immutable configuration.
*
@ -256,6 +351,20 @@ public Builder with(String key, String value) {
props.setProperty(key, value);
return this;
}
@Override
public Builder withDefault(String key, String value) {
if ( !props.containsKey(key)) {
props.setProperty(key, value);
}
return this;
}
@Override
public Builder apply(Consumer<Builder> function) {
function.accept(this);
return this;
}
@Override
public Configuration build() {
@ -281,7 +390,7 @@ public static Builder create() {
public static Builder copy(Configuration config) {
return new Builder(config.asProperties());
}
/**
* Create a Configuration object that is populated by system properties, per {@link #withSystemProperties(String)}.
*
@ -473,7 +582,15 @@ public static Configuration load(String path, ClassLoader classLoader, Consumer<
return from(props);
}
}
/**
* Obtain an editor for a copy of this configuration.
* @return a builder that is populated with this configuration's key-value pairs; never null
*/
default Builder edit() {
return copy(this);
}
/**
* Determine whether this configuration contains a key-value pair with the given key and the value is non-null
*
@ -536,6 +653,19 @@ default String getString(Field field) {
return getString(field.name(), field.defaultValue());
}
/**
* Get the string value associated with the given field, returning the field's default value if there is no such key-value
* pair in this configuration.
*
* @param field the field; may not be null
* @param defaultValue the default value
* @return the configuration's value for the field, or the field's {@link Field#defaultValue() default value} if there is no
* such key-value pair in the configuration
*/
default String getString(Field field, String defaultValue) {
return getString(field.name(), ()->field.defaultValue());
}
/**
* Get the string value(s) associated with the given key, where the supplied regular expression is used to parse the single
* string value into multiple values.
@ -693,9 +823,10 @@ default Boolean getBoolean(String key, BooleanSupplier defaultValueSupplier) {
* @return the integer value, or null if the key is null, there is no such key-value pair in the configuration and there is
* no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in
* the configuration but the value could not be parsed as an integer value
* @throws NumberFormatException if there is no name-value pair and the field has no default value
*/
default int getInteger(Field field) {
return getInteger(field.name(), Integer.valueOf(field.defaultValue()));
return getInteger(field.name(), ()->Integer.valueOf(field.defaultValue())).intValue();
}
/**
@ -706,9 +837,52 @@ default int getInteger(Field field) {
* @return the integer value, or null if the key is null, there is no such key-value pair in the configuration and there is
* no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in
* the configuration but the value could not be parsed as a long value
* @throws NumberFormatException if there is no name-value pair and the field has no default value
*/
default long getLong(Field field) {
return getLong(field.name(), Long.valueOf(field.defaultValue()));
return getLong(field.name(), ()->Long.valueOf(field.defaultValue())).longValue();
}
/**
* Get the boolean value associated with the given field when that field has a default value. If the configuration does
 * not have a name-value pair with the same name as the field, then the field's default value is used.
*
* @param field the field
* @return the boolean value, or null if the key is null, there is no such key-value pair in the configuration and there is
* no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in
* the configuration but the value could not be parsed as a boolean value
 * (If there is no name-value pair and the field has no default value, this method returns {@code false};
 * {@link Boolean#valueOf(String)} never throws for a null or unparseable value.)
*/
default boolean getBoolean(Field field) {
return getBoolean(field.name(), ()->Boolean.valueOf(field.defaultValue())).booleanValue();
}
/**
* Get the integer value associated with the given field, returning the field's default value if there is no such
* key-value pair.
*
* @param field the field
* @param defaultValue the default value
* @return the integer value, or null if the key is null, there is no such key-value pair in the configuration and there is
* no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in
* the configuration but the value could not be parsed as an integer value
*/
default int getInteger(Field field, int defaultValue) {
return getInteger(field.name(), defaultValue);
}
/**
* Get the long value associated with the given field, returning the field's default value if there is no such
* key-value pair.
*
* @param field the field
* @param defaultValue the default value
* @return the integer value, or null if the key is null, there is no such key-value pair in the configuration and there is
* no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in
* the configuration but the value could not be parsed as a long value
*/
default long getLong(Field field, long defaultValue) {
return getLong(field.name(), defaultValue);
}
/**
@ -716,12 +890,13 @@ default long getLong(Field field) {
* key-value pair.
*
* @param field the field
* @param defaultValue the default value
* @return the boolean value, or null if the key is null, there is no such key-value pair in the configuration and there is
* no default value in the field or the default value could not be parsed as a long, or there is a key-value pair in
* the configuration but the value could not be parsed as a boolean value
*/
default boolean getBoolean(Field field) {
return getBoolean(field.name(), Boolean.valueOf(field.defaultValue()));
default boolean getBoolean(Field field, boolean defaultValue) {
return getBoolean(field.name(), defaultValue);
}
/**

View File

@ -0,0 +1,180 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.config;
import java.util.Objects;
import io.debezium.annotation.Immutable;
/**
 * An immutable definition of a field that may appear within a {@link Configuration} instance.
*
* @author Randall Hauch
*/
@Immutable
public final class Field {

    /**
     * Create an immutable {@link Field} instance with the given property name and description, and no default value.
     *
     * @param name the name of the field; may not be null
     * @param description the description; may be null
     * @return the field; never null
     */
    public static Field create(String name, String description) {
        return new Field(name, description, null);
    }

    /**
     * Create an immutable {@link Field} instance with the given property name, description, and default value.
     *
     * @param name the name of the field; may not be null
     * @param description the description; may be null
     * @param defaultValue the default value for the field; may be null if there is no default
     * @return the field; never null
     */
    public static Field create(String name, String description, String defaultValue) {
        return new Field(name, description, defaultValue);
    }

    /**
     * Create an immutable {@link Field} instance with the given property name, description, and default value.
     *
     * @param name the name of the field; may not be null
     * @param description the description; may be null
     * @param defaultValue the default value for the field
     * @return the field; never null
     */
    public static Field create(String name, String description, int defaultValue) {
        return new Field(name, description, Integer.toString(defaultValue));
    }

    /**
     * Create an immutable {@link Field} instance with the given property name, description, and default value.
     *
     * @param name the name of the field; may not be null
     * @param description the description; may be null
     * @param defaultValue the default value for the field
     * @return the field; never null
     */
    public static Field create(String name, String description, long defaultValue) {
        return new Field(name, description, Long.toString(defaultValue));
    }

    /**
     * Create an immutable {@link Field} instance with the given property name, description, and default value.
     * Added for parity with the {@code float} variant of {@code Configuration.Builder#withDefault}.
     *
     * @param name the name of the field; may not be null
     * @param description the description; may be null
     * @param defaultValue the default value for the field
     * @return the field; never null
     */
    public static Field create(String name, String description, float defaultValue) {
        return new Field(name, description, Float.toString(defaultValue));
    }

    /**
     * Create an immutable {@link Field} instance with the given property name, description, and default value.
     * Added for parity with the {@code double} variant of {@code Configuration.Builder#withDefault}.
     *
     * @param name the name of the field; may not be null
     * @param description the description; may be null
     * @param defaultValue the default value for the field
     * @return the field; never null
     */
    public static Field create(String name, String description, double defaultValue) {
        return new Field(name, description, Double.toString(defaultValue));
    }

    /**
     * Create an immutable {@link Field} instance with the given property name, description, and default value.
     *
     * @param name the name of the field; may not be null
     * @param description the description; may be null
     * @param defaultValue the default value for the field
     * @return the field; never null
     */
    public static Field create(String name, String description, boolean defaultValue) {
        return new Field(name, description, Boolean.toString(defaultValue));
    }

    private final String name;
    private final String desc;
    private final String defaultValue;

    /**
     * Create a field with the given name, description, and (possibly null) default value.
     *
     * @param name the name of the field; may not be null
     * @param description the description; may be null
     * @param defaultValue the string form of the default value; may be null if there is no default
     */
    protected Field(String name, String description, String defaultValue) {
        // requireNonNull both validates and documents the non-null name contract;
        // the redundant 'assert this.name != null' that followed it has been removed.
        this.name = Objects.requireNonNull(name, "The field name is required");
        this.desc = description;
        this.defaultValue = defaultValue;
    }

    /**
     * Get the name of the field.
     *
     * @return the name; never null
     */
    public String name() {
        return name;
    }

    /**
     * Get the default value of the field.
     *
     * @return the default value as a string, or null if there is no default value
     */
    public String defaultValue() {
        return defaultValue;
    }

    /**
     * Get the description of the field.
     *
     * @return the description, or null if there is no description
     */
    public String description() {
        return desc;
    }

    /**
     * Create and return a new Field instance that has the same name and description as this instance, but with the given
     * default value.
     *
     * @param defaultValue the new default value for the new field; may be null if there is no default
     * @return the new field; never null
     */
    public Field withDefault(String defaultValue) {
        return Field.create(name(), description(), defaultValue);
    }

    /**
     * Create and return a new Field instance that has the same name and description as this instance, but with the given
     * default value.
     *
     * @param defaultValue the new default value for the new field
     * @return the new field; never null
     */
    public Field withDefault(boolean defaultValue) {
        return Field.create(name(), description(), defaultValue);
    }

    /**
     * Create and return a new Field instance that has the same name and description as this instance, but with the given
     * default value.
     *
     * @param defaultValue the new default value for the new field
     * @return the new field; never null
     */
    public Field withDefault(int defaultValue) {
        return Field.create(name(), description(), defaultValue);
    }

    /**
     * Create and return a new Field instance that has the same name and description as this instance, but with the given
     * default value.
     *
     * @param defaultValue the new default value for the new field
     * @return the new field; never null
     */
    public Field withDefault(long defaultValue) {
        return Field.create(name(), description(), defaultValue);
    }

    /**
     * Create and return a new Field instance that has the same name and description as this instance, but with the given
     * default value.
     *
     * @param defaultValue the new default value for the new field
     * @return the new field; never null
     */
    public Field withDefault(float defaultValue) {
        return Field.create(name(), description(), defaultValue);
    }

    /**
     * Create and return a new Field instance that has the same name and description as this instance, but with the given
     * default value.
     *
     * @param defaultValue the new default value for the new field
     * @return the new field; never null
     */
    public Field withDefault(double defaultValue) {
        return Field.create(name(), description(), defaultValue);
    }

    @Override
    public int hashCode() {
        // Identity is the name alone (see equals), so hash only the name.
        return name.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) return true;
        if (obj instanceof Field) {
            // Two fields are equal if they share a name; description and default
            // value are intentionally excluded from the identity.
            Field that = (Field) obj;
            return this.name().equals(that.name());
        }
        return false;
    }

    @Override
    public String toString() {
        return name();
    }
}

View File

@ -7,35 +7,53 @@
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.util.Collect;
/**
* A specialized configuration for the Debezium driver.
* A specialized configuration for the Debezium driver. This defines several known {@link io.debezium.config.Field
* fields} that are common to all JDBC configurations.
*
* @author Randall Hauch
*/
@Immutable
public interface JdbcConfiguration extends Configuration {
public static final Field DATABASE = Configuration.field("dbname",
"Name of the database");
public static final Field USER = Configuration.field("user",
"Name of the database user to be used when connecting to the database");
public static final Field PASSWORD = Configuration.field("password",
"Password to be used when connecting to the database");
public static final Field HOSTNAME = Configuration.field("hostname", "IP address of the database");
public static final Field PORT = Configuration.field("port", "Port of the database", 5432);
/**
* A field for the name of the database. This field has no default value.
*/
public static final Field DATABASE = Field.create("dbname",
"Name of the database");
/**
* A field for the user of the database. This field has no default value.
*/
public static final Field USER = Field.create("user",
"Name of the database user to be used when connecting to the database");
/**
* A field for the password of the database. This field has no default value.
*/
public static final Field PASSWORD = Field.create("password",
"Password to be used when connecting to the database");
/**
* A field for the hostname of the database server. This field has no default value.
*/
public static final Field HOSTNAME = Field.create("hostname", "IP address of the database");
/**
* A field for the port of the database server. There is no default value.
*/
public static final Field PORT = Field.create("port", "Port of the database");
/**
* The set of pre-defined fields for JDBC configurations.
* The set of names of the pre-defined JDBC configuration fields, including {@link #DATABASE}, {@link #USER},
* {@link #PASSWORD}, {@link #HOSTNAME}, and {@link #PORT}.
*/
public static Set<String> ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT);
/**
* Obtain a {@link JdbcConfiguration} adapter for the given {@link Configuration}.
*
@ -136,6 +154,20 @@ public Builder with(String key, String value) {
return this;
}
@Override
public Builder withDefault(String key, String value) {
if (!props.containsKey(key)) {
props.setProperty(key, value);
}
return this;
}
@Override
public Builder apply(Consumer<Builder> function) {
function.accept(this);
return this;
}
@Override
public JdbcConfiguration build() {
return JdbcConfiguration.adapt(Configuration.from(props));
@ -163,6 +195,20 @@ public Builder with(String key, String value) {
return this;
}
@Override
public Builder withDefault(String key, String value) {
if (!props.containsKey(key)) {
props.setProperty(key, value);
}
return this;
}
@Override
public Builder apply(Consumer<Builder> function) {
function.accept(this);
return this;
}
@Override
public JdbcConfiguration build() {
return JdbcConfiguration.adapt(Configuration.from(props));

View File

@ -14,7 +14,9 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -25,6 +27,7 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
@ -85,18 +88,20 @@ public static interface Operations {
* </ul>
*
* @param urlPattern the URL pattern string; may not be null
* @param variables any custom or overridden configuration variables
* @return the connection factory
*/
protected static ConnectionFactory patternBasedFactory(String urlPattern) {
protected static ConnectionFactory patternBasedFactory(String urlPattern, Field... variables) {
return (config) -> {
LOGGER.trace("Config: {}", config.asProperties());
Properties props = config.asProperties();
String url = findAndReplace(urlPattern, props,
JdbcConfiguration.HOSTNAME,
JdbcConfiguration.PORT,
JdbcConfiguration.USER,
JdbcConfiguration.PASSWORD,
JdbcConfiguration.DATABASE);
Field[] varsWithDefaults = combineVariables(variables,
JdbcConfiguration.HOSTNAME,
JdbcConfiguration.PORT,
JdbcConfiguration.USER,
JdbcConfiguration.PASSWORD,
JdbcConfiguration.DATABASE);
String url = findAndReplace(urlPattern, props, varsWithDefaults);
LOGGER.trace("Props: {}", props);
LOGGER.trace("URL: {}", url);
Connection conn = DriverManager.getConnection(url, props);
@ -105,13 +110,29 @@ protected static ConnectionFactory patternBasedFactory(String urlPattern) {
};
}
private static String findAndReplace(String url, Properties props, Configuration.Field... variables) {
for (Configuration.Field field : variables ) {
/*
 * Merge the default variables with any overridden ones, keyed by field name.
 * Both groups are folded into one map; because the overrides are recorded
 * last, an override with the same name always replaces the default.
 */
private static Field[] combineVariables(Field[] overriddenVariables,
                                        Field... defaultVariables) {
    Map<String, Field> byName = new HashMap<>();
    for (Field[] group : new Field[][] { defaultVariables, overriddenVariables }) {
        if (group == null) continue;
        for (Field variable : group) {
            byName.put(variable.name(), variable);
        }
    }
    return byName.values().toArray(new Field[byName.size()]);
}
private static String findAndReplace(String url, Properties props, Field... variables) {
for (Field field : variables) {
String variable = field.name();
if (variable != null && url.contains("${" + variable + "}")) {
// Otherwise, we have to remove it from the properties ...
String value = props.getProperty(variable);
if ( value != null ) {
if (value != null) {
props.remove(variable);
// And replace the variable ...
url = url.replaceAll("\\$\\{" + variable + "\\}", value);
@ -145,12 +166,34 @@ public JdbcConnection(Configuration config, ConnectionFactory connectionFactory)
* @param initialOperations the initial operations that should be run on each new connection; may be null
*/
public JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Operations initialOperations) {
this.config = config;
this(config,connectionFactory,initialOperations,null);
}
/**
* Create a new instance with the given configuration and connection factory, and specify the operations that should be
* run against each newly-established connection.
*
* @param config the configuration; may not be null
* @param connectionFactory the connection factory; may not be null
* @param initialOperations the initial operations that should be run on each new connection; may be null
* @param adapter the function that can be called to update the configuration with defaults
*/
protected JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Operations initialOperations, Consumer<Configuration.Builder> adapter) {
this.config = adapter == null ? config : config.edit().apply(adapter).build();
this.factory = connectionFactory;
this.initialOps = initialOperations;
this.conn = null;
}
/**
* Obtain the configuration for this connection.
*
* @return the JDBC configuration; never null
*/
public JdbcConfiguration config() {
return JdbcConfiguration.adapt(config);
}
/**
* Ensure a connection to the database is established.
*
@ -214,23 +257,23 @@ public JdbcConnection query(String query, Consumer<ResultSet> resultConsumer) th
}
return this;
}
public void print(ResultSet resultSet ) {
public void print(ResultSet resultSet) {
// CHECKSTYLE:OFF
print(resultSet,System.out::println);
print(resultSet, System.out::println);
// CHECKSTYLE:ON
}
public void print(ResultSet resultSet, Consumer<String> lines ) {
public void print(ResultSet resultSet, Consumer<String> lines) {
try {
ResultSetMetaData rsmd = resultSet.getMetaData();
int columnCount = rsmd.getColumnCount();
int[] columnSizes = findMaxLength(resultSet);
lines.accept(delimiter(columnCount, columnSizes));
StringBuilder sb = new StringBuilder();
for ( int i=1; i<=columnCount; i++ ) {
for (int i = 1; i <= columnCount; i++) {
if (i > 1) sb.append(" | ");
sb.append(Strings.setLength(rsmd.getColumnLabel(i),columnSizes[i],' '));
sb.append(Strings.setLength(rsmd.getColumnLabel(i), columnSizes[i], ' '));
}
lines.accept(sb.toString());
sb.setLength(0);
@ -239,7 +282,7 @@ public void print(ResultSet resultSet, Consumer<String> lines ) {
sb.setLength(0);
for (int i = 1; i <= columnCount; i++) {
if (i > 1) sb.append(" | ");
sb.append(Strings.setLength(resultSet.getString(i),columnSizes[i],' '));
sb.append(Strings.setLength(resultSet.getString(i), columnSizes[i], ' '));
}
lines.accept(sb.toString());
sb.setLength(0);
@ -249,27 +292,27 @@ public void print(ResultSet resultSet, Consumer<String> lines ) {
throw new RuntimeException(e);
}
}
private String delimiter( int columnCount, int[] columnSizes ) {
private String delimiter(int columnCount, int[] columnSizes) {
StringBuilder sb = new StringBuilder();
for ( int i=1; i<=columnCount; i++ ) {
for (int i = 1; i <= columnCount; i++) {
if (i > 1) sb.append("---");
sb.append(Strings.createString('-',columnSizes[i]));
sb.append(Strings.createString('-', columnSizes[i]));
}
return sb.toString();
}
private int[] findMaxLength( ResultSet resultSet ) throws SQLException {
private int[] findMaxLength(ResultSet resultSet) throws SQLException {
ResultSetMetaData rsmd = resultSet.getMetaData();
int columnCount = rsmd.getColumnCount();
int[] columnSizes = new int[columnCount+1];
for ( int i=1; i<=columnCount; i++ ) {
int[] columnSizes = new int[columnCount + 1];
for (int i = 1; i <= columnCount; i++) {
columnSizes[i] = Math.max(columnSizes[i], rsmd.getColumnLabel(i).length());
}
while (resultSet.next()) {
for (int i = 1; i <= columnCount; i++) {
String value = resultSet.getString(i);
if ( value != null ) columnSizes[i] = Math.max(columnSizes[i], value.length());
if (value != null) columnSizes[i] = Math.max(columnSizes[i], value.length());
}
}
resultSet.beforeFirst();
@ -299,7 +342,7 @@ public synchronized void close() throws SQLException {
}
}
}
/**
* Create definitions for each tables in the database, given the catalog name, schema pattern, table filter, and
* column filter.
@ -375,7 +418,7 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
*/
public static void columnsFor(ResultSet resultSet, TableEditor editor) throws SQLException {
List<Column> columns = new ArrayList<>();
columnsFor(resultSet,columns::add);
columnsFor(resultSet, columns::add);
editor.setColumns(columns);
}
@ -407,6 +450,4 @@ private static boolean isNullable(int jdbcNullable) {
return jdbcNullable == ResultSetMetaData.columnNullable || jdbcNullable == ResultSetMetaData.columnNullableUnknown;
}
}

View File

@ -61,6 +61,23 @@ public List<Column> columns() {
public Column columnWithName(String name) {
return columnsByLowercaseName.get(name.toLowerCase());
}
@Override
public int hashCode() {
    // Hash only the table id; equal tables (per equals) always have equal ids,
    // so this stays consistent with equals while avoiding hashing all columns.
    return id.hashCode();
}
@Override
public boolean equals(Object obj) {
    // Equality requires the same id, the same column definitions, and the
    // same primary key column names.
    if (obj == this) return true;
    if (!(obj instanceof Table)) return false;
    Table that = (Table) obj;
    return this.id().equals(that.id())
            && this.columns().equals(that.columns())
            && this.primaryKeyColumnNames().equals(that.primaryKeyColumnNames());
}
@Override
public String toString() {
@ -68,7 +85,7 @@ public String toString() {
toString(sb, "");
return sb.toString();
}
protected void toString(StringBuilder sb, String prefix) {
if (prefix == null) prefix = "";
sb.append(prefix).append("columns: {").append(System.lineSeparator());

View File

@ -6,11 +6,11 @@
package io.debezium.relational;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.kafka.connect.data.Schema;
@ -65,7 +65,7 @@ public static interface ColumnFilter {
}
private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
private final Map<TableId, TableImpl> tablesByTableId = new HashMap<>();
private final Map<TableId, TableImpl> tablesByTableId = new ConcurrentHashMap<>();
private final Set<TableId> changes = new HashSet<>();
/**
@ -298,7 +298,22 @@ public TableEditor editOrCreateTable(TableId tableId) {
public TableEditor editOrCreateTable(String catalogName, String schemaName, String tableName) {
return editOrCreateTable(new TableId(catalogName, schemaName, tableName));
}
@Override
public int hashCode() {
// Delegate to the map of table definitions; consistent with equals(), which also compares
// only this map.
return tablesByTableId.hashCode();
}
@Override
public boolean equals(Object obj) {
if ( obj == this ) return true;
if ( obj instanceof Tables ) {
Tables that = (Tables)obj;
// Two Tables instances are equal when they contain the same table definitions keyed by the
// same table IDs. Note that the set of pending 'changes' is deliberately not compared.
return this.tablesByTableId.equals(that.tablesByTableId);
}
return false;
}
@Override
public String toString() {
return lock.read(() -> {

View File

@ -575,7 +575,7 @@ protected void parseAlterTable(Marker start) {
// Update the table ...
Column newColumnDefn = column.create();
table.setColumns(newColumnDefn);
table.addColumn(newColumnDefn);
if (isPrimaryKey.get()) {
table.setPrimaryKeyNames(newColumnDefn.name());
}

View File

@ -337,6 +337,7 @@
* {@link Tokenizer}s with exactly this behavior can actually be created using the {@link #basicTokenizer(boolean)} method. So
* while this very basic implementation is not meant to be used in all situations, it may be useful in some situations.
* </p>
*
* @author Randall Hauch
* @author Horia Chiorean
* @author Daniel Kelleher

View File

@ -140,6 +140,37 @@ public static <K, V> Map<K, V> hashMapOf(K key1, V value1, K key2, V value2, K k
return map;
}
/**
 * Create a mutable {@link LinkedHashMap} (which preserves insertion order) containing the single supplied entry.
 *
 * @param key the key for the entry
 * @param value the value for the entry
 * @return the new insertion-ordered map; never null
 */
public static <K, V> Map<K, V> linkMapOf(K key, V value) {
Map<K, V> map = new LinkedHashMap<>();
map.put(key, value);
return map;
}
/**
 * Create a mutable {@link LinkedHashMap} (which preserves insertion order) containing the two supplied entries.
 *
 * @param key1 the key for the first entry
 * @param value1 the value for the first entry
 * @param key2 the key for the second entry
 * @param value2 the value for the second entry
 * @return the new insertion-ordered map; never null
 */
public static <K, V> Map<K, V> linkMapOf(K key1, V value1, K key2, V value2) {
Map<K, V> map = new LinkedHashMap<>();
map.put(key1, value1);
map.put(key2, value2);
return map;
}
/**
 * Create a mutable {@link LinkedHashMap} (which preserves insertion order) containing the three supplied entries.
 *
 * @param key1 the key for the first entry
 * @param value1 the value for the first entry
 * @param key2 the key for the second entry
 * @param value2 the value for the second entry
 * @param key3 the key for the third entry
 * @param value3 the value for the third entry
 * @return the new insertion-ordered map; never null
 */
public static <K, V> Map<K, V> linkMapOf(K key1, V value1, K key2, V value2, K key3, V value3) {
Map<K, V> map = new LinkedHashMap<>();
map.put(key1, value1);
map.put(key2, value2);
map.put(key3, value3);
return map;
}
/**
 * Create a mutable {@link LinkedHashMap} (which preserves insertion order) containing the four supplied entries.
 *
 * @param key1 the key for the first entry
 * @param value1 the value for the first entry
 * @param key2 the key for the second entry
 * @param value2 the value for the second entry
 * @param key3 the key for the third entry
 * @param value3 the value for the third entry
 * @param key4 the key for the fourth entry
 * @param value4 the value for the fourth entry
 * @return the new insertion-ordered map; never null
 */
public static <K, V> Map<K, V> linkMapOf(K key1, V value1, K key2, V value2, K key3, V value3, K key4, V value4) {
Map<K, V> map = new LinkedHashMap<>();
map.put(key1, value1);
map.put(key2, value2);
map.put(key3, value3);
map.put(key4, value4);
return map;
}
/**
* Set the value at the given position in the list, expanding the list as required to accommodate the new position.
* <p>

View File

@ -1,19 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.jdbc;
import io.debezium.config.Configuration;
/**
 * Test utility that builds JDBC configurations from system properties prefixed with {@code database.}.
 */
public class TestDatabase {
/**
 * Create a test configuration for the named database, using connection settings read from
 * {@code database.}-prefixed system properties.
 *
 * @param databaseName the name of the database to connect to
 * @return the JDBC configuration; never null
 */
public static JdbcConfiguration testConfig( String databaseName ) {
return buildTestConfig().withDatabase(databaseName).build();
}
/**
 * Create a configuration builder pre-populated from {@code database.}-prefixed system properties.
 *
 * @return the configuration builder; never null
 */
public static JdbcConfiguration.Builder buildTestConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."));
}
}

View File

@ -1,15 +1,30 @@
## Ingesting MySQL
# Ingesting MySQL change events
This module defines the connector that ingests change events from MySQL databases.
## Using the MySQL connector with Kafka Connect
The MySQL connector is designed to work with [Kafka Connect](http://kafka.apache.org/documentation.html#connect) and to be deployed to a Kafka Connect runtime service. The deployed connector will monitor one or more databases and write all change events to Kafka topics, which can be independently consumed by one or more clients. Kafka Connect can be distributed to provide fault tolerance to ensure the connectors are running and continually keeping up with changes in the database.
Kafka Connect can also be run standalone as a single process, although doing so is not tolerant of failures.
## Embedding the MySQL connector
The MySQL connector can also be used as a library without Kafka or Kafka Connect, enabling applications and services to directly connect to a MySQL database and obtain the ordered change events. This approach requires the application to record the progress of the connector so that upon restart the connector can continue where it left off. Therefore, this may be a useful approach for less critical use cases. For production use cases, we highly recommend using this connector with Kafka and Kafka Connect.
## Unit and integration tests
## Testing
This module contains both unit tests and integration tests.
A *unit test* is a JUnit test class named `*Test.java` or `Test*.java` that never requires or uses external services, though it can use the file system and can run any components within the same JVM process. They should run very quickly, be independent of each other, and clean up after themselves.
An *integration test* is a JUnit test class named `*IT.java` or `IT*.java` that uses one or more MySQL databases running in a custom Docker container automatically started before the integration tests are run and automatically stopped and removed after all of the integration tests complete (regardless of whether they succeed or fail). All databases used in the integration tests are defined and populated using `*.sql` files and `*.sh` scripts in the `src/test/docker` directory, which are copied into the Docker image and run (in lexicographical order) by MySQL upon startup. Multiple test methods within a single integration test class can reuse the same database, but generally each integration test class should use its own dedicated database(s).
An *integration test* is a JUnit test class named `*IT.java` or `IT*.java` that uses a MySQL database server running in a custom Docker container. The build will automatically start the MySQL container before the integration tests are run and automatically stop and remove it after all of the integration tests complete (regardless of whether they succeed or fail). All databases used in the integration tests are defined and populated using `*.sql` files and `*.sh` scripts in the `src/test/docker/init` directory, which are copied into the Docker image and run in lexicographical order by MySQL upon startup. Multiple test methods within a single integration test class can reuse the same database, but generally each integration test class should use its own dedicated database(s).
Running `mvn install` will compile all code and run the unit tests. If there are any problems, such as failing unit tests, the build will stop immediately. Otherwise, the build will create the module's artifacts, create the Docker image with MySQL, start the Docker container, run the integration tests, and stop the container even if there are integration test failures. If there are no problems, the build will end by installing the artifacts into the local Maven repository.
Running `mvn install` will compile all code and run the unit tests. If there are any compile problems or any of the unit tests fail, the build will stop immediately. Otherwise, the command will continue to create the module's artifacts, create the Docker image with MySQL and custom scripts, start the Docker container, run the integration tests, stop the container (even if there are integration test failures), and run checkstyle on the code. If there are still no problems, the build will then install the module's artifacts into the local Maven repository.
You should always default to using `mvn install`, especially prior to committing changes to Git. However, there are a few situations where you may want to run a different Maven command.
@ -23,26 +38,38 @@ Of course, wildcards also work:
$ mvn -Dit.test=Connect*IT install
These commands will automatically manage the MySQL Docker container.
### Debugging tests
Normally, the MySQL Docker container is stopped and removed after the integration tests are run. One way to debug tests is to configure the build to wait for a remote debugging client, but then you also have to set up your IDE to connect. It's often far easier to debug a single test directly from within your IDE. To do that, you want to start the MySQL Docker container and keep it running:
If you want to debug integration tests by stepping through them in your IDE, using the `mvn install` command will be problematic since it will not wait for your IDE's breakpoints. There are ways of doing this, but it is typically far easier to simply start the Docker container and leave it running so that it is available when you run the integration test(s). To create and start the Docker container, simply run:
$ mvn docker:start
Then use your IDE to run one or more unit tests, optionally debugging them as needed. Just be sure that the unit tests clean up their database before (and after) each test.
Again, the container and database server will be initialized as usual but will continue to run. Now you can use your IDE to run/debug one or more integration tests. Just be sure that the integration tests clean up their database before (and after) each test, and that you run the tests with VM arguments that define the required system properties, including:
To stop the container, simply use Docker to stop and remove the MySQL Docker container named `database`:
* `database.dbname` - the name of the database that your integration test will use; there is no default
* `database.hostname` - the IP address or name of the host where the Docker container is running; defaults to `localhost` which is likely for Linux, but on OS X and Windows Docker it will have to be set to the IP address of the VM that runs Docker (which you can find by looking at the `DOCKER_HOST` environment variable).
* `database.port` - the port on which MySQL is listening; defaults to `3306` and is what this module's Docker container uses
* `database.user` - the name of the database user; defaults to `mysql` and is correct unless your database script uses something different
* `database.password` - the password of the database user; defaults to `mysqlpw` and is correct unless your database script uses something different
For example, you can define these properties by passing these arguments to the VM:
-Ddatabase.dbname=<DATABASE_NAME> -Ddatabase.hostname=<DOCKER_HOST> -Ddatabase.port=3306 -Ddatabase.user=mysqluser -Ddatabase.password=mysqlpw
When you are finished running the integration tests from your IDE, you have to stop and remove the Docker container (conveniently named "database") before you can run the next build:
$ docker stop database
$ docker rm database
### Analyzing the database
Sometimes you may want to inspect the state of the database(s) after one or more integration tests are run. The `mvn install` command runs the tests but shuts down and removes the container after the tests complete. To keep the container running after the tests complete, use this Maven command:
Sometimes you may want to inspect the state of the database(s) after one or more integration tests are run. The `mvn install` command runs the tests but shuts down and removes the container after the integration tests complete. To keep the container running after the integration tests complete, use this Maven command:
$ mvn integration-test
This instructs Maven to run the normal Maven lifecycle through `integration-test`, and to stop before the `post-integration-test` phase when the Docker container is normally shut down and removed. Be aware that you will need to manually stop and remove the container before running the build again, and to make this more convenient we give the MySQL container the alias `database`:
This instructs Maven to run the normal Maven lifecycle through `integration-test`, and to stop before the `post-integration-test` phase when the Docker container is normally shut down and removed. Be aware that you will need to manually stop and remove the container (conveniently named "database") before running the build again:
$ docker stop database
$ docker rm database

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.ingest;
package io.debezium.mysql;
import java.util.Properties;

View File

@ -1,39 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql;
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Field;
/**
 * The configuration properties for the MySQL connector, each defined as a {@link Field} with a name,
 * a description, and (optionally) a default value.
 */
public class MySqlConfiguration {
public static final Field USER = Configuration.field("database.user",
"Name of the database user to be used when connecting to the database");
public static final Field PASSWORD = Configuration.field("database.password",
"Password to be used when connecting to the database");
public static final Field HOSTNAME = Configuration.field("database.hostname", "IP address of the database");
// NOTE(review): 5432 is the PostgreSQL default port; MySQL's default is 3306 (as used by the newer
// ConnectorConfig class) — confirm this default is intentional.
public static final Field PORT = Configuration.field("database.port", "Port of the database", 5432);
public static final Field SERVER_ID = Configuration.field("connect.id",
"ID of this database client, which must be unique across all database processes in the cluster.");
public static final Field CONNECTION_TIMEOUT_MS = Configuration.field("connect.timeout.ms",
"Maximum time in milliseconds to wait after trying to connect to the database before timing out.",
30 * 1000);
public static final Field KEEP_ALIVE = Configuration.field("connect.keep.alive",
"Whether a separate thread should be used to ensure the connection is kept alive.",
true);
public static final Field MAX_QUEUE_SIZE = Configuration.field("max.queue.size",
"Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Should be larger than the maximum batch size.",
2048);
public static final Field MAX_BATCH_SIZE = Configuration.field("max.batch.size", "Maximum size of each batch of source records.",
1024);
public static final Field POLL_INTERVAL_MS = Configuration.field("poll.interval.ms",
"Frequency in milliseconds to poll for new change events", 1 * 1000);
public static final Field LOGICAL_ID = Configuration.field("database.logical.id",
"Logical unique identifier for this database. Defaults to host:port");
}

View File

@ -1,43 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.ingest;
/**
 * A function that determines the name of a topic given the table name and database name.
 * <p>
 * This is a {@link FunctionalInterface functional interface} whose single abstract method is
 * {@link #getTopic(String, String)}; the static {@code defaultSelector} methods supply ready-made
 * implementations that join the two names with a delimiter.
 *
 * @author Randall Hauch
 */
@FunctionalInterface
public interface TopicSelector {
/**
 * Get the default topic selector logic, which simply concatenates the database name and topic name using a '.' delimiter
 * character.
 *
 * @return the topic selector; never null
 */
static TopicSelector defaultSelector() {
return defaultSelector(".");
}
/**
 * Get the default topic selector logic, which simply concatenates the database name and topic name using the supplied
 * delimiter.
 *
 * @param delimiter the string delineating the database name and table name; may not be null
 * @return the topic selector; never null
 */
static TopicSelector defaultSelector(String delimiter) {
return (databaseName, tableName) -> databaseName + delimiter + tableName;
}
/**
 * Get the name of the topic given the database and table names.
 *
 * @param databaseName the name of the database; may not be null
 * @param tableName the name of the table; may not be null
 * @return the topic name; never null
 */
String getTopic(String databaseName, String tableName);
}

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.ingest;
package io.debezium.mysql.source;
import java.util.List;
import java.util.Map;
@ -11,15 +11,17 @@
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import io.debezium.mysql.Module;
/**
* A Kafka Connect source connector that creates tasks that read the MySQL binary log and generate the corresponding
* data change events.
*
* @author Randall Hauch
*/
public class MySqlConnector extends SourceConnector {
public class Connector extends SourceConnector {
public MySqlConnector() {
public Connector() {
}
@Override
@ -33,7 +35,7 @@ public void start(Map<String, String> props) {
@Override
public Class<? extends Task> taskClass() {
return null;
return LogReader.class;
}
@Override

View File

@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.source;
import io.debezium.config.Field;
/**
 * The configuration properties for the MySQL connector, each defined as a {@link Field} with a name,
 * a description, and (optionally) a default value.
 */
public class ConnectorConfig {
public static final Field USER = Field.create("database.user",
"Name of the database user to be used when connecting to the database.");
public static final Field PASSWORD = Field.create("database.password",
"Password to be used when connecting to the database.");
public static final Field HOSTNAME = Field.create("database.hostname", "IP address of the MySQL database server.");
// 3306 is the standard MySQL server port.
public static final Field PORT = Field.create("database.port", "Port of the MySQL database server.", 3306);
public static final Field SERVER_ID = Field.create("database.server.id",
"A numeric ID of this database client, which must be unique across all currently-running database processes in the cluster. This is required because this connector essentially joins the MySQL database cluster as another server (with this unique ID) so it can read the binlog.");
public static final Field SERVER_NAME = Field.create("database.server.name",
// Fixed typo in user-facing description: "eminating" -> "emanating".
"A unique name that identifies the database server that this connector monitors. Each database server should be monitored by at most one Debezium connector, since this server name delineates all persisted data emanating from this server. Defaults to 'host:port'");
public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms",
"Maximum time in milliseconds to wait after trying to connect to the database before timing out.",
30 * 1000);
public static final Field KEEP_ALIVE = Field.create("connect.keep.alive",
"Whether a separate thread should be used to ensure the connection is kept alive.",
true);
public static final Field MAX_QUEUE_SIZE = Field.create("max.queue.size",
"Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 2048, and should always be larger than the maximum batch size.",
2048);
public static final Field MAX_BATCH_SIZE = Field.create("max.batch.size", "Maximum size of each batch of source records. Defaults to 1024.",
1024);
public static final Field POLL_INTERVAL_MS = Field.create("poll.interval.ms",
"Frequency in milliseconds to poll for new change events.", 1 * 1000);
}

View File

@ -3,14 +3,13 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.ingest;
package io.debezium.mysql.source;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@ -35,24 +34,24 @@
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import io.debezium.config.Configuration;
import io.debezium.mysql.MySqlConfiguration;
import io.debezium.relational.TableId;
import io.debezium.mysql.Module;
import io.debezium.relational.Tables;
/**
* A Kafka Connect source task reads the MySQL binary log and generate the corresponding data change events.
*
* @see MySqlConnector
* @see Connector
* @author Randall Hauch
*/
public class MySqlChangeDetector extends SourceTask {
public class LogReader extends SourceTask {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final EnumMap<EventType, EventHandler> eventHandlers = new EnumMap<>(EventType.class);
private final Tables tables;
private final TableConverters tableConverters;
private final TopicSelector topicSelector;
// These are all effectively constants between start(...) and stop(...)
private EnumMap<EventType, EventHandler> eventHandlers = new EnumMap<>(EventType.class);
private Tables tables;
private TableConverters tableConverters;
private BinaryLogClient client;
private BlockingQueue<Event> events;
private List<Event> batchEvents;
@ -62,50 +61,63 @@ public class MySqlChangeDetector extends SourceTask {
// Used in the methods that process events ...
private final SourceInfo sourceInfo = new SourceInfo();
public MySqlChangeDetector() {
/**
* Create an instance of the log reader that uses the {@link TopicSelector#defaultSelector() default topic selector} of
* "{@code <serverName>.<databaseName>.<tableName>}" for data and "{@code <serverName>}" for metadata.
*/
public LogReader() {
this(null);
}
public MySqlChangeDetector( TopicSelector topicSelector ) {
topicSelector = topicSelector != null ? topicSelector : TopicSelector.defaultSelector();
tables = new Tables();
tableConverters = new TableConverters(topicSelector, tables, this::signalTablesChanged);
eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate);
eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete);
/**
* Create an instance of the log reader that uses the supplied {@link TopicSelector}.
*
* @param dataTopicSelector the selector for topics where data and metadata changes are to be written; if null the
* {@link TopicSelector#defaultSelector() default topic selector} will be used
*/
public LogReader(TopicSelector dataTopicSelector) {
this.topicSelector = dataTopicSelector != null ? dataTopicSelector : TopicSelector.defaultSelector();
}
@Override
public String version() {
return Module.version();
}
protected void signalTablesChanged( Set<TableId> changedTables ) {
// TODO: do something
}
@Override
public void start(Map<String, String> props) {
// Read and verify the configuration ...
final Configuration config = Configuration.from(props);
final String user = config.getString(MySqlConfiguration.USER);
final String password = config.getString(MySqlConfiguration.PASSWORD);
final String host = config.getString(MySqlConfiguration.HOSTNAME);
final int port = config.getInteger(MySqlConfiguration.PORT);
final Long serverId = config.getLong(MySqlConfiguration.SERVER_ID);
final String logicalId = config.getString(MySqlConfiguration.LOGICAL_ID.name(), "" + host + ":" + port);
final boolean keepAlive = config.getBoolean(MySqlConfiguration.KEEP_ALIVE);
final int maxQueueSize = config.getInteger(MySqlConfiguration.MAX_QUEUE_SIZE);
final long timeoutInMilliseconds = config.getLong(MySqlConfiguration.CONNECTION_TIMEOUT_MS);
maxBatchSize = config.getInteger(MySqlConfiguration.MAX_BATCH_SIZE);
pollIntervalMs = config.getLong(MySqlConfiguration.POLL_INTERVAL_MS);
final String user = config.getString(ConnectorConfig.USER);
final String password = config.getString(ConnectorConfig.PASSWORD);
final String host = config.getString(ConnectorConfig.HOSTNAME);
final int port = config.getInteger(ConnectorConfig.PORT);
final Long serverId = config.getLong(ConnectorConfig.SERVER_ID);
final String serverName = config.getString(ConnectorConfig.SERVER_NAME.name(), host + ":" + port);
final boolean keepAlive = config.getBoolean(ConnectorConfig.KEEP_ALIVE);
final int maxQueueSize = config.getInteger(ConnectorConfig.MAX_QUEUE_SIZE);
final long timeoutInMilliseconds = config.getLong(ConnectorConfig.CONNECTION_TIMEOUT_MS);
maxBatchSize = config.getInteger(ConnectorConfig.MAX_BATCH_SIZE);
pollIntervalMs = config.getLong(ConnectorConfig.POLL_INTERVAL_MS);
if (maxQueueSize <= maxBatchSize) {
maxBatchSize = maxQueueSize / 2;
logger.error("The {} value must be larger than {}, so changing {} to {}", ConnectorConfig.MAX_QUEUE_SIZE,
ConnectorConfig.MAX_BATCH_SIZE, ConnectorConfig.MAX_QUEUE_SIZE, maxBatchSize);
}
// Create the queue ...
events = new LinkedBlockingDeque<>(maxQueueSize);
batchEvents = new ArrayList<>(maxBatchSize);
// Set up our handlers ...
tables = new Tables();
tableConverters = new TableConverters(topicSelector, tables);
eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate);
eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete);
// Set up the log reader ...
client = new BinaryLogClient(host, port, user, password);
client.setServerId(serverId);
@ -115,17 +127,23 @@ public void start(Map<String, String> props) {
client.registerLifecycleListener(traceLifecycleListener());
// Check if we've already processed some of the log for this database ...
sourceInfo.setDatabase(logicalId);
sourceInfo.setServerName(serverName);
if (context != null) {
// TODO: Figure out how to load the table definitions from previous runs. Can it be read from each of the output
// topics? Does it need to be serialized locally?
// Get the offsets for our partition ...
sourceInfo.setOffset(context.offsetStorageReader().offset(sourceInfo.partition()));
// And set the client to start from that point ...
client.setBinlogFilename(sourceInfo.binlogFilename());
client.setBinlogPosition(sourceInfo.binlogPosition());
// The event row number will be used when processing the first event ...
// We have to make our Tables reflect the state of the database at the above source partition (e.g., the location
// in the MySQL log where we last stopped reading. Since the TableConverts writes out all DDL statements to the
// TopicSelector.getTopic(serverName) topic, we can consume that topic and apply each of the DDL statements
// to our Tables object. Each of those DDL messages is keyed by the database name, and contains a single string
// of DDL. However, we should consume no further than offset we recovered above.
// TODO: implement this
} else {
// initializes this position, though it will be reset when we see the first event (should be a rotate event) ...
sourceInfo.setBinlogPosition(client.getBinlogPosition());
@ -146,7 +164,7 @@ public void start(Map<String, String> props) {
"Unable to connect to the MySQL database at " + host + ":" + port + " with user '" + user + "': " + e.getMessage(), e);
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
while (events.drainTo(batchEvents, maxBatchSize - batchEvents.size()) == 0 || batchEvents.isEmpty()) {

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.ingest;
package io.debezium.mysql.source;
import java.util.Map;
@ -29,9 +29,9 @@
*
* <pre>
* {
* "file" = "mysql-bin.000003",
* "pos" = 105586,
* "row" = 0
* "file" = "mysql-bin.000003",
* "pos" = 105586,
* "row" = 0
* }
* </pre>
*
@ -40,7 +40,7 @@
@NotThreadSafe
final class SourceInfo {
public static final String DATABASE_PARTITION_KEY = "db";
public static final String SERVER_PARTITION_KEY = "server";
public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
public static final String BINLOG_EVENT_ROW_NUMBER_OFFSET_KEY = "row";
@ -48,7 +48,7 @@ final class SourceInfo {
private String binlogFilename;
private long binlogPosition = 4;
private int eventRowNumber = 0;
private String databaseId;
private String serverName;
private Map<String, ?> sourcePartition;
public SourceInfo() {
@ -59,15 +59,15 @@ public SourceInfo() {
*
* @param logicalId the logical identifier for the database; may not be null
*/
public void setDatabase(String logicalId) {
this.databaseId = logicalId;
sourcePartition = Collect.hashMapOf(DATABASE_PARTITION_KEY, databaseId);
public void setServerName(String logicalId) {
this.serverName = logicalId;
sourcePartition = Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
}
/**
* Get the Kafka Connect detail about the source "partition", which describes the portion of the source that we are
* consuming. Since we're reading the binary log for a single database, the source partition specifies the
* {@link #setDatabase database server}.
* {@link #setServerName(String) database server}.
* <p>
* The resulting map is mutable for efficiency reasons (this information rarely changes), but should not be mutated.
*
@ -79,7 +79,7 @@ public void setDatabase(String logicalId) {
/**
* Get the Kafka Connect detail about the source "offset", which describes the position within the source where we last
* stopped reading.
* have last read.
*
* @return a copy of the current offset; never null
*/
@ -91,9 +91,9 @@ public Map<String, Object> offset() {
/**
* Set the current row number within a given event, and then get the Kafka Connect detail about the source "offset", which
* describes the position within the source where we last stopped reading.
* describes the position within the source where we have last read.
*
* @param eventRowNumber the row number within the last event that was successfully processed
* @param eventRowNumber the 0-based row number within the last event that was successfully processed
* @return a copy of the current offset; never null
*/
public Map<String, Object> offset(int eventRowNumber) {
@ -174,9 +174,9 @@ public int eventRowNumber() {
/**
* Get the logical identifier of the database that is the source of the events.
* @return the database name; null if it has not been {@link #setDatabase(String) set}
* @return the database name; null if it has not been {@link #setServerName(String) set}
*/
public String database() {
return databaseId;
public String serverName() {
return serverName;
}
}

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.ingest;
package io.debezium.mysql.source;
import java.io.Serializable;
import java.util.BitSet;
@ -46,39 +46,45 @@ final class TableConverters {
private final MySqlDdlParser ddlParser;
private final Tables tables;
private final TableSchemaBuilder schemaBuilder = new TableSchemaBuilder();
private final Consumer<Set<TableId>> tablesChangedHandler;
private final Map<String, TableSchema> tableSchemaByTableName = new HashMap<>();
private final Map<Long, Converter> convertersByTableId = new HashMap<>();
private final Map<String, Long> tableNumbersByTableName = new HashMap<>();
public TableConverters( TopicSelector topicSelector, Tables tables, Consumer<Set<TableId>> tablesChangedHandler ) {
public TableConverters(TopicSelector topicSelector, Tables tables) {
this.topicSelector = topicSelector;
this.tablesChangedHandler = tablesChangedHandler != null ? tablesChangedHandler : (ids)->{};
this.tables = tables != null ? tables : new Tables();
this.ddlParser = new MySqlDdlParser(false); // don't include views
}
public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
QueryEventData command = event.getData();
String ddlStatements = command.getSql();
try {
this.ddlParser.parse(ddlStatements, tables);
} catch ( ParsingException e) {
} catch (ParsingException e) {
logger.error("Error parsing DDL statement and updating tables", e);
} finally {
// Figure out what changed ...
Set<TableId> changes = tables.drainChanges();
changes.forEach(tableId->{
Table table = tables.forTable(tableId);
if ( table == null ) { // removed
tableSchemaByTableName.remove(tableId.table());
} else {
TableSchema schema = schemaBuilder.create(table, false);
tableSchemaByTableName.put(tableId.table(), schema);
}
});
tablesChangedHandler.accept(changes); // notify
// Write the DDL statements to a source record on the topic named for the server. This way we'll record
// all DDL statements for all databases hosted by that server ...
String databaseName = command.getDatabase();
String topic = topicSelector.getTopic(source.serverName());
Integer partition = null;
SourceRecord record = new SourceRecord(source.partition(), source.offset(), topic, partition,
Schema.STRING_SCHEMA, databaseName, Schema.STRING_SCHEMA, ddlStatements);
recorder.accept(record);
}
// Figure out what changed ...
Set<TableId> changes = tables.drainChanges();
changes.forEach(tableId -> {
Table table = tables.forTable(tableId);
if (table == null) { // removed
tableSchemaByTableName.remove(tableId.table());
} else {
TableSchema schema = schemaBuilder.create(table, false);
tableSchemaByTableName.put(tableId.table(), schema);
}
});
}
/**
@ -102,9 +108,10 @@ public void updateTableMetadata(Event event, SourceInfo source, Consumer<SourceR
long tableNumber = metadata.getTableId();
if (!convertersByTableId.containsKey(tableNumber)) {
// We haven't seen this table ID, so we need to rebuild our converter functions ...
String serverName = source.serverName();
String databaseName = metadata.getDatabase();
String tableName = metadata.getTable();
String topicName = topicSelector.getTopic(databaseName, tableName);
String topicName = topicSelector.getTopic(serverName, databaseName, tableName);
// Just get the current schema, which should be up-to-date ...
TableSchema tableSchema = tableSchemaByTableName.get(tableName);
@ -115,44 +122,50 @@ public void updateTableMetadata(Event event, SourceInfo source, Consumer<SourceR
public String topic() {
return topicName;
}
@Override
public Integer partition() {
return null;
}
@Override
public Schema keySchema() {
return tableSchema.keySchema();
}
@Override
public Schema valueSchema() {
return tableSchema.valueSchema();
}
@Override
public Object createKey(Serializable[] row, BitSet includedColumns) {
// assume all columns in the table are included ...
return tableSchema.keyFromColumnData(row);
}
@Override
public Struct inserted(Serializable[] row, BitSet includedColumns) {
// assume all columns in the table are included ...
return tableSchema.valueFromColumnData(row);
}
@Override
public Struct updated(Serializable[] after, BitSet includedColumns, Serializable[] before,
BitSet includedColumnsBeforeUpdate) {
// assume all columns in the table are included, and we'll write out only the updates ...
// assume all columns in the table are included, and we'll write out only the after state ...
return tableSchema.valueFromColumnData(after);
}
@Override
public Struct deleted(Serializable[] deleted, BitSet includedColumns) {
// TODO: Should we write out the old values or null?
// assume all columns in the table are included ...
// We currently write out null to signal that the row was removed ...
return null; // tableSchema.valueFromColumnData(row);
}
};
convertersByTableId.put(tableNumber, converter);
Long previousTableNumber = tableNumbersByTableName.put(tableName, tableNumber);
if ( previousTableNumber != null ) {
if (previousTableNumber != null) {
convertersByTableId.remove(previousTableNumber);
}
}
@ -168,9 +181,9 @@ public void handleInsert(Event event, SourceInfo source, Consumer<SourceRecord>
for (int row = 0; row <= source.eventRowNumber(); ++row) {
Serializable[] values = write.getRows().get(row);
Schema keySchema = converter.keySchema();
Object key = converter.createKey(values,includedColumns);
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.inserted(values,includedColumns);
Struct value = converter.inserted(values, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
@ -197,9 +210,9 @@ public void handleUpdate(Event event, SourceInfo source, Consumer<SourceRecord>
Serializable[] before = changes.getKey();
Serializable[] after = changes.getValue();
Schema keySchema = converter.keySchema();
Object key = converter.createKey(after,includedColumns);
Object key = converter.createKey(after, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.updated(before,includedColumnsBefore, after,includedColumns);
Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
@ -216,9 +229,9 @@ public void handleDelete(Event event, SourceInfo source, Consumer<SourceRecord>
for (int row = 0; row <= source.eventRowNumber(); ++row) {
Serializable[] values = deleted.getRows().get(row);
Schema keySchema = converter.keySchema();
Object key = converter.createKey(values,includedColumns);
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.inserted(values,includedColumns);
Struct value = converter.inserted(values, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
@ -227,7 +240,7 @@ public void handleDelete(Event event, SourceInfo source, Consumer<SourceRecord>
protected static interface Converter {
String topic();
Integer partition();
Schema keySchema();
@ -238,7 +251,7 @@ protected static interface Converter {
Struct inserted(Serializable[] row, BitSet includedColumns);
Struct updated(Serializable[] after, BitSet includedColumns, Serializable[] before, BitSet includedColumnsBeforeUpdate );
Struct updated(Serializable[] after, BitSet includedColumns, Serializable[] before, BitSet includedColumnsBeforeUpdate);
Struct deleted(Serializable[] deleted, BitSet includedColumns);
}

View File

@ -0,0 +1,78 @@
/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.mysql.source;

import io.debezium.annotation.ThreadSafe;

/**
 * A function that determines the name of topics for data and metadata.
 *
 * @author Randall Hauch
 */
@ThreadSafe
public interface TopicSelector {
    /**
     * Get the default topic selector logic, which uses a '.' delimiter character when needed.
     *
     * @return the topic selector; never null
     */
    static TopicSelector defaultSelector() {
        return defaultSelector(".");
    }

    /**
     * Get the default topic selector logic, which uses the supplied delimiter character when needed.
     *
     * @param delimiter the string delimiting the server, database, and table names; may not be null
     * @return the topic selector; never null
     */
    static TopicSelector defaultSelector(String delimiter) {
        return new TopicSelector() {
            /**
             * Get the name of the topic for the given server name. This method returns
             * "{@code <serverName>}".
             *
             * @param serverName the name of the database server; may not be null
             * @return the topic name; never null
             */
            @Override
            public String getTopic(String serverName) {
                return serverName;
            }

            /**
             * Get the name of the topic for the given server, database, and table names. This method returns
             * "{@code <serverName>.<databaseName>.<tableName>}".
             *
             * @param serverName the name of the database server; may not be null
             * @param databaseName the name of the database; may not be null
             * @param tableName the name of the table; may not be null
             * @return the topic name; never null
             */
            @Override
            public String getTopic(String serverName, String databaseName, String tableName) {
                return String.join(delimiter, serverName, databaseName, tableName);
            }
        };
    }

    /**
     * Get the name of the topic for the given server, database, and table names.
     *
     * @param serverName the name of the database server; may not be null
     * @param databaseName the name of the database; may not be null
     * @param tableName the name of the table; may not be null
     * @return the topic name; never null
     */
    String getTopic(String serverName, String databaseName, String tableName);

    /**
     * Get the name of the topic for the given server name.
     *
     * @param serverName the name of the database server; may not be null
     * @return the topic name; never null
     */
    String getTopic(String serverName);
}

View File

@ -11,21 +11,19 @@
import org.junit.Ignore;
import org.junit.Test;
import io.debezium.jdbc.TestDatabase;
public class ConnectionIT {
@Ignore
@Test
public void shouldConnectToDefaulDatabase() throws SQLException {
try (MySQLConnection conn = new MySQLConnection(TestDatabase.testConfig("mysql"));) {
try (MySQLConnection conn = MySQLConnection.forTestDatabase("mysql");) {
conn.connect();
}
}
@Test
public void shouldDoStuffWithDatabase() throws SQLException {
try (MySQLConnection conn = new MySQLConnection(TestDatabase.testConfig("readbinlog_test"));) {
try (MySQLConnection conn = MySQLConnection.forTestDatabase("readbinlog_test");) {
conn.connect();
// Set up the table as one transaction and wait to see the events ...
conn.execute("DROP TABLE IF EXISTS person",
@ -46,7 +44,7 @@ public void shouldDoStuffWithDatabase() throws SQLException {
@Ignore
@Test
public void shouldConnectToEmptyDatabase() throws SQLException {
try (MySQLConnection conn = new MySQLConnection(TestDatabase.testConfig("emptydb"));) {
try (MySQLConnection conn = MySQLConnection.forTestDatabase("emptydb");) {
conn.connect();
}
}

View File

@ -6,14 +6,36 @@
package io.debezium.mysql;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
/**
* A utility for working with MySQL connections.
* A utility for integration test cases to connect to the MySQL server running in the Docker container created by this module's
* build.
*
* @author Randall Hauch
*/
public class MySQLConnection extends JdbcConnection {
/**
* Obtain a connection instance to the named test database.
*
* @param databaseName the name of the test database
* @return the MySQLConnection instance; never null
*/
public static MySQLConnection forTestDatabase(String databaseName) {
return new MySQLConnection(JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDatabase(databaseName)
.build());
}
protected static void addDefaults(Configuration.Builder builder) {
builder.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
.withDefault(JdbcConfiguration.PORT, 3306)
.withDefault(JdbcConfiguration.USER, "mysql")
.withDefault(JdbcConfiguration.PASSWORD, "mysqlpw");
}
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory("jdbc:mysql://${hostname}:${port}/${dbname}");
/**
@ -22,7 +44,7 @@ public class MySQLConnection extends JdbcConnection {
* @param config the configuration; may not be null
*/
public MySQLConnection(Configuration config) {
super(config, FACTORY);
super(config, FACTORY, null, MySQLConnection::addDefaults);
}
/**
@ -33,6 +55,6 @@ public MySQLConnection(Configuration config) {
* @param initialOperations the initial operations that should be run on each new connection; may be null
*/
public MySQLConnection(Configuration config, Operations initialOperations) {
super(config, FACTORY, initialOperations);
super(config, FACTORY, initialOperations, MySQLConnection::addDefaults);
}
}

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.ingest;
package io.debezium.mysql.source;
import java.sql.SQLException;
import java.sql.Types;
@ -12,7 +12,6 @@
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.jdbc.TestDatabase;
import io.debezium.mysql.MySQLConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
@ -27,7 +26,7 @@ public class MetadataIT {
*/
@Test
public void shouldLoadMetadataViaJdbc() throws SQLException {
try (MySQLConnection conn = new MySQLConnection(TestDatabase.testConfig("readbinlog_test"));) {
try (MySQLConnection conn = MySQLConnection.forTestDatabase("readbinlog_test");) {
conn.connect();
// Set up the table as one transaction and wait to see the events ...
conn.execute("DROP TABLE IF EXISTS person",

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.mysql.ingest;
package io.debezium.mysql.source;
import static org.junit.Assert.fail;
@ -45,7 +45,6 @@
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.TestDatabase;
import io.debezium.mysql.MySQLConnection;
public class ReadBinLogIT {
@ -59,7 +58,6 @@ private static final class AnyValue implements Serializable {
private static final Serializable ANY_OBJECT = new AnyValue();
private JdbcConfiguration config;
private EventQueue counters;
private BinaryLogClient client;
private MySQLConnection conn;
@ -69,12 +67,13 @@ private static final class AnyValue implements Serializable {
public void beforeEach() throws TimeoutException, IOException, SQLException, InterruptedException {
events.clear();
config = TestDatabase.buildTestConfig().withDatabase("readbinlog_test").build();
// Connect the normal SQL client ...
conn = new MySQLConnection(config);
conn = MySQLConnection.forTestDatabase("readbinlog_test");
conn.connect();
// Get the configuration that we used ...
JdbcConfiguration config = conn.config();
// Connect the bin log client ...
counters = new EventQueue(DEFAULT_TIMEOUT, this::logConsumedEvent, this::logIgnoredEvent);
client = new BinaryLogClient(config.getHostname(), config.getPort(), "replicator", "replpass");

View File

@ -3,27 +3,24 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.ingest.postgresql;
package io.debezium.postgresql;
import java.sql.SQLException;
import org.junit.Test;
import io.debezium.ingest.postgres.PostgresConnection;
import io.debezium.jdbc.TestDatabase;
public class ConnectionIT {
@Test
public void shouldConnectToDefaulDatabase() throws SQLException {
try (PostgresConnection conn = new PostgresConnection( TestDatabase.testConfig("postgres") );) {
try (PostgresConnection conn = PostgresConnection.forTestDatabase("postgres");) {
conn.connect();
}
}
@Test
public void shouldConnectToEmptyDatabase() throws SQLException {
try (PostgresConnection conn = new PostgresConnection( TestDatabase.testConfig("emptydb") );) {
try (PostgresConnection conn = PostgresConnection.forTestDatabase("emptydb");) {
conn.connect();
}
}

View File

@ -3,17 +3,39 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.ingest.postgres;
package io.debezium.postgresql;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
/**
* A utility for working with MySQL connections.
* A utility for integration test cases to connect to the PostgreSQL server running in the Docker container created by this
* module's build.
*
* @author Randall Hauch
*/
public class PostgresConnection extends JdbcConnection {
/**
* Obtain a connection instance to the named test database.
*
* @param databaseName the name of the test database
* @return the PostgresConnection instance; never null
*/
public static PostgresConnection forTestDatabase(String databaseName) {
return new PostgresConnection(JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDatabase(databaseName)
.build());
}
protected static void addDefaults(Configuration.Builder builder) {
builder.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
.withDefault(JdbcConfiguration.PORT, 5432)
.withDefault(JdbcConfiguration.USER, "postgres")
.withDefault(JdbcConfiguration.PASSWORD, "postgres");
}
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory("jdbc:postgresql://${hostname}:${port}/${dbname}");
/**