[DBZ-342] fix broken MySQL data type "TIME" handling

rkerner 2017-11-29 20:34:12 +01:00
parent 4309164f27
commit c7ac481c43
25 changed files with 787 additions and 233 deletions

View File

@ -21,6 +21,7 @@
import io.debezium.config.Field.ValidationOutput;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
@ -29,66 +30,10 @@
*/
public class MySqlConnectorConfig {
/**
* The set of predefined TemporalPrecisionMode options or aliases.
*/
public static enum TemporalPrecisionMode implements EnumeratedValue {
/**
* Represent time and date values based upon the resolution in the database, using {@link io.debezium.time} semantic
* types.
*/
ADAPTIVE("adaptive"),
/**
* Represent time and date values using Kafka Connect {@link org.apache.kafka.connect.data} logical types, which always
* have millisecond precision.
*/
CONNECT("connect");
private final String value;
private TemporalPrecisionMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static TemporalPrecisionMode parse(String value) {
if (value == null) return null;
value = value.trim();
for (TemporalPrecisionMode option : TemporalPrecisionMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static TemporalPrecisionMode parse(String value, String defaultValue) {
TemporalPrecisionMode mode = parse(value);
if (mode == null && defaultValue != null) mode = parse(defaultValue);
return mode;
}
}
/**
* The set of predefined DecimalHandlingMode options or aliases.
*/
public static enum DecimalHandlingMode implements EnumeratedValue {
public enum DecimalHandlingMode implements EnumeratedValue {
/**
* Represent {@code DECIMAL} and {@code NUMERIC} values as precise {@link BigDecimal} values, which are
* represented in change events in a binary form. This is precise but difficult to use.
@ -753,13 +698,14 @@ public static EventDeserializationFailureHandlingMode parse(String value) {
public static final Field TIME_PRECISION_MODE = Field.create("time.precision.mode")
.withDisplayName("Time Precision")
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE)
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Time, date, and timestamps can be represented with different kinds of precisions, including:"
+ "'adaptive' (the default) bases the precision of time, date, and timestamp values on the database column's precision; "
+ "'adaptive_time_microseconds' (the default) like 'adaptive' mode, but TIME fields always use microseconds precision;"
+ "'adaptive' (deprecated) bases the precision of time, date, and timestamp values on the database column's precision; "
+ "'connect' always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, "
+ "which uses millisecond precision regardless of the database columns' precision .");
+ "which uses millisecond precision regardless of the database columns' precision.");
public static final Field DECIMAL_HANDLING_MODE = Field.create("decimal.handling.mode")
.withDisplayName("Decimal Handling")

View File

@ -19,11 +19,11 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.BigIntUnsignedHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.DecimalHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.TemporalPrecisionMode;
import io.debezium.connector.mysql.MySqlSystemVariables.Scope;
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
@ -92,14 +92,13 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
// Use MySQL-specific converters and schemas for values ...
String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE);
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr);
boolean adaptiveTimePrecision = TemporalPrecisionMode.ADAPTIVE.equals(timePrecisionMode);
String decimalHandlingModeStr = config.getString(MySqlConnectorConfig.DECIMAL_HANDLING_MODE);
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
DecimalMode decimalMode = decimalHandlingMode.asDecimalMode();
String bigIntUnsignedHandlingModeStr = config.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode = BigIntUnsignedHandlingMode.parse(bigIntUnsignedHandlingModeStr);
BigIntUnsignedMode bigIntUnsignedMode = bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, adaptiveTimePrecision, bigIntUnsignedMode);
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode);
this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate);
// Set up the server name and schema prefix ...

View File

@ -13,6 +13,7 @@
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.sql.Types;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
@ -35,6 +36,7 @@
import io.debezium.annotation.Immutable;
import io.debezium.data.Json;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.Year;
@ -112,14 +114,12 @@ protected static int adjustYear(int year) {
*
* @param decimalMode how {@code DECIMAL} and {@code NUMERIC} values should be treated; may be null if
* {@link io.debezium.jdbc.JdbcValueConverters.DecimalMode#PRECISE} is to be used
* @param adaptiveTimePrecision {@code true} if the time, date, and timestamp values should be based upon the precision of the
* database columns using {@link io.debezium.time} semantic types, or {@code false} if they should be fixed to
* millisecond precision using Kafka Connect {@link org.apache.kafka.connect.data} logical types.
* @param temporalPrecisionMode temporal precision mode based on {@link io.debezium.jdbc.TemporalPrecisionMode}
* @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if
* {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used
*/
public MySqlValueConverters(DecimalMode decimalMode, boolean adaptiveTimePrecision, BigIntUnsignedMode bigIntUnsignedMode) {
this(decimalMode, adaptiveTimePrecision, ZoneOffset.UTC, bigIntUnsignedMode);
public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, BigIntUnsignedMode bigIntUnsignedMode) {
this(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, bigIntUnsignedMode);
}
/**
@ -129,16 +129,14 @@ public MySqlValueConverters(DecimalMode decimalMode, boolean adaptiveTimePrecisi
*
* @param decimalMode how {@code DECIMAL} and {@code NUMERIC} values should be treated; may be null if
* {@link io.debezium.jdbc.JdbcValueConverters.DecimalMode#PRECISE} is to be used
* @param adaptiveTimePrecision {@code true} if the time, date, and timestamp values should be based upon the precision of the
* database columns using {@link io.debezium.time} semantic types, or {@code false} if they should be fixed to
* millisecond precision using Kafka Connect {@link org.apache.kafka.connect.data} logical types.
* @param temporalPrecisionMode temporal precision mode based on {@link io.debezium.jdbc.TemporalPrecisionMode}
* @param defaultOffset the zone offset that is to be used when converting non-timezone related values to values that do
* have timezones; may be null if UTC is to be used
* @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if
* {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used
*/
public MySqlValueConverters(DecimalMode decimalMode, boolean adaptiveTimePrecision, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode) {
super(decimalMode, adaptiveTimePrecision, defaultOffset, MySqlValueConverters::adjustTemporal, bigIntUnsignedMode);
public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode) {
super(decimalMode, temporalPrecisionMode, defaultOffset, MySqlValueConverters::adjustTemporal, bigIntUnsignedMode);
}
@Override
@ -259,6 +257,9 @@ public ValueConverter converter(Column column, Field fieldDefn) {
}
logger.warn("Using UTF-8 charset by default for column without charset: {}", column);
return (data) -> convertString(column, fieldDefn, StandardCharsets.UTF_8, data);
case Types.TIME:
if (adaptiveTimeMicrosecondsPrecisionMode)
return data -> convertDurationToMicroseconds(column, fieldDefn, data);
default:
break;
}
@ -713,4 +714,35 @@ else if (data instanceof Number) {
return convertNumeric(column, fieldDefn, data);
}
}
/**
* Converts a value object for an expected type of {@link java.time.Duration} to {@link Long} values that represents
* the time in microseconds.
* <p>
* Per the JDBC specification, databases should return {@link java.sql.Time} instances, but {@link java.sql.Time}
* can only represent the time-of-day range 00:00:00-23:59:59, whereas a MySQL TIME spans
* -838:59:59.000000 to 838:59:59.000000. We therefore use {@link java.time.Duration}, which covers the full
* range, and transfer the value as a signed INT64 holding the database value converted to microseconds.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a {@link java.time.Duration} type; never null
* @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
protected Object convertDurationToMicroseconds(Column column, Field fieldDefn, Object data) {
if (data == null) {
data = fieldDefn.schema().defaultValue();
}
if (data == null) {
if (column.isOptional()) return null;
return 0;
}
try {
if (data instanceof Duration) return ((Duration) data).toNanos() / 1_000;
} catch (IllegalArgumentException e) {
return handleUnknownData(column, fieldDefn, data);
}
return handleUnknownData(column, fieldDefn, data);
}
}
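A round-trip sketch of this conversion (values assumed, not from the commit): the extreme negative MySQL TIME becomes a signed microsecond count that a consumer can turn back into a Duration:
Duration time = Duration.ofHours(-838).minusMinutes(59).minusSeconds(59);   // '-838:59:59'
long micros = time.toNanos() / 1_000;                                       // -3_020_399_000_000
Duration restored = Duration.ofNanos(micros * 1_000);                       // equals the original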

View File

@ -15,6 +15,7 @@
import java.time.Year;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.Duration;
import java.util.Calendar;
import java.util.Map;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@ -231,6 +232,9 @@ protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws
}
}
private static final int MASK_10_BITS = (1 << 10) - 1;
private static final int MASK_6_BITS = (1 << 6) - 1;
/**
* Converts a MySQL string to a {@code byte[]}.
*
@ -282,7 +286,7 @@ protected static Serializable deserializeDate(ByteArrayInputStream inputStream)
}
/**
* Converts a MySQL {@code TIME} value <em>without fractional seconds</em> to a {@link LocalTime}.
* Converts a MySQL {@code TIME} value <em>without fractional seconds</em> to a {@link java.time.Duration}.
*
* @param inputStream the binary stream containing the raw binlog event data for the value
* @return the {@link java.time.Duration} object
@ -295,16 +299,15 @@ protected static Serializable deserializeTime(ByteArrayInputStream inputStream)
int hours = split[2];
int minutes = split[1];
int seconds = split[0];
int nanoOfSecond = 0; // This version does not support fractional seconds
return LocalTime.of(hours, minutes, seconds, nanoOfSecond);
return Duration.ofHours(hours).plusMinutes(minutes).plusSeconds(seconds);
}
/**
* Converts a MySQL {@code TIME} value <em>with fractional seconds</em> to a {@link LocalTime}.
* Converts a MySQL {@code TIME} value <em>with fractional seconds</em> to a {@link java.time.Duration}.
*
* @param meta the {@code meta} value containing the fractional second precision, or {@code fsp}
* @param inputStream the binary stream containing the raw binlog event data for the value
* @return the {@link LocalTime} object
* @return the {@link java.time.Duration} object
* @throws IOException if there is an error reading from the binlog event data
*/
protected static Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
@ -322,11 +325,31 @@ protected static Serializable deserializeTimeV2(int meta, ByteArrayInputStream i
* + fractional-seconds storage (size depends on meta)
*/
long time = bigEndianLong(inputStream.read(3), 0, 3);
int hour = bitSlice(time, 2, 10, 24);
int minute = bitSlice(time, 12, 6, 24);
int second = bitSlice(time, 18, 6, 24);
int nanoSeconds = deserializeFractionalSecondsInNanos(meta, inputStream);
return LocalTime.of(hour, minute, second, nanoSeconds);
boolean is_negative = bitSlice(time, 0, 1, 24) == 0;
int hours = bitSlice(time, 2, 10, 24);
int minutes = bitSlice(time, 12, 6, 24);
int seconds = bitSlice(time, 18, 6, 24);
int nanoSeconds;
if (is_negative) { // mysql binary arithmetic for negative encoded values
hours = ~hours & MASK_10_BITS;
hours = hours & ~(1 << 10); // unset sign bit
minutes = ~minutes & MASK_6_BITS;
minutes = minutes & ~(1 << 6); // unset sign bit
seconds = ~seconds & MASK_6_BITS;
seconds = seconds & ~(1 << 6); // unset sign bit
nanoSeconds = deserializeFractionalSecondsInNanosNegative(meta, inputStream);
if (nanoSeconds == 0 && seconds < 59) { // no fractional part to absorb the two's-complement carry, so it goes into the seconds
++seconds;
}
hours = -hours;
minutes = -minutes;
seconds = -seconds;
nanoSeconds = -nanoSeconds;
}
else {
nanoSeconds = deserializeFractionalSecondsInNanos(meta, inputStream);
}
return Duration.ofHours(hours).plusMinutes(minutes).plusSeconds(seconds).plusNanos(nanoSeconds);
}
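A worked decode of the bit fields above, assuming MySQL's documented TIME2 packing (the packed value here is computed, not taken from the commit): -13:14:50 at fsp=0 is stored as the 24-bit two's complement of the positive packing, 0x7F2C4E, and the bit inversion plus the seconds carry recovers the magnitude:
long time = 0x7F2C4E;                                          // assumed packing of '-13:14:50'
boolean negative = bitSlice(time, 0, 1, 24) == 0;              // sign bit 0 => negative
int hours   = ~bitSlice(time, 2, 10, 24) & MASK_10_BITS;       // ~1010 -> 13
int minutes = ~bitSlice(time, 12, 6, 24) & MASK_6_BITS;        // ~49   -> 14
int seconds = (~bitSlice(time, 18, 6, 24) & MASK_6_BITS) + 1;  // ~14 -> 49, plus the carry -> 50
// => Duration.ofHours(13).plusMinutes(14).plusSeconds(50).negated()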
/**
@ -334,7 +357,7 @@ protected static Serializable deserializeTimeV2(int meta, ByteArrayInputStream i
* <p>
* This method treats all <a href="http://dev.mysql.com/doc/refman/5.7/en/date-and-time-types.html">zero values</a>
* for {@code DATETIME} columns as NULL, since they cannot be accurately represented as valid {@link LocalDateTime} objects.
*
* @param inputStream the binary stream containing the raw binlog event data for the value
* @return the {@link LocalDateTime} object
* @throws IOException if there is an error reading from the binlog event data
@ -514,6 +537,34 @@ protected static int bitSlice(long value, int bitOffset, int numberOfBits, int p
* @throws IOException if there is an error reading from the binlog event data
*/
protected static int deserializeFractionalSecondsInNanos(int fsp, ByteArrayInputStream inputStream) throws IOException {
// Calculate the number of bytes to read, which is
// '1' when fsp=(1,2)
// '2' when fsp=(3,4) and
// '3' when fsp=(5,6)
int length = (fsp + 1) / 2;
if (length > 0) {
long fraction = bigEndianLong(inputStream.read(length), 0, length);
// Convert the fractional value (which has extra trailing digit for fsp=1,3, and 5) to nanoseconds ...
return (int) (fraction / (0.0000001 * Math.pow(100, length - 1)));
}
return 0;
}
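As a concrete instance of this arithmetic (example values, not from the commit): for fsp=3, two bytes are read, the stored fraction carries an extra trailing digit, and the division rescales it to nanoseconds:
int fsp = 3;
int length = (fsp + 1) / 2;                                              // 2 bytes
long fraction = 7_770;                                                   // '.777' stored with the extra trailing digit
int nanos = (int) (fraction / (0.0000001 * Math.pow(100, length - 1)));  // 777_000_000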
/**
* Read the binary input stream to obtain the number of nanoseconds given the <em>fractional seconds precision</em>, or
* <em>fsp</em>.
* <p>
* We can't use/access the {@code deserializeFractionalSeconds} method in the {@link AbstractRowsEventDataDeserializer} class,
* so we replicate it here with modifications to support nanoseconds rather than microseconds and negative values.
* Note the original is licensed under the same Apache Software License 2.0 as Debezium.
*
* @param fsp the fractional seconds precision describing the number of digits precision used to store the fractional seconds
* (e.g., 1 for storing tenths of a second, 2 for storing hundredths, 3 for storing milliseconds, etc.)
* @param inputStream the binary data stream
* @return the number of nanoseconds
* @throws IOException if there is an error reading from the binlog event data
*/
protected static int deserializeFractionalSecondsInNanosNegative(int fsp, ByteArrayInputStream inputStream) throws IOException {
// Calculate the number of bytes to read, which is
// '1' when fsp=(1,2)
// '2' when fsp=(3,4) and
@ -521,6 +572,22 @@ protected static int deserializeFractionalSecondsInNanos(int fsp, ByteArrayInput
int length = (fsp + 1) / 2;
if (length > 0) {
long fraction = bigEndianLong(inputStream.read(length), 0, length);
int maskBits = 0;
switch (length) { // mask bits according to field precision
case 1:
maskBits = 8;
break;
case 2:
maskBits = 15;
break;
case 3:
maskBits = 20;
break;
default:
break;
}
fraction = ~fraction & ((1 << maskBits) - 1);
fraction = (fraction & ~(1 << maskBits)) + 1; // clear the sign bit and apply the two's-complement carry
// Convert the fractional value (which has extra trailing digit for fsp=1,3, and 5) to nanoseconds ...
return (int) (fraction / (0.0000001 * Math.pow(100, length - 1)));
}
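A worked example of the negative-fraction path (stored value assumed): for fsp=6 (length=3, 20 mask bits), the fraction .999999 of a negative TIME arrives as its two's complement, 2^20 - 999999 = 48577, and decodes as:
long fraction = 48_577;
fraction = ~fraction & ((1 << 20) - 1);                        // 999_998
fraction = (fraction & ~(1 << 20)) + 1;                        // 999_999 after the carry
int nanos = (int) (fraction / (0.0000001 * Math.pow(100, 2))); // 999_999_000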

View File

@ -5,11 +5,16 @@
*/
package io.debezium.connector.mysql;
import java.io.UnsupportedEncodingException;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -19,6 +24,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
@ -29,6 +36,7 @@
import io.debezium.function.Predicates;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnection.StatementFactory;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
@ -36,11 +44,16 @@
/**
* A component that performs a snapshot of a MySQL server, and records the schema changes in {@link MySqlSchema}.
*
* @author Randall Hauch
*/
public class SnapshotReader extends AbstractReader {
/**
* Used to parse values of TIME columns. Format: 000:00:00.000000.
*/
private static final Pattern TIME_FIELD_PATTERN = Pattern.compile("(\\-?[0-9]*):([0-9]*):([0-9]*)(\\.([0-9]*))?");
private boolean minimalBlocking = true;
private final boolean includeData;
private RecordRecorder recorder;
@ -49,7 +62,7 @@ public class SnapshotReader extends AbstractReader {
/**
* Create a snapshot reader.
*
* @param name the name of this reader; may not be null
* @param context the task context in which this reader is running; may not be null
*/
@ -65,7 +78,7 @@ public SnapshotReader(String name, MySqlTaskContext context) {
* releasing the read lock as early as possible. Although the snapshot process should obtain a consistent snapshot even
* when releasing the lock as early as possible, it may be desirable to explicitly hold onto the read lock until execution
* completes. In such cases, holding onto the lock will prevent all updates to the database during the snapshot process.
*
* @param minimalBlocking {@code true} if the lock is to be released as early as possible, or {@code false} if the lock
* is to be held for the entire {@link #execute() execution}
* @return this object for method chaining; never null
@ -78,7 +91,7 @@ public SnapshotReader useMinimalBlocking(boolean minimalBlocking) {
/**
* Set this reader's {@link #execute() execution} to produce a {@link io.debezium.data.Envelope.Operation#READ} event for each
* row.
*
* @return this object for method chaining; never null
*/
public SnapshotReader generateReadEvents() {
@ -89,7 +102,7 @@ public SnapshotReader generateReadEvents() {
/**
* Set this reader's {@link #execute() execution} to produce a {@link io.debezium.data.Envelope.Operation#CREATE} event for
* each row.
*
* @return this object for method chaining; never null
*/
public SnapshotReader generateInsertEvents() {
@ -128,6 +141,68 @@ protected void doCleanup() {
}
}
protected Object readField(ResultSet rs, int fieldNo, Column actualColumn) throws SQLException {
if (actualColumn.jdbcType() == Types.TIME) {
return readTimeField(rs, fieldNo);
}
else {
return rs.getObject(fieldNo);
}
}
/**
* As the MySQL Connector/J implementation is broken for the MySQL {@code TIME} type, we have to use a
* binary workaround: read the raw bytes of the column and parse the textual value ourselves.
*
* @see https://issues.jboss.org/browse/DBZ-342
*/
private Object readTimeField(ResultSet rs, int fieldNo) throws SQLException {
Blob b = rs.getBlob(fieldNo);
String timeString;
try {
timeString = new String(b.getBytes(1, (int) (b.length())), "UTF-8");
} catch (UnsupportedEncodingException e) {
logger.error("Could not read MySQL TIME value as UTF-8");
throw new RuntimeException(e);
}
Matcher matcher = TIME_FIELD_PATTERN.matcher(timeString);
if (!matcher.matches()) {
throw new RuntimeException("Unexpected format for TIME column: " + timeString);
}
long hours = Long.parseLong(matcher.group(1));
long minutes = Long.parseLong(matcher.group(2));
long seconds = Long.parseLong(matcher.group(3));
long nanoSeconds = 0;
String microSecondsString = matcher.group(5);
if (microSecondsString != null) {
nanoSeconds = Long.parseLong(rightPad(microSecondsString, 9, '0'));
}
if (hours >= 0) {
return Duration.ofHours(hours)
.plusMinutes(minutes)
.plusSeconds(seconds)
.plusNanos(nanoSeconds);
}
else {
return Duration.ofHours(hours)
.minusMinutes(minutes)
.minusSeconds(seconds)
.minusNanos(nanoSeconds);
}
}
private String rightPad(String input, int length, char c) {
char[] padded = new char[length];
System.arraycopy(input.toCharArray(), 0, padded, 0, input.length());
Arrays.fill(padded, input.length(), length, c);
return new String(padded);
}
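A sketch of what readTimeField sees for an extreme value (input assumed): the pattern splits the textual TIME into signed hours, minutes, seconds, and fractional digits, which are right-padded to nanoseconds:
Matcher matcher = TIME_FIELD_PATTERN.matcher("-838:59:58.999999");
if (matcher.matches()) {
    // group(1)="-838", group(2)="59", group(3)="58", group(5)="999999"
    long nanoSeconds = Long.parseLong("999999000");   // group(5) right-padded to 9 digits
    Duration d = Duration.ofHours(-838).minusMinutes(59).minusSeconds(58).minusNanos(nanoSeconds);
}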
/**
* Perform the snapshot using the same logic as the "mysqldump" utility.
*/
@ -446,7 +521,8 @@ protected void execute() {
final Object[] row = new Object[numColumns];
while (rs.next()) {
for (int i = 0, j = 1; i != numColumns; ++i, ++j) {
row[i] = rs.getObject(j);
Column actualColumn = table.columns().get(i);
row[i] = readField(rs, j, actualColumn);
}
recorder.recordRow(recordMaker, row, ts); // has no row number!
++rowNum;
@ -529,7 +605,7 @@ protected void execute() {
mysql.execute(sql.get());
metrics.completeSnapshot();
}
// -------
// STEP 10
// -------
@ -630,7 +706,7 @@ protected String quote(TableId id) {
* technique</a> for MySQL by creating the JDBC {@link Statement} with {@link ResultSet#TYPE_FORWARD_ONLY forward-only} cursor
* and {@link ResultSet#CONCUR_READ_ONLY read-only concurrency} flags, and with a {@link Integer#MIN_VALUE minimum value}
* {@link Statement#setFetchSize(int) fetch size hint}.
*
* @param connection the JDBC connection; may not be null
* @return the statement; never null
* @throws SQLException if there is a problem creating the statement
@ -685,7 +761,7 @@ private void logRolesForCurrentUser(JdbcConnection mysql) {
/**
* Utility method to replace the offset in the given record with the latest. This is used on the last record produced
* during the snapshot.
*
* @param record the record
* @return the updated record
*/

View File

@ -6,11 +6,14 @@
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -125,7 +128,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
assertThat(schemaChanges.recordCount()).isEqualTo(0);
// Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(4);
assertThat(store.collectionCount()).isEqualTo(5);
Collection products = store.collection(DATABASE.getDatabaseName(), "products");
assertThat(products.numberOfCreates()).isEqualTo(9);
assertThat(products.numberOfUpdates()).isEqualTo(0);
@ -177,8 +180,8 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
// Poll for records ...
// Testing.Print.enable();
int expectedSchemaChangeCount = 4 + 2; // 4 tables plus 2 alters
int expected = (9 + 9 + 4 + 5) + expectedSchemaChangeCount; // only the inserts for our 4 tables in this database, plus
int expectedSchemaChangeCount = 5 + 2; // 5 tables plus 2 alters
int expected = (9 + 9 + 4 + 5 + 1) + expectedSchemaChangeCount; // only the inserts for our 5 tables in this database, plus
// schema changes
int consumed = consumeAtLeast(expected);
assertThat(consumed).isGreaterThanOrEqualTo(expected);
@ -187,7 +190,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
assertThat(schemaChanges.recordCount()).isEqualTo(expectedSchemaChangeCount);
// Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(4);
assertThat(store.collectionCount()).isEqualTo(5);
Collection products = store.collection(DATABASE.getDatabaseName(), "products");
assertThat(products.numberOfCreates()).isEqualTo(9);
assertThat(products.numberOfUpdates()).isEqualTo(0);
@ -264,4 +267,91 @@ public void shouldHandleTimestampTimezones() throws Exception {
String actualTimestampString = after.getString("c4");
assertThat(actualTimestampString).isEqualTo(expectedTimestampString);
}
@Test
@FixFor( "DBZ-342" )
public void shouldHandleMySQLTimeCorrectly() throws Exception {
final UniqueDatabase REGRESSION_DATABASE = new UniqueDatabase("logical_server_name", "regression_test")
.withDbHistoryPath(DB_HISTORY_PATH);
REGRESSION_DATABASE.createAndInitialize();
String tableName = "dbz_342_timetest";
config = simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, REGRESSION_DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_WHITELIST, REGRESSION_DATABASE.qualifiedTableName(tableName))
.build();
context = new MySqlTaskContext(config);
context.start();
context.source().setBinlogStartPoint("", 0L); // start from beginning
context.initializeHistory();
reader = new BinlogReader("binlog", context);
// Start reading the binlog ...
reader.start();
int expectedChanges = 1; // only 1 insert
consumeAtLeast(expectedChanges);
// Check the records via the store ...
List<SourceRecord> sourceRecords = store.sourceRecords();
assertThat(sourceRecords.size()).isEqualTo(1);
SourceRecord sourceRecord = sourceRecords.get(0);
Struct value = (Struct) sourceRecord.value();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
// '517:51:04.777'
long c1 = after.getInt64("c1");
Duration c1Time = Duration.ofNanos(c1 * 1_000);
Duration c1ExpectedTime = toDuration("PT517H51M4.78S");
assertEquals(c1ExpectedTime, c1Time);
assertEquals(c1ExpectedTime.toNanos(), c1Time.toNanos());
assertThat(c1Time.toNanos()).isEqualTo(1864264780000000L);
assertThat(c1Time).isEqualTo(Duration.ofHours(517).plusMinutes(51).plusSeconds(4).plusMillis(780));
// '-13:14:50'
long c2 = after.getInt64("c2");
Duration c2Time = Duration.ofNanos(c2 * 1_000);
Duration c2ExpectedTime = toDuration("-PT13H14M50S");
assertEquals(c2ExpectedTime, c2Time);
assertEquals(c2ExpectedTime.toNanos(), c2Time.toNanos());
assertThat(c2Time.toNanos()).isEqualTo(-47690000000000L);
assertTrue(c2Time.isNegative());
assertThat(c2Time).isEqualTo(Duration.ofHours(-13).minusMinutes(14).minusSeconds(50));
// '-733:00:00.0011'
long c3 = after.getInt64("c3");
Duration c3Time = Duration.ofNanos(c3 * 1_000);
Duration c3ExpectedTime = toDuration("-PT733H0M0.001S");
assertEquals(c3ExpectedTime, c3Time);
assertEquals(c3ExpectedTime.toNanos(), c3Time.toNanos());
assertThat(c3Time.toNanos()).isEqualTo(-2638800001000000L);
assertTrue(c3Time.isNegative());
assertThat(c3Time).isEqualTo(Duration.ofHours(-733).minusMillis(1));
// '-1:59:59.0011'
long c4 = after.getInt64("c4");
Duration c4Time = Duration.ofNanos(c4 * 1_000);
Duration c4ExpectedTime = toDuration("-PT1H59M59.001S");
assertEquals(c4ExpectedTime, c4Time);
assertEquals(c4ExpectedTime.toNanos(), c4Time.toNanos());
assertThat(c4Time.toNanos()).isEqualTo(-7199001000000L);
assertTrue(c4Time.isNegative());
assertThat(c4Time).isEqualTo(Duration.ofHours(-1).minusMinutes(59).minusSeconds(59).minusMillis(1));
// '-838:59:58.999999'
long c5 = after.getInt64("c5");
Duration c5Time = Duration.ofNanos(c5 * 1_000);
Duration c5ExpectedTime = toDuration("-PT838H59M58.999999S");
assertEquals(c5ExpectedTime, c5Time);
assertEquals(c5ExpectedTime.toNanos(), c5Time.toNanos());
assertThat(c5Time.toNanos()).isEqualTo(-3020398999999000L);
assertTrue(c5Time.isNegative());
assertThat(c5Time).isEqualTo(Duration.ofHours(-838).minusMinutes(59).minusSeconds(58).minusNanos(999999000));
}
private Duration toDuration(String duration) {
return Duration.parse(duration);
}
}

View File

@ -264,7 +264,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11 + 1); // 11 schema change records + 1 SET statement
SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11 + 1 + 2); // 11 schema change records + 1 SET statement
assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(12);
assertThat(records.recordsForTopic(DATABASE.topicForTable("products")).size()).isEqualTo(9);
assertThat(records.recordsForTopic(DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
@ -696,12 +696,12 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(9 + 9 + 4 + 5);
SourceRecords records = consumeRecordsByTopic(9 + 9 + 4 + 5 + 1);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("products")).size()).isEqualTo(9);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("orders")).size()).isEqualTo(5);
assertThat(records.topics().size()).isEqualTo(4);
assertThat(records.topics().size()).isEqualTo(5);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);

View File

@ -6,6 +6,8 @@
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.math.BigDecimal;
@ -20,6 +22,7 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAdjuster;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
@ -31,10 +34,10 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.DecimalHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.TemporalPrecisionMode;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
@ -84,12 +87,15 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
int numCreateDatabase = 1;
int numCreateTables = 9;
int numDataRecords = 16;
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
int numCreateTables = 11;
int numDataRecords = 20;
int numCreateDefiner = 1;
SourceRecords records =
consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords + numCreateDefiner);
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.recordsForTopic(DATABASE.getServerName()).size())
.isEqualTo(numCreateDatabase + numCreateTables + numCreateDefiner);
assertThat(records.recordsForTopic(DATABASE.topicForTable("t1464075356413_testtable6")).size()).isEqualTo(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz84_integer_types_table")).size()).isEqualTo(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_85_fractest")).size()).isEqualTo(1);
@ -99,9 +105,11 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_123_bitvaluetest")).size()).isEqualTo(2);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_104_customers")).size()).isEqualTo(4);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_147_decimalvalues")).size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(1 + numCreateTables);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_342_timetest")).size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(numCreateTables + 1);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size())
.isEqualTo(numCreateDatabase + numCreateTables + numCreateDefiner);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).forEach(this::print);
@ -135,7 +143,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// c3 DATETIME(2),
// c4 TIMESTAMP(2)
//
// {"c1" : "16321", "c2" : "64264780", "c3" : "1410198664780", "c4" : "2014-09-08T17:51:04.78-05:00"}
// {"c1" : "16321", "c2" : "17:51:04.777", "c3" : "1410198664780", "c4" : "2014-09-08T17:51:04.78-05:00"}
// '2014-09-08'
Integer c1 = after.getInt32("c1"); // epoch days
@ -146,13 +154,14 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
assertThat(io.debezium.time.Date.toEpochDay(c1Date, ADJUSTER)).isEqualTo(c1);
// '17:51:04.777'
Integer c2 = after.getInt32("c2"); // milliseconds past midnight
LocalTime c2Time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(c2));
assertThat(c2Time.getHour()).isEqualTo(17);
assertThat(c2Time.getMinute()).isEqualTo(51);
assertThat(c2Time.getSecond()).isEqualTo(4);
assertThat(c2Time.getNano()).isEqualTo((int) TimeUnit.MILLISECONDS.toNanos(780));
assertThat(io.debezium.time.Time.toMilliOfDay(c2Time, ADJUSTER)).isEqualTo(c2);
Long c2 = after.getInt64("c2");
Duration c2Time = Duration.ofNanos(c2 * 1_000);
assertThat(c2Time.toHours()).isEqualTo(17);
assertThat(c2Time.toMinutes()).isEqualTo(1071);
assertThat(c2Time.getSeconds()).isEqualTo(64264);
assertThat(c2Time.getNano()).isEqualTo(780000000);
assertThat(c2Time.toNanos()).isEqualTo(64264780000000L);
assertThat(c2Time).isEqualTo(Duration.ofHours(17).plusMinutes(51).plusSeconds(4).plusMillis(780));
// '2014-09-08 17:51:04.777'
Long c3 = after.getInt64("c3"); // epoch millis
@ -199,13 +208,14 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
assertThat(after.getInt32("c1")).isNull(); // epoch days
// '00:00:00.000'
Integer c2 = after.getInt32("c2"); // milliseconds past midnight
LocalTime c2Time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(c2));
assertThat(c2Time.getHour()).isEqualTo(0);
assertThat(c2Time.getMinute() == 0 || c2Time.getMinute() == 1).isTrue();
assertThat(c2Time.getSecond()).isEqualTo(0);
Long c2 = after.getInt64("c2");
Duration c2Time = Duration.ofNanos(c2 * 1_000);
assertThat(c2Time.toHours()).isEqualTo(0);
assertThat(c2Time.toMinutes() == 1 || c2Time.toMinutes() == 0).isTrue();
assertThat(c2Time.getSeconds() == 0 || c2Time.getSeconds() == 60).isTrue();
assertThat(c2Time.getNano()).isEqualTo(0);
assertThat(io.debezium.time.Time.toMilliOfDay(c2Time, ADJUSTER)).isEqualTo(c2);
assertThat(c2Time.toNanos() == 0 || c2Time.toNanos() == 60000000000L).isTrue();
assertThat(c2Time.equals(Duration.ofSeconds(0)) || c2Time.equals(Duration.ofMinutes(1))).isTrue();
assertThat(after.getInt64("c3")).isNull(); // epoch millis
@ -262,6 +272,57 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
assertThat(decimalValue).isInstanceOf(BigDecimal.class);
BigDecimal bigValue = (BigDecimal) decimalValue;
assertThat(bigValue.doubleValue()).isEqualTo(12345.67, Delta.delta(0.01));
} else if (record.topic().endsWith("dbz_342_timetest")) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
// '517:51:04.777'
long c1 = after.getInt64("c1");
Duration c1Time = Duration.ofNanos(c1 * 1_000);
Duration c1ExpectedTime = toDuration("PT517H51M4.78S");
assertEquals(c1ExpectedTime, c1Time);
assertEquals(c1ExpectedTime.toNanos(), c1Time.toNanos());
assertThat(c1Time.toNanos()).isEqualTo(1864264780000000L);
assertThat(c1Time).isEqualTo(Duration.ofHours(517).plusMinutes(51).plusSeconds(4).plusMillis(780));
// '-13:14:50'
long c2 = after.getInt64("c2");
Duration c2Time = Duration.ofNanos(c2 * 1_000);
Duration c2ExpectedTime = toDuration("-PT13H14M50S");
assertEquals(c2ExpectedTime, c2Time);
assertEquals(c2ExpectedTime.toNanos(), c2Time.toNanos());
assertThat(c2Time.toNanos()).isEqualTo(-47690000000000L);
assertTrue(c2Time.isNegative());
assertThat(c2Time).isEqualTo(Duration.ofHours(-13).minusMinutes(14).minusSeconds(50));
// '-733:00:00.0011'
long c3 = after.getInt64("c3");
Duration c3Time = Duration.ofNanos(c3 * 1_000);
Duration c3ExpectedTime = toDuration("-PT733H0M0.001S");
assertEquals(c3ExpectedTime, c3Time);
assertEquals(c3ExpectedTime.toNanos(), c3Time.toNanos());
assertThat(c3Time.toNanos()).isEqualTo(-2638800001000000L);
assertTrue(c3Time.isNegative());
assertThat(c3Time).isEqualTo(Duration.ofHours(-733).minusMillis(1));
// '-1:59:59.0011'
long c4 = after.getInt64("c4");
Duration c4Time = Duration.ofNanos(c4 * 1_000);
Duration c4ExpectedTime = toDuration("-PT1H59M59.001S");
assertEquals(c4ExpectedTime, c4Time);
assertEquals(c4ExpectedTime.toNanos(), c4Time.toNanos());
assertThat(c4Time.toNanos()).isEqualTo(-7199001000000L);
assertTrue(c4Time.isNegative());
assertThat(c4Time).isEqualTo(Duration.ofHours(-1).minusMinutes(59).minusSeconds(59).minusMillis(1));
// '-838:59:58.999999'
long c5 = after.getInt64("c5");
Duration c5Time = Duration.ofNanos(c5 * 1_000);
Duration c5ExpectedTime = toDuration("-PT838H59M58.999999S");
assertEquals(c5ExpectedTime, c5Time);
assertEquals(c5ExpectedTime.toNanos(), c5Time.toNanos());
assertThat(c5Time.toNanos()).isEqualTo(-3020398999999000L);
assertTrue(c5Time.isNegative());
assertThat(c5Time).isEqualTo(Duration.ofHours(-838).minusMinutes(59).minusSeconds(58).minusNanos(999999000));
}
});
}
@ -284,12 +345,15 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
int numCreateDatabase = 1;
int numCreateTables = 9;
int numDataRecords = 16;
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
int numCreateTables = 11;
int numDataRecords = 20;
int numCreateDefiner = 1;
SourceRecords records =
consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords + numCreateDefiner);
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.recordsForTopic(DATABASE.getServerName()).size())
.isEqualTo(numCreateDatabase + numCreateTables + numCreateDefiner);
assertThat(records.recordsForTopic(DATABASE.topicForTable("t1464075356413_testtable6")).size()).isEqualTo(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz84_integer_types_table")).size()).isEqualTo(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_85_fractest")).size()).isEqualTo(1);
@ -301,7 +365,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_147_decimalvalues")).size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(1 + numCreateTables);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size())
.isEqualTo(numCreateDatabase + numCreateTables + numCreateDefiner);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).forEach(this::print);
@ -478,14 +543,18 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
int numTables = 10;
int numDataRecords = 19;
int numCreateDatabase = 1;
int numTables = 11;
int numDataRecords = 20;
int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
int numCreateDefiner = 1;
int numSetVariables = 1;
SourceRecords records = consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords);
SourceRecords records =
consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords + numCreateDefiner + numCreateDatabase);
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(numDdlRecords + numSetVariables);
assertThat(records.recordsForTopic(DATABASE.getServerName()).size())
.isEqualTo(numDdlRecords + numSetVariables);
assertThat(records.recordsForTopic(DATABASE.topicForTable("t1464075356413_testtable6")).size()).isEqualTo(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz84_integer_types_table")).size()).isEqualTo(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_85_fractest")).size()).isEqualTo(1);
@ -496,6 +565,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_104_customers")).size()).isEqualTo(4);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_147_decimalvalues")).size()).isEqualTo(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_195_numvalues")).size()).isEqualTo(3);
assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_342_timetest")).size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(numTables + 1);
assertThat(records.databaseNames().size()).isEqualTo(2);
assertThat(records.databaseNames()).containsOnly(DATABASE.getDatabaseName(), "");
@ -534,7 +604,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// c3 DATETIME(2),
// c4 TIMESTAMP(2)
//
// {"c1" : "16321", "c2" : "64264780", "c3" : "1410198664780", "c4" : "2014-09-08T17:51:04.78-05:00"}
// {"c1" : "16321", "c2" : "17:51:04.777", "c3" : "1410198664780", "c4" : "2014-09-08T17:51:04.78-05:00"}
// '2014-09-08'
Integer c1 = after.getInt32("c1"); // epoch days
@ -545,14 +615,14 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
assertThat(io.debezium.time.Date.toEpochDay(c1Date, ADJUSTER)).isEqualTo(c1);
// '17:51:04.777'
Integer c2 = after.getInt32("c2"); // milliseconds past midnight
LocalTime c2Time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(c2));
assertThat(c2Time.getHour()).isEqualTo(17);
assertThat(c2Time.getMinute()).isEqualTo(51);
assertThat(c2Time.getSecond()).isEqualTo(4);
assertThat(c2Time.getNano()).isEqualTo(0); // What!?!? The MySQL Connect/J driver indeed returns 0 for fractional
// part
assertThat(io.debezium.time.Time.toMilliOfDay(c2Time, ADJUSTER)).isEqualTo(c2);
Long c2 = after.getInt64("c2");
Duration c2Time = Duration.ofNanos(c2 * 1_000);
assertThat(c2Time.toHours()).isEqualTo(17);
assertThat(c2Time.toMinutes()).isEqualTo(1071);
assertThat(c2Time.getSeconds()).isEqualTo(64264);
assertThat(c2Time.getNano()).isEqualTo(780000000);
assertThat(c2Time.toNanos()).isEqualTo(64264780000000L);
assertThat(c2Time).isEqualTo(Duration.ofHours(17).plusMinutes(51).plusSeconds(4).plusMillis(780));
// '2014-09-08 17:51:04.777'
Long c3 = after.getInt64("c3"); // epoch millis
@ -633,6 +703,57 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
} else {
assertThat(intValue.intValue()).isEqualTo(0);
}
} else if (record.topic().endsWith("dbz_342_timetest")) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
// '517:51:04.777'
long c1 = after.getInt64("c1");
Duration c1Time = Duration.ofNanos(c1 * 1_000);
Duration c1ExpectedTime = toDuration("PT517H51M4.78S");
assertEquals(c1ExpectedTime, c1Time);
assertEquals(c1ExpectedTime.toNanos(), c1Time.toNanos());
assertThat(c1Time.toNanos()).isEqualTo(1864264780000000L);
assertThat(c1Time).isEqualTo(Duration.ofHours(517).plusMinutes(51).plusSeconds(4).plusMillis(780));
// '-13:14:50'
long c2 = after.getInt64("c2");
Duration c2Time = Duration.ofNanos(c2 * 1_000);
Duration c2ExpectedTime = toDuration("-PT13H14M50S");
assertEquals(c2ExpectedTime, c2Time);
assertEquals(c2ExpectedTime.toNanos(), c2Time.toNanos());
assertThat(c2Time.toNanos()).isEqualTo(-47690000000000L);
assertTrue(c2Time.isNegative());
assertThat(c2Time).isEqualTo(Duration.ofHours(-13).minusMinutes(14).minusSeconds(50));
// '-733:00:00.0011'
long c3 = after.getInt64("c3");
Duration c3Time = Duration.ofNanos(c3 * 1_000);
Duration c3ExpectedTime = toDuration("-PT733H0M0.001S");
assertEquals(c3ExpectedTime, c3Time);
assertEquals(c3ExpectedTime.toNanos(), c3Time.toNanos());
assertThat(c3Time.toNanos()).isEqualTo(-2638800001000000L);
assertTrue(c3Time.isNegative());
assertThat(c3Time).isEqualTo(Duration.ofHours(-733).minusMillis(1));
// '-1:59:59.0011'
long c4 = after.getInt64("c4");
Duration c4Time = Duration.ofNanos(c4 * 1_000);
Duration c4ExpectedTime = toDuration("-PT1H59M59.001S");
assertEquals(c4ExpectedTime, c4Time);
assertEquals(c4ExpectedTime.toNanos(), c4Time.toNanos());
assertThat(c4Time.toNanos()).isEqualTo(-7199001000000L);
assertTrue(c4Time.isNegative());
assertThat(c4Time).isEqualTo(Duration.ofHours(-1).minusMinutes(59).minusSeconds(59).minusMillis(1));
// '-838:59:58.999999'
long c5 = after.getInt64("c5");
Duration c5Time = Duration.ofNanos(c5 * 1_000);
Duration c5ExpectedTime = toDuration("-PT838H59M58.999999S");
assertEquals(c5ExpectedTime, c5Time);
assertEquals(c5ExpectedTime.toNanos(), c5Time.toNanos());
assertThat(c5Time.toNanos()).isEqualTo(-3020398999999000L);
assertTrue(c5Time.isNegative());
assertThat(c5Time).isEqualTo(Duration.ofHours(-838).minusMinutes(59).minusSeconds(58).minusNanos(999999000));
}
});
}
@ -699,4 +820,8 @@ private void assertTimestamp(String c4) {
ZoneOffset expectedOffset = defaultZoneId.getRules().getOffset(expectedLocalDateTime);
assertThat(c4DateTime.getOffset()).isEqualTo(expectedOffset);
}
private Duration toDuration(String duration) {
return Duration.parse(duration);
}
}

View File

@ -9,10 +9,13 @@
import static org.junit.Assert.fail;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
@ -106,7 +109,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
assertThat(schemaChanges.recordCount()).isEqualTo(0);
// Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(4);
assertThat(store.collectionCount()).isEqualTo(5);
Collection products = store.collection(DATABASE.getDatabaseName(), "products");
assertThat(products.numberOfCreates()).isEqualTo(9);
assertThat(products.numberOfUpdates()).isEqualTo(0);
@ -143,6 +146,25 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
assertThat(orders.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
Collection timetest = store.collection(DATABASE.getDatabaseName(), "dbz_342_timetest");
assertThat(timetest.numberOfCreates()).isEqualTo(1);
assertThat(timetest.numberOfUpdates()).isEqualTo(0);
assertThat(timetest.numberOfDeletes()).isEqualTo(0);
assertThat(timetest.numberOfReads()).isEqualTo(0);
assertThat(timetest.numberOfTombstones()).isEqualTo(0);
assertThat(timetest.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(timetest.numberOfValueSchemaChanges()).isEqualTo(1);
final List<Struct> timerecords = new ArrayList<>();
timetest.forEach(val -> {
timerecords.add(((Struct) val.value()).getStruct("after"));
});
Struct after = timerecords.get(0);
assertThat(after.get("c1")).isEqualTo(toMicroSeconds("PT517H51M04.78S"));
assertThat(after.get("c2")).isEqualTo(toMicroSeconds("-PT13H14M50S"));
assertThat(after.get("c3")).isEqualTo(toMicroSeconds("-PT733H0M0.001S"));
assertThat(after.get("c4")).isEqualTo(toMicroSeconds("-PT1H59M59.001S"));
assertThat(after.get("c5")).isEqualTo(toMicroSeconds("-PT838H59M58.999999S"));
// Make sure the snapshot completed ...
if (completed.await(10, TimeUnit.SECONDS)) {
// completed the snapshot ...
@ -185,7 +207,7 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
// Check the records via the store ...
assertThat(store.databases()).containsOnly(DATABASE.getDatabaseName(), OTHER_DATABASE.getDatabaseName()); // 2 databases
assertThat(store.collectionCount()).isEqualTo(8); // 2 databases
assertThat(store.collectionCount()).isEqualTo(9); // 2 databases
Collection products = store.collection(DATABASE.getDatabaseName(), "products");
assertThat(products.numberOfCreates()).isEqualTo(0);
@ -223,6 +245,25 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
assertThat(orders.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
Collection timetest = store.collection(DATABASE.getDatabaseName(), "dbz_342_timetest");
assertThat(timetest.numberOfCreates()).isEqualTo(0);
assertThat(timetest.numberOfUpdates()).isEqualTo(0);
assertThat(timetest.numberOfDeletes()).isEqualTo(0);
assertThat(timetest.numberOfReads()).isEqualTo(1);
assertThat(timetest.numberOfTombstones()).isEqualTo(0);
assertThat(timetest.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(timetest.numberOfValueSchemaChanges()).isEqualTo(1);
final List<Struct> timerecords = new ArrayList<>();
timetest.forEach(val -> {
timerecords.add(((Struct) val.value()).getStruct("after"));
});
Struct after = timerecords.get(0);
assertThat(after.get("c1")).isEqualTo(toMicroSeconds("PT517H51M04.78S"));
assertThat(after.get("c2")).isEqualTo(toMicroSeconds("-PT13H14M50S"));
assertThat(after.get("c3")).isEqualTo(toMicroSeconds("-PT733H0M0.001S"));
assertThat(after.get("c4")).isEqualTo(toMicroSeconds("-PT1H59M59.001S"));
assertThat(after.get("c5")).isEqualTo(toMicroSeconds("-PT838H59M58.999999S"));
// Make sure the snapshot completed ...
if (completed.await(10, TimeUnit.SECONDS)) {
// completed the snapshot ...
@ -261,12 +302,12 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
assertThat(records).isNull();
// There should be 13 schema changes plus 1 SET statement ...
assertThat(schemaChanges.recordCount()).isEqualTo(12);
assertThat(schemaChanges.recordCount()).isEqualTo(14);
assertThat(schemaChanges.databaseCount()).isEqualTo(2);
assertThat(schemaChanges.databases()).containsOnly(DATABASE.getDatabaseName(), "");
// Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(4);
assertThat(store.collectionCount()).isEqualTo(5);
Collection products = store.collection(DATABASE.getDatabaseName(), "products");
assertThat(products.numberOfCreates()).isEqualTo(9);
assertThat(products.numberOfUpdates()).isEqualTo(0);
@ -303,6 +344,25 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
assertThat(orders.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
Collection timetest = store.collection(DATABASE.getDatabaseName(), "dbz_342_timetest");
assertThat(timetest.numberOfCreates()).isEqualTo(1);
assertThat(timetest.numberOfUpdates()).isEqualTo(0);
assertThat(timetest.numberOfDeletes()).isEqualTo(0);
assertThat(timetest.numberOfReads()).isEqualTo(0);
assertThat(timetest.numberOfTombstones()).isEqualTo(0);
assertThat(timetest.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(timetest.numberOfValueSchemaChanges()).isEqualTo(1);
final List<Struct> timerecords = new ArrayList<>();
timetest.forEach(val -> {
timerecords.add(((Struct) val.value()).getStruct("after"));
});
Struct after = timerecords.get(0);
assertThat(after.get("c1")).isEqualTo(toMicroSeconds("PT517H51M04.78S"));
assertThat(after.get("c2")).isEqualTo(toMicroSeconds("-PT13H14M50S"));
assertThat(after.get("c3")).isEqualTo(toMicroSeconds("-PT733H0M0.001S"));
assertThat(after.get("c4")).isEqualTo(toMicroSeconds("-PT1H59M59.001S"));
assertThat(after.get("c5")).isEqualTo(toMicroSeconds("-PT838H59M58.999999S"));
// Make sure the snapshot completed ...
if (completed.await(10, TimeUnit.SECONDS)) {
// completed the snapshot ...
@ -354,4 +414,8 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
fail("failed to complete the snapshot within 10 seconds");
}
}
private long toMicroSeconds(String duration) {
return Duration.parse(duration).toNanos() / 1_000;
}
}

View File

@ -79,3 +79,13 @@ INSERT INTO ids VALUES(1);
INSERT INTO ids2 VALUES(1);
DROP TEMPORARY TABLE ids;
DROP TEMPORARY TABLE ids2;
-- DBZ-342 handle TIME values that exceed the value range of java.sql.Time
CREATE TABLE dbz_342_timetest (
c1 TIME(2) PRIMARY KEY,
c2 TIME(0),
c3 TIME(3),
c4 TIME(3),
c5 TIME(6)
);
INSERT INTO dbz_342_timetest VALUES ('517:51:04.777', '-13:14:50', '-733:00:00.0011', '-1:59:59.0011', '-838:59:58.999999');
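As an arithmetic cross-check (values computed here, not in the commit): the first value is stored in a TIME(2) column, so '517:51:04.777' rounds to 517:51:04.78 and should surface in change events as 1,864,264,780,000 microseconds:
long micros = Duration.ofHours(517).plusMinutes(51).plusSeconds(4).plusMillis(780)
        .toNanos() / 1_000;   // 1_864_264_780_000, matching the assertions in the tests above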

View File

@ -144,4 +144,14 @@ CREATE TABLE dbz_195_numvalues (
INSERT INTO dbz_195_numvalues VALUES (default,0);
INSERT INTO dbz_195_numvalues VALUES (default,-2147483648);
INSERT INTO dbz_195_numvalues VALUES (default,2147483647);
-- DBZ-342 handle TIME values that exceed the value range of java.sql.Time
CREATE TABLE dbz_342_timetest (
c1 TIME(2),
c2 TIME(0),
c3 TIME(3),
c4 TIME(3),
c5 TIME(6)
);
INSERT INTO dbz_342_timetest VALUES ('517:51:04.777', '-13:14:50', '-733:00:00.0011', '-1:59:59.0011', '-838:59:58.999999');

View File

@ -26,6 +26,7 @@
import io.debezium.connector.postgresql.connection.wal2json.Wal2JsonMessageDecoder;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
/**
* The configuration properties for the {@link PostgresConnector}
@ -34,67 +35,10 @@
*/
public class PostgresConnectorConfig {
/**
* The set of predefined TemporalPrecisionMode options or aliases.
*/
public enum TemporalPrecisionMode implements EnumeratedValue {
/**
* Represent time and date values based upon the resolution in the database, using {@link io.debezium.time} semantic
* types.
*/
ADAPTIVE("adaptive"),
/**
* Represent time and date values using Kafka Connect {@link org.apache.kafka.connect.data} logical types, which always
* have millisecond precision.
*/
CONNECT("connect");
private final String value;
TemporalPrecisionMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static TemporalPrecisionMode parse(String value) {
if (value == null) return null;
value = value.trim();
for (TemporalPrecisionMode option : TemporalPrecisionMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static TemporalPrecisionMode parse(String value, String defaultValue) {
TemporalPrecisionMode mode = parse(value);
if (mode == null && defaultValue != null) mode = parse(defaultValue);
return mode;
}
}
/**
* The set of predefined DecimalHandlingMode options or aliases.
*/
public static enum DecimalHandlingMode implements EnumeratedValue {
public enum DecimalHandlingMode implements EnumeratedValue {
/**
* Represent {@code DECIMAL} and {@code NUMERIC} values as precise {@link BigDecimal} values, which are
* represented in change events in a binary form. This is precise but difficult to use.
@ -653,6 +597,7 @@ public String getValue() {
.withImportance(Importance.MEDIUM)
.withDescription("Time, date, and timestamps can be represented with different kinds of precisions, including:"
+ "'adaptive' (the default) bases the precision of time, date, and timestamp values on the database column's precision; "
+ "'adaptive_time_microseconds' like 'adaptive' mode, but TIME fields always use microseconds precision;"
+ "'connect' always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, "
+ "which uses millisecond precision regardless of the database columns' precision .");
@ -697,7 +642,7 @@ public String getValue() {
private final Configuration config;
private final String serverName;
private final boolean adaptiveTimePrecision;
private final TemporalPrecisionMode temporalPrecisionMode;
private final DecimalMode decimalHandlingMode;
private final SnapshotMode snapshotMode;
@ -708,8 +653,7 @@ protected PostgresConnectorConfig(Configuration config) {
serverName = hostname() + ":" + port() + "/" + databaseName();
}
this.serverName = serverName;
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
this.adaptiveTimePrecision = TemporalPrecisionMode.ADAPTIVE == timePrecisionMode;
this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
String decimalHandlingModeStr = config.getString(PostgresConnectorConfig.DECIMAL_HANDLING_MODE);
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
this.decimalHandlingMode = decimalHandlingMode.asDecimalMode();
@ -756,8 +700,8 @@ protected long pollIntervalMs() {
return config.getLong(POLL_INTERVAL_MS);
}
protected boolean adaptiveTimePrecision() {
return adaptiveTimePrecision;
protected TemporalPrecisionMode temporalPrecisionMode() {
return temporalPrecisionMode;
}
protected DecimalMode decimalHandlingMode() {

View File

@ -61,7 +61,7 @@ protected PostgresSchema(PostgresConnectorConfig config) {
this.filters = new Filters(config);
this.tables = new Tables();
PostgresValueConverter valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.adaptiveTimePrecision(),
PostgresValueConverter valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.temporalPrecisionMode(),
ZoneOffset.UTC, null);
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate;
this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator);

View File

@ -35,6 +35,7 @@
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.geometry.Point;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.MicroDuration;
@ -64,8 +65,8 @@ public class PostgresValueConverter extends JdbcValueConverters {
*/
private static final int VARIABLE_SCALE_DECIMAL_LENGTH = 131089;
protected PostgresValueConverter(DecimalMode decimalMode, boolean adaptiveTimePrecision, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode) {
super(decimalMode, adaptiveTimePrecision, defaultOffset, null, bigIntUnsignedMode);
protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode) {
super(decimalMode, temporalPrecisionMode, defaultOffset, null, bigIntUnsignedMode);
}
@Override
@ -117,7 +118,7 @@ public SchemaBuilder schemaBuilder(Column column) {
case PgOid.BOOL_ARRAY:
return SchemaBuilder.array(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
case PgOid.DATE_ARRAY:
if (adaptiveTimePrecision) {
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
return SchemaBuilder.array(io.debezium.time.Date.builder().optional().build());
}
return SchemaBuilder.array(org.apache.kafka.connect.data.Date.builder().optional().build());

View File

@ -52,6 +52,7 @@
import io.debezium.relational.TableId;
import io.debezium.time.Date;
import io.debezium.time.MicroDuration;
import io.debezium.time.MicroTime;
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.ZonedTime;
@ -202,6 +203,22 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval));
}
protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypesAdaptiveTimeMicroseconds() {
long expectedTs = NanoTimestamp.toEpochNanos(LocalDateTime.parse("2016-11-04T13:51:30"), null);
String expectedTz = "2016-11-04T11:51:30Z"; // timestamp is stored with TZ, should be read back in UTC
int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null);
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000;
String expectedTtz = "11:51:30Z"; // time is stored with TZ, should be read back at GMT
double interval = MicroDuration.durationMicros(1, 2, 3, 4, 5, 0, PostgresValueConverter.DAYS_PER_MONTH_AVG);
return Arrays.asList(new SchemaAndValueField("ts", NanoTimestamp.builder().optional().build(), expectedTs),
new SchemaAndValueField("tz", ZonedTimestamp.builder().optional().build(), expectedTz),
new SchemaAndValueField("date", Date.builder().optional().build(), expectedDate),
new SchemaAndValueField("ti", MicroTime.builder().optional().build(), expectedTi),
new SchemaAndValueField("ttz", ZonedTime.builder().optional().build(), expectedTtz),
new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval));
}
protected List<SchemaAndValueField> schemaAndValuesForMoneyTypes() {
return Collections.singletonList(new SchemaAndValueField("csh", Decimal.builder(0).optional().build(),
BigDecimal.valueOf(1234.11d)));
@ -234,6 +251,18 @@ protected Map<String, List<SchemaAndValueField>> schemaAndValuesByTableName() {
this::schemasAndValuesForTable));
}
protected Map<String, List<SchemaAndValueField>> schemaAndValuesByTableNameAdaptiveTimeMicroseconds() {
return ALL_STMTS.stream().collect(Collectors.toMap(AbstractRecordsProducerTest::tableNameFromInsertStmt,
this::schemasAndValuesForTableAdaptiveTimeMicroseconds));
}
protected List<SchemaAndValueField> schemasAndValuesForTableAdaptiveTimeMicroseconds(String insertTableStatement) {
if (insertTableStatement.equals(INSERT_DATE_TIME_TYPES_STMT)) {
return schemaAndValuesForDateTimeTypesAdaptiveTimeMicroseconds();
}
return schemasAndValuesForTable(insertTableStatement);
}
protected List<SchemaAndValueField> schemasAndValuesForTable(String insertTableStatement) {
switch (insertTableStatement) {
case INSERT_NUMERIC_TYPES_STMT:

View File

@ -46,6 +46,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.util.Strings;
/**
@ -145,7 +146,7 @@ public void shouldValidateConfiguration() throws Exception {
validateField(validatedConfig, PostgresConnectorConfig.COLUMN_BLACKLIST, null);
validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL);
validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, PostgresConnectorConfig.DEFAULT_SNAPSHOT_LOCK_TIMEOUT_MILLIS);
validateField(validatedConfig, PostgresConnectorConfig.TIME_PRECISION_MODE, PostgresConnectorConfig.TemporalPrecisionMode.ADAPTIVE);
validateField(validatedConfig, PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE);
validateField(validatedConfig, PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.PRECISE);
validateField(validatedConfig, PostgresConnectorConfig.SSL_SOCKET_FACTORY, null);
validateField(validatedConfig, PostgresConnectorConfig.TCP_KEEPALIVE, null);

View File

@ -18,6 +18,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.TemporalPrecisionMode;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
@ -152,4 +154,36 @@ private void assertReadRecord(SourceRecord record, Map<String, List<SchemaAndVal
assertNotNull("No expected values for " + tableName + " found", expectedValuesAndSchemasForTable);
assertRecordSchemaAndValues(expectedValuesAndSchemasForTable, record, Envelope.FieldName.AFTER);
}
@Test
@FixFor("DBZ-342")
public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.build());
context = new PostgresTaskContext(config, new PostgresSchema(config));
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER), false);
TestHelper.executeDDL("postgres_create_tables.ddl");
TestConsumer consumer = testConsumer(ALL_STMTS.size());
// insert data for each of the supported types
String statementsBuilder = ALL_STMTS.stream().collect(Collectors.joining(";" + System.lineSeparator())) + ";";
TestHelper.execute(statementsBuilder);
// then start the producer and validate that all records are there
snapshotProducer.start(consumer);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
Map<String, List<SchemaAndValueField>> expectedValuesByTableName = super.schemaAndValuesByTableNameAdaptiveTimeMicroseconds();
consumer.process(record -> assertReadRecord(record, expectedValuesByTableName));
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffset(record, true, consumer.isEmpty());
}
}
}

View File

@ -341,7 +341,7 @@ public static interface CallPreparer {
* @see #execute(Operations)
*/
public JdbcConnection query(String query, ResultSetConsumer resultConsumer) throws SQLException {
return query(query,conn->conn.createStatement(),resultConsumer);
return query(query, Connection::createStatement, resultConsumer);
}
/**

View File

@ -62,17 +62,18 @@
@Immutable
public class JdbcValueConverters implements ValueConverterProvider {
public static enum DecimalMode {
public enum DecimalMode {
PRECISE, DOUBLE;
}
public static enum BigIntUnsignedMode {
public enum BigIntUnsignedMode {
PRECISE, LONG;
}
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final ZoneOffset defaultOffset;
protected final boolean adaptiveTimePrecision;
protected final boolean adaptiveTimePrecisionMode;
protected final boolean adaptiveTimeMicrosecondsPrecisionMode;
protected final DecimalMode decimalMode;
private final TemporalAdjuster adjuster;
protected final BigIntUnsignedMode bigIntUnsignedMode;
@ -83,7 +84,7 @@ public static enum BigIntUnsignedMode {
* columns.
*/
public JdbcValueConverters() {
this(null, true, ZoneOffset.UTC, null, null);
this(null, TemporalPrecisionMode.ADAPTIVE, ZoneOffset.UTC, null, null);
}
/**
@ -93,9 +94,7 @@ public JdbcValueConverters() {
*
* @param decimalMode how {@code DECIMAL} and {@code NUMERIC} values should be treated; may be null if
* {@link DecimalMode#PRECISE} is to be used
* @param adaptiveTimePrecision {@code true} if the time, date, and timestamp values should be based upon the precision of the
* database columns using {@link io.debezium.time} semantic types, or {@code false} if they should be fixed to
* millisecond precision using Kafka Connect {@link org.apache.kafka.connect.data} logical types.
* @param temporalPrecisionMode the mode specifying how time, date, and timestamp values should be represented; see {@link io.debezium.jdbc.TemporalPrecisionMode}; may not be null
* @param defaultOffset the zone offset that is to be used when converting non-timezone related values to values that do
* have timezones; may be null if UTC is to be used
* @param adjuster the optional component that adjusts the local date value before obtaining the epoch day; may be null if no
@ -103,10 +102,11 @@ public JdbcValueConverters() {
* @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if
* {@link BigIntUnsignedMode#PRECISE} is to be used
*/
public JdbcValueConverters(DecimalMode decimalMode, boolean adaptiveTimePrecision, ZoneOffset defaultOffset,
public JdbcValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset,
TemporalAdjuster adjuster, BigIntUnsignedMode bigIntUnsignedMode) {
this.defaultOffset = defaultOffset != null ? defaultOffset : ZoneOffset.UTC;
this.adaptiveTimePrecision = adaptiveTimePrecision;
this.adaptiveTimePrecisionMode = temporalPrecisionMode == TemporalPrecisionMode.ADAPTIVE;
this.adaptiveTimeMicrosecondsPrecisionMode = temporalPrecisionMode == TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS;
this.decimalMode = decimalMode != null ? decimalMode : DecimalMode.PRECISE;
this.adjuster = adjuster;
this.bigIntUnsignedMode = bigIntUnsignedMode != null ? bigIntUnsignedMode : BigIntUnsignedMode.PRECISE;
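A hedged sketch of constructing the converters through the new signature; the argument choices and class name are illustrative, and the nulls fall back to the defaults the javadoc describes:

import java.time.ZoneOffset;

import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;

public class ConvertersSketch {
    static JdbcValueConverters microsecondTimeConverters() {
        return new JdbcValueConverters(
                DecimalMode.PRECISE,                              // precise DECIMAL/NUMERIC handling
                TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, // TIME always as INT64 micros
                ZoneOffset.UTC,                                   // offset applied to non-zoned values
                null,                                             // no temporal adjuster
                null);                                            // defaults to BigIntUnsignedMode.PRECISE
    }
}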
@ -189,20 +189,23 @@ public SchemaBuilder schemaBuilder(Column column) {
return Xml.builder();
// Date and time values
case Types.DATE:
if (adaptiveTimePrecision) {
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
return Date.builder();
}
return org.apache.kafka.connect.data.Date.builder();
case Types.TIME:
if (adaptiveTimePrecision) {
if (adaptiveTimeMicrosecondsPrecisionMode) {
return MicroTime.builder();
}
if (adaptiveTimePrecisionMode) {
if (column.length() <= 3) return Time.builder();
if (column.length() <= 6) return MicroTime.builder();
return NanoTime.builder();
}
return org.apache.kafka.connect.data.Time.builder();
case Types.TIMESTAMP:
if (adaptiveTimePrecision) {
if (column.length() <= 3 || !adaptiveTimePrecision) return Timestamp.builder();
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
if (column.length() <= 3) return Timestamp.builder();
if (column.length() <= 6) return MicroTimestamp.builder();
return NanoTimestamp.builder();
}
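Summarizing the TIME branch above as a standalone sketch (it mirrors the diff's logic; the method and class names are hypothetical):

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.time.MicroTime;
import io.debezium.time.NanoTime;
import io.debezium.time.Time;

public class TimeSchemaSketch {
    static SchemaBuilder timeSchema(TemporalPrecisionMode mode, int columnLength) {
        if (mode == TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS) {
            return MicroTime.builder(); // INT64 micros for every TIME(n); survives values outside java.sql.Time
        }
        if (mode == TemporalPrecisionMode.ADAPTIVE) {
            if (columnLength <= 3) return Time.builder();      // INT32 millis
            if (columnLength <= 6) return MicroTime.builder(); // INT64 micros
            return NanoTime.builder();                         // INT64 nanos
        }
        return org.apache.kafka.connect.data.Time.builder();  // 'connect': logical Time, millis
    }
}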
@ -296,21 +299,24 @@ public ValueConverter converter(Column column, Field fieldDefn) {
// Date and time values
case Types.DATE:
if (adaptiveTimePrecision) {
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
return (data) -> convertDateToEpochDays(column, fieldDefn, data);
}
return (data) -> convertDateToEpochDaysAsDate(column, fieldDefn, data);
case Types.TIME:
if (adaptiveTimePrecision) {
if (column.length() <= 3) return (data) -> convertTimeToMillisPastMidnight(column, fieldDefn, data);
if (column.length() <= 6) return (data) -> convertTimeToMicrosPastMidnight(column, fieldDefn, data);
if (adaptiveTimeMicrosecondsPrecisionMode) {
return data -> convertTimeToMicrosPastMidnight(column, fieldDefn, data);
}
if (adaptiveTimePrecisionMode) {
if (column.length() <= 3) return data -> convertTimeToMillisPastMidnight(column, fieldDefn, data);
if (column.length() <= 6) return data -> convertTimeToMicrosPastMidnight(column, fieldDefn, data);
return (data) -> convertTimeToNanosPastMidnight(column, fieldDefn, data);
}
return (data) -> convertTimeToMillisPastMidnightAsDate(column, fieldDefn, data);
case Types.TIMESTAMP:
if (adaptiveTimePrecision) {
if (column.length() <= 3) return (data) -> convertTimestampToEpochMillis(column, fieldDefn, data);
if (column.length() <= 6) return (data) -> convertTimestampToEpochMicros(column, fieldDefn, data);
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
if (column.length() <= 3) return data -> convertTimestampToEpochMillis(column, fieldDefn, data);
if (column.length() <= 6) return data -> convertTimestampToEpochMicros(column, fieldDefn, data);
return (data) -> convertTimestampToEpochNanos(column, fieldDefn, data);
}
return (data) -> convertTimestampToEpochMillisAsDate(column, fieldDefn, data);
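The value side follows the same split; a small plain-JDK sketch (class name hypothetical) of what 'microseconds past midnight' means for the in-range value the Postgres tests earlier in this commit assert:

import java.time.LocalTime;

public class MicrosPastMidnightSketch {
    public static void main(String[] args) {
        // 13:51:30 -> 49_890 seconds -> 49_890_000_000 microseconds past midnight
        System.out.println(LocalTime.parse("13:51:30").toNanoOfDay() / 1_000);
    }
}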

View File

@ -0,0 +1,71 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.jdbc;
import io.debezium.config.EnumeratedValue;
/**
* The set of predefined TemporalPrecisionMode options.
*/
public enum TemporalPrecisionMode implements EnumeratedValue {
/**
* Represent time and date values based upon the resolution in the database, using {@link io.debezium.time} semantic
* types.
*/
ADAPTIVE("adaptive"),
/**
* Represent timestamp, datetime and date values based upon the resolution in the database, using
* {@link io.debezium.time} semantic types. TIME fields will always be represented as microseconds
* in INT64 / {@link java.lang.Long} using {@link io.debezium.time.MicroTime}.
*/
ADAPTIVE_TIME_MICROSECONDS("adaptive_time_microseconds"),
/**
* Represent time and date values using Kafka Connect {@link org.apache.kafka.connect.data} logical types, which always
* have millisecond precision.
*/
CONNECT("connect");
private final String value;
TemporalPrecisionMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may be null
* @return the matching option, or null if no match is found
*/
public static TemporalPrecisionMode parse(String value) {
if (value == null) return null;
value = value.trim();
for (TemporalPrecisionMode option : TemporalPrecisionMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may be null
* @param defaultValue the default value; may be null
* @return the option matching the value or, failing that, matching the default value; null if neither matches
*/
public static TemporalPrecisionMode parse(String value, String defaultValue) {
TemporalPrecisionMode mode = parse(value);
if (mode == null && defaultValue != null) mode = parse(defaultValue);
return mode;
}
}
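A short usage sketch of the parse contract (class name hypothetical): matching ignores case, and the two-argument form falls back to the supplied default:

import io.debezium.jdbc.TemporalPrecisionMode;

public class ParseSketch {
    public static void main(String[] args) {
        System.out.println(TemporalPrecisionMode.parse("Adaptive_Time_Microseconds")); // ADAPTIVE_TIME_MICROSECONDS
        System.out.println(TemporalPrecisionMode.parse("bogus", "connect"));           // CONNECT
        System.out.println(TemporalPrecisionMode.parse("bogus"));                      // null
    }
}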

View File

@ -5,6 +5,7 @@
*/
package io.debezium.time;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@ -92,6 +93,14 @@ protected static LocalTime toLocalTime(Object obj) {
date.getSeconds(),
nanosOfSecond);
}
if (obj instanceof Duration) {
long value = ((Duration) obj).toNanos();
if (value >= 0 && value < NANOSECONDS_PER_DAY) {
return LocalTime.ofNanoOfDay(value);
}
throw new IllegalArgumentException("Time values must use a number of nanoseconds greater than or equal to 0 and less than 86400000000000");
}
if (obj instanceof Long) {
// Assume the value is the number of nanoseconds past midnight
return LocalTime.ofNanoOfDay((Long) obj);
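To make the new guard concrete, a plain-JDK sketch (class name hypothetical) of which Durations can and cannot become a LocalTime:

import java.time.Duration;
import java.time.LocalTime;

public class ToLocalTimeSketch {
    public static void main(String[] args) {
        // In range: maps cleanly onto a time of day.
        System.out.println(LocalTime.ofNanoOfDay(Duration.parse("PT13H14M50S").toNanos())); // 13:14:50
        // Out of range: negative (or multi-day) durations cannot be a LocalTime,
        // which is exactly what the guard above rejects.
        System.out.println(Duration.parse("-PT13H14M50S").toNanos() < 0); // true
    }
}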

View File

@ -282,3 +282,13 @@ CREATE TABLE dbz_147_decimalvalues (
);
INSERT INTO dbz_147_decimalvalues (pk_column, decimal_value)
VALUES(default, 12345.67);
-- DBZ-342 handle TIME values that exceed the value range of java.sql.Time
CREATE TABLE dbz_342_timetest (
c1 TIME(2),
c2 TIME(0),
c3 TIME(3),
c4 TIME(3),
c5 TIME(6)
);
INSERT INTO dbz_342_timetest VALUES ('517:51:04.777', '-13:14:50', '-733:00:00.0011', '-1:59:59.0011', '-838:59:58.999999');

View File

@ -282,3 +282,13 @@ CREATE TABLE dbz_147_decimalvalues (
);
INSERT INTO dbz_147_decimalvalues (pk_column, decimal_value)
VALUES(default, 12345.67);
-- DBZ-342 handle TIME values that exceed the value range of java.sql.Time
CREATE TABLE dbz_342_timetest (
c1 TIME(2),
c2 TIME(0),
c3 TIME(3),
c4 TIME(3),
c5 TIME(6)
);
INSERT INTO dbz_342_timetest VALUES ('517:51:04.777', '-13:14:50', '-733:00:00.0011', '-1:59:59.0011', '-838:59:58.999999');

View File

@ -426,3 +426,13 @@ INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafe' AS JSON), -- BLOB as
INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafebabe' AS JSON), -- BLOB as Base64
'"yv66vg=="',
'"yv66vg=="');
-- DBZ-342 handle TIME values that exceed the value range of java.sql.Time
CREATE TABLE dbz_342_timetest (
c1 TIME(2),
c2 TIME(0),
c3 TIME(3),
c4 TIME(3),
c5 TIME(6)
);
INSERT INTO dbz_342_timetest VALUES ('517:51:04.777', '-13:14:50', '-733:00:00.0011', '-1:59:59.0011', '-838:59:58.999999');

View File

@ -426,3 +426,13 @@ INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafe' AS JSON), -- BLOB as
INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafebabe' AS JSON), -- BLOB as Base64
'"yv66vg=="',
'"yv66vg=="');
-- DBZ-342 handle TIME values that exceed the value range of java.sql.Time
CREATE TABLE dbz_342_timetest (
c1 TIME(2),
c2 TIME(0),
c3 TIME(3),
c4 TIME(3),
c5 TIME(6)
);
INSERT INTO dbz_342_timetest VALUES ('517:51:04.777', '-13:14:50', '-733:00:00.0011', '-1:59:59.0011', '-838:59:58.999999');