DBZ-1988 Misc clean-up;

* Adding integration test
* Actually applying option
This commit is contained in:
Gunnar Morling 2020-05-18 14:31:31 +02:00 committed by Jiri Pechanec
parent 5f7ba1da5d
commit f1cfb3313f
9 changed files with 76 additions and 25 deletions

View File

@ -103,6 +103,7 @@ Jos Huiting
Josh Arenberg
Josh Stanfield
Joy Gao
Juan Antonio Pedraza
Jun Du
Jure Kajzer
Keith Barber

View File

@ -1,9 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.util.Arrays;
import io.debezium.config.EnumeratedValue;
/**
* Strategy for populating the source.ts_ms field in change events.
*/
public enum SourceTimestampMode implements EnumeratedValue {
/**
@ -27,7 +35,7 @@ public String getValue() {
return value;
}
static SourceTimestampMode getDefaultMode() {
public static SourceTimestampMode getDefaultMode() {
return COMMIT;
}
@ -37,5 +45,4 @@ static SourceTimestampMode fromMode(String mode) {
.findFirst()
.orElseGet(SourceTimestampMode::getDefaultMode);
}
}

View File

@ -6,8 +6,6 @@
package io.debezium.connector.sqlserver;
import static io.debezium.connector.sqlserver.SqlServerConnectorConfig.SOURCE_TIMESTAMP_MODE_CONFIG_NAME;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -35,6 +33,7 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.BoundedConcurrentHashMap;
import io.debezium.util.Clock;
/**
* {@link JdbcConnection} extension to be used with Microsoft SQL Server
@ -75,6 +74,7 @@ public class SqlServerConnection extends JdbcConnection {
private final String realDatabaseName;
private final ZoneId transactionTimezone;
private final SourceTimestampMode sourceTimestampMode;
private final Clock clock;
public static interface ResultSetExtractor<T> {
T apply(ResultSet rs) throws SQLException;
@ -85,17 +85,18 @@ public static interface ResultSetExtractor<T> {
/**
* Creates a new connection using the supplied configuration.
*
* @param config
* {@link Configuration} instance, may not be null.
* @param config {@link Configuration} instance, may not be null.
* @param sourceTimestampMode strategy for populating {@code source.ts_ms}.
*/
public SqlServerConnection(Configuration config) {
public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMode sourceTimestampMode) {
super(config, FACTORY);
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
realDatabaseName = retrieveRealDatabaseName();
boolean supportsAtTimeZone = supportsAtTimeZone();
transactionTimezone = retrieveTransactionTimezone(supportsAtTimeZone);
lsnToTimestamp = getLsnToTimestamp(supportsAtTimeZone);
sourceTimestampMode = getSourceTimestampMode(config);
this.clock = clock;
this.sourceTimestampMode = sourceTimestampMode;
}
/**
@ -198,14 +199,14 @@ public Lsn incrementLsn(Lsn lsn) throws SQLException {
* Map a commit LSN to a point in time when the commit happened.
*
* @param lsn - LSN of the commit
* @return time when the commit was recorded into the database log
* @return time when the commit was recorded into the database log or the
* current time, depending on the setting for the "source timestamp
* mode" option
* @throws SQLException
*/
public Instant timestampOfLsn(Lsn lsn) throws SQLException {
if (SourceTimestampMode.PROCESSING.equals(sourceTimestampMode)) {
// Returning null will make the SqlServerSourceInfoStructMaker#struct
// to set the top level field ts_ms in the record.
return null;
return clock.currentTime();
}
if (lsn.getBinary() == null) {
@ -461,10 +462,4 @@ private int getSqlServerVersion() {
throw new RuntimeException("Couldn't obtain database server version", e);
}
}
private SourceTimestampMode getSourceTimestampMode(Configuration config) {
final SourceTimestampMode mode = SourceTimestampMode.fromMode(config.getString(SOURCE_TIMESTAMP_MODE_CONFIG_NAME));
LOGGER.info("Configuring source timestamp with mode={}", mode);
return mode;
}
}

View File

@ -343,6 +343,7 @@ public static ConfigDef configDef() {
private final String databaseName;
private final SnapshotMode snapshotMode;
private final SnapshotIsolationMode snapshotIsolationMode;
private final SourceTimestampMode sourceTimestampMode;
private final ColumnNameFilter columnFilter;
private final boolean readOnlyDatabaseConnection;
@ -361,6 +362,8 @@ public SqlServerConnectorConfig(Configuration config) {
else {
this.snapshotIsolationMode = SnapshotIsolationMode.parse(config.getString(SNAPSHOT_ISOLATION_MODE), SNAPSHOT_ISOLATION_MODE.defaultValueAsString());
}
this.sourceTimestampMode = SourceTimestampMode.fromMode(config.getString(SOURCE_TIMESTAMP_MODE_CONFIG_NAME));
}
private static ColumnNameFilter getColumnNameFilter(String excludedColumnPatterns) {
@ -388,6 +391,10 @@ public SnapshotMode getSnapshotMode() {
return snapshotMode;
}
public SourceTimestampMode getSourceTimestampMode() {
return sourceTimestampMode;
}
public ColumnNameFilter getColumnFilter() {
return columnFilter;
}

View File

@ -57,6 +57,7 @@ public String version() {
@Override
public ChangeEventSourceCoordinator start(Configuration config) {
final Clock clock = Clock.system();
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
@ -70,8 +71,8 @@ public ChangeEventSourceCoordinator start(Configuration config) {
final Configuration jdbcConfig = config.filter(
x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name())))
.subset("database.", true);
dataConnection = new SqlServerConnection(jdbcConfig);
metadataConnection = new SqlServerConnection(jdbcConfig);
dataConnection = new SqlServerConnection(jdbcConfig, clock, connectorConfig.getSourceTimestampMode());
metadataConnection = new SqlServerConnection(jdbcConfig, clock, connectorConfig.getSourceTimestampMode());
try {
dataConnection.setAutoCommit(false);
}
@ -88,8 +89,6 @@ public ChangeEventSourceCoordinator start(Configuration config) {
taskContext = new SqlServerTaskContext(connectorConfig, schema);
final Clock clock = Clock.system();
// Set up the task record queue ...
this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())

View File

@ -239,7 +239,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableWithSmallestLsn.getData() : null;
offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition(), eventCount);
offsetContext.event(tableWithSmallestLsn.getChangeTable().getSourceTableId(),
offsetContext.event(
tableWithSmallestLsn.getChangeTable().getSourceTableId(),
metadataConnection.timestampOfLsn(tableWithSmallestLsn.getChangePosition().getCommitLsn()));
dispatcher

View File

@ -1,22 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static org.fest.assertions.Assertions.assertThat;
import org.junit.Test;
import io.debezium.doc.FixFor;
public class SourceTimestampModeTest {
@Test
@FixFor("DBZ-1988")
public void shouldConfigureDefaultMode() {
assertThat(SourceTimestampMode.getDefaultMode()).isEqualTo(SourceTimestampMode.COMMIT);
}
@Test
@FixFor("DBZ-1988")
public void shouldReturnOptionFromValidMode() {
assertThat(SourceTimestampMode.fromMode("processing")).isEqualTo(SourceTimestampMode.PROCESSING);
}
@Test
@FixFor("DBZ-1988")
public void shouldReturnDefaultIfGivenModeIsNull() {
assertThat(SourceTimestampMode.fromMode(null)).isEqualTo(SourceTimestampMode.getDefaultMode());
}

View File

@ -1370,6 +1370,36 @@ public void shouldDetectPurgedHistory() throws Exception {
.isTrue();
}
@Test
@FixFor("DBZ-1988")
public void shouldHonorSourceTimestampMode() throws InterruptedException, SQLException {
connection.execute("CREATE TABLE source_timestamp_mode (id int, name varchar(30) primary key(id))");
TestHelper.enableTableCdc(connection, "source_timestamp_mode");
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.source_timestamp_mode")
.with(SqlServerConnectorConfig.SOURCE_TIMESTAMP_MODE, "processing")
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
connection.execute("INSERT INTO source_timestamp_mode VALUES(1, 'abc')");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.dbo.source_timestamp_mode");
SourceRecord record = recordsForTopic.get(0);
long eventTs = (long) ((Struct) record.value()).get("ts_ms");
long sourceTs = (long) ((Struct) ((Struct) record.value()).get("source")).get("ts_ms");
// it's not exactly the same as ts_ms, but close enough;
assertThat(eventTs - sourceTs).isLessThan(100);
stopConnector();
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}

View File

@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SourceTimestampMode;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.jdbc.JdbcConfiguration;
@ -185,11 +186,11 @@ private static void dropTestDatabase(SqlServerConnection connection) throws SQLE
}
public static SqlServerConnection adminConnection() {
return new SqlServerConnection(TestHelper.adminJdbcConfig());
return new SqlServerConnection(TestHelper.adminJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode());
}
public static SqlServerConnection testConnection() {
return new SqlServerConnection(TestHelper.defaultJdbcConfig());
return new SqlServerConnection(TestHelper.defaultJdbcConfig(), Clock.system(), SourceTimestampMode.getDefaultMode());
}
/**