DBZ-4321 New PostProcessor contract and Column Reselection

Chris Cranford 2023-12-06 15:08:20 -05:00 committed by Chris Cranford
parent f4dea8044b
commit 59027ed5ed
25 changed files with 1446 additions and 16 deletions

View File

@@ -124,6 +124,8 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
throw new DebeziumException(e);
}
connectorConfig.postProcessorRegistry().injectDependencies(valueConverters, connectionFactory.newConnection(), schema, connectorConfig);
// If the binlog position is not available it is necessary to reexecute snapshot
if (validateSnapshotFeasibility(connectorConfig, previousOffset)) {
previousOffsets.resetOffset(partition);

View File

@@ -16,6 +16,7 @@
import io.debezium.connector.mysql.MySqlFieldReader;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.connector.mysql.strategy.mariadb.MariaDbGtidSet.MariaDbGtid;
import io.debezium.relational.TableId;
/**
 * An {@link AbstractConnectorConnection} for MariaDB.
@@ -107,6 +108,11 @@ public boolean isMariaDb() {
return true;
}
@Override
public String getQualifiedTableName(TableId tableId) {
return tableId.catalog() + "." + tableId.table();
}
@Override
protected GtidSet createGtidSet(String gtids) {
return new MariaDbGtidSet(gtids);

View File

@@ -17,6 +17,7 @@
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.MySqlFieldReader;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.relational.TableId;
/**
 * An {@link AbstractConnectorConnection} to be used with MySQL.
@@ -130,6 +131,11 @@ public boolean isMariaDb() {
return false;
}
@Override
public String getQualifiedTableName(TableId tableId) {
return tableId.catalog() + "." + tableId.table();
}
@Override
protected GtidSet createGtidSet(String gtids) {
return new MySqlGtidSet(gtids);

View File

@@ -0,0 +1,126 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.nio.file.Path;
import org.junit.After;
import org.junit.Before;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.processors.AbstractReselectProcessorTest;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import io.debezium.util.Testing;
/**
* MySQL's integration tests for {@link ReselectColumnsPostProcessor}.
*
* @author Chris Cranford
*/
public class MySqlReselectColumnsProcessorIT extends AbstractReselectProcessorTest<MySqlConnector> {
private static final Path SCHEMA_HISTORY_PATH = Testing.Files
.createTestingPath("file-schema-history-reselect-processor.txt").toAbsolutePath();
private final UniqueDatabase DATABASE = new UniqueDatabase("processor", "empty")
.withDbHistoryPath(SCHEMA_HISTORY_PATH);
private MySqlTestConnection connection;
@Before
public void beforeEach() throws Exception {
DATABASE.createAndInitialize();
initializeConnectorTestFramework();
Testing.Files.delete(SCHEMA_HISTORY_PATH);
connection = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
super.beforeEach();
}
@After
public void afterEach() throws Exception {
super.afterEach();
if (connection != null) {
connection.close();
}
Testing.Files.delete(SCHEMA_HISTORY_PATH);
}
@Override
protected Class<MySqlConnector> getConnectorClass() {
return MySqlConnector.class;
}
@Override
protected JdbcConnection databaseConnection() {
return connection;
}
@Override
protected Configuration.Builder getConfigurationBuilder() {
return DATABASE.defaultConfig()
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("dbz4321"))
.with(MySqlConnectorConfig.CUSTOM_POST_PROCESSORS, "reselect")
.with("reselect.type", ReselectColumnsPostProcessor.class.getName());
}
@Override
protected String topicName() {
return DATABASE.topicForTable("dbz4321");
}
@Override
protected String tableName() {
return DATABASE.qualifiedTableName("dbz4321");
}
@Override
protected String reselectColumnsList() {
return DATABASE.qualifiedTableName("dbz4321") + ":data";
}
@Override
protected void createTable() throws Exception {
connection.execute("CREATE TABLE dbz4321 (id int primary key, data varchar(50), data2 int);");
}
@Override
protected void dropTable() throws Exception {
}
@Override
protected String getInsertWithValue() {
return "INSERT INTO dbz4321 (id,data,data2) values (1,'one',1);";
}
@Override
protected String getInsertWithNullValue() {
return "INSERT INTO dbz4321 (id,data,data2) values (1,null,1);";
}
@Override
protected void waitForStreamingStarted() throws InterruptedException {
waitForStreamingRunning("mysql", DATABASE.getServerName());
}
@Override
protected SourceRecords consumeRecordsByTopicReselectWhenNotNullSnapshot() throws InterruptedException {
return consumeRecordsByTopic(7);
}
@Override
protected SourceRecords consumeRecordsByTopicReselectWhenNotNullStreaming() throws InterruptedException {
return consumeRecordsByTopic(10);
}
@Override
protected SourceRecords consumeRecordsByTopicReselectWhenNullSnapshot() throws InterruptedException {
return consumeRecordsByTopic(7);
}
@Override
protected SourceRecords consumeRecordsByTopicReselectWhenNullStreaming() throws InterruptedException {
return consumeRecordsByTopic(8);
}
}

View File

@@ -20,6 +20,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -567,4 +568,46 @@ public NonRelationalTableException(String message) {
super(message);
}
}
@Override
public String buildReselectColumnQuery(TableId tableId, List<String> columns, List<String> keyColumns, Struct source) {
final String commitScn = source.getString(SourceInfo.COMMIT_SCN_KEY);
if (Strings.isNullOrEmpty(commitScn)) {
return super.buildReselectColumnQuery(tableId, columns, keyColumns, source);
}
return String.format("SELECT %s FROM (SELECT * FROM %s AS OF SCN %s) WHERE %s",
columns.stream().map(this::quotedColumnIdString).collect(Collectors.joining(",")),
quotedTableIdString(new TableId(null, tableId.schema(), tableId.table())),
commitScn,
keyColumns.stream().map(key -> key + "=?").collect(Collectors.joining(" AND ")));
}
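For illustration (with a hypothetical SCN value): for the DEBEZIUM.DBZ4321 table used in the tests below, with reselect column DATA and key column ID, this builds a flashback query roughly of the form SELECT DATA FROM (SELECT * FROM DEBEZIUM.DBZ4321 AS OF SCN 12345678) WHERE ID=? (exact quoting depends on quotedColumnIdString/quotedTableIdString), so the row is re-read as of the transaction's commit SCN rather than the table's current state.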
@Override
public Map<String, Object> reselectColumns(String query, TableId tableId, List<String> columns, List<Object> bindValues) throws SQLException {
return optionallyDoInContainer(() -> super.reselectColumns(query, tableId, columns, bindValues));
}
private <T> T optionallyDoInContainer(ContainerWork<T> work) throws SQLException {
boolean swapped = false;
try {
final String pdbName = config().getString("pdb.name");
if (!Strings.isNullOrEmpty(pdbName)) {
setSessionToPdb(pdbName);
swapped = true;
}
return work.execute();
}
finally {
if (swapped) {
resetSessionToCdb();
}
}
}
@FunctionalInterface
interface ContainerWork<T> {
T execute() throws SQLException;
}
}

View File

@@ -74,6 +74,8 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, defaultValueConverter, schemaNameAdjuster,
topicNamingStrategy, tableNameCaseSensitivity);
connectorConfig.postProcessorRegistry().injectDependencies(valueConverters, connectionFactory.newConnection(), schema, connectorConfig);
Offsets<OraclePartition, OracleOffsetContext> previousOffsets = getPreviousOffsets(new OraclePartition.Provider(connectorConfig),
connectorConfig.getAdapter().getOffsetContextLoader());

View File

@@ -490,10 +490,6 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
offsetContext.setTableId(event.getTableId());
offsetContext.setRedoThread(row.getThread());
offsetContext.setRsId(event.getRsId());
if (eventsProcessed == numEvents) {
// reached the last event update the commit scn in the offsets
offsetContext.getCommitScn().recordCommit(row);
}
final DmlEvent dmlEvent = (DmlEvent) event;
if (!skipExcludedUserName) {
@@ -530,6 +526,12 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
}
};
// When a COMMIT is received, regardless of the number of events it has, it still
// must be recorded in the commit scn for the node to guarantee updates to the
// offsets. This must be done prior to dispatching the transaction-commit or the
// heartbeat event that follows commit dispatch.
offsetContext.getCommitScn().recordCommit(row);
Instant start = Instant.now();
int dispatchedEventCount = 0;
if (numEvents > 0) {
@@ -546,13 +548,6 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
}
}
}
else {
// When a COMMIT is received, regardless of the number of events it has, it still
// must be recorded in the commit scn for the node to guarantee updates to the
// offsets. This must be done prior to dispatching the transaction-commit or the
// heartbeat event that follows commit dispatch.
offsetContext.getCommitScn().recordCommit(row);
}
offsetContext.setEventScn(commitScn);
offsetContext.setRsId(row.getRsId());
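Taken together, the three hunks above relocate the commit SCN bookkeeping: recordCommit(row) was previously invoked only when the last event was reached, or in the else branch for empty transactions, and is now invoked unconditionally before any events are dispatched, per the new comment.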

View File

@@ -0,0 +1,207 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Clob;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.processors.AbstractReselectProcessorTest;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import io.debezium.util.Testing;
/**
* Oracle's integration tests for {@link ReselectColumnsPostProcessor}.
*
* @author Chris Cranford
*/
public class OracleReselectColumnsProcessorIT extends AbstractReselectProcessorTest<OracleConnector> {
private OracleConnection connection;
@Before
public void beforeEach() throws Exception {
connection = TestHelper.testConnection();
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
super.beforeEach();
}
@After
public void afterEach() throws Exception {
super.afterEach();
if (connection != null) {
connection.close();
}
}
@Override
protected Class<OracleConnector> getConnectorClass() {
return OracleConnector.class;
}
@Override
protected JdbcConnection databaseConnection() {
return connection;
}
@Override
protected Configuration.Builder getConfigurationBuilder() {
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4321")
.with(OracleConnectorConfig.CUSTOM_POST_PROCESSORS, "reselect")
.with("reselect.type", ReselectColumnsPostProcessor.class.getName());
}
@Override
protected String topicName() {
return TestHelper.SERVER_NAME + ".DEBEZIUM.DBZ4321";
}
@Override
protected String tableName() {
return "DEBEZIUM.DBZ4321";
}
@Override
protected String reselectColumnsList() {
return "DEBEZIUM.DBZ4321:DATA";
}
@Override
protected void createTable() throws Exception {
TestHelper.dropTable(connection, "dbz4321");
connection.execute("CREATE TABLE dbz4321 (id numeric(9,0) primary key, data varchar2(50), data2 numeric(9,0))");
TestHelper.streamTable(connection, "dbz4321");
}
@Override
protected void dropTable() throws Exception {
TestHelper.dropTable(connection, "dbz4321");
}
@Override
protected String getInsertWithValue() {
return "INSERT INTO dbz4321 (id,data,data2) values (1,'one',1)";
}
@Override
protected String getInsertWithNullValue() {
return "INSERT INTO dbz4321 (id,data,data2) values (1,null,1)";
}
@Override
protected void waitForStreamingStarted() throws InterruptedException {
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
}
@Override
protected String fieldName(String fieldName) {
return fieldName.toUpperCase();
}
@Test
@FixFor("DBZ-4321")
public void testClobReselectedWhenValueIsUnavailable() throws Exception {
TestHelper.dropTable(connection, "dbz4321");
try {
connection.execute("CREATE TABLE dbz4321 (id numeric(9,0) primary key, data clob, data2 numeric(9,0))");
TestHelper.streamTable(connection, "dbz4321");
Configuration config = getConfigurationBuilder().with(OracleConnectorConfig.LOB_ENABLED, "true").build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingStarted();
// Insert will always include the data
final String clobData = RandomStringUtils.randomAlphabetic(10000);
final Clob clob = connection.connection().createClob();
clob.setString(1, clobData);
connection.prepareQuery("INSERT INTO dbz4321 (id,data,data2) values (1,?,1)", ps -> ps.setClob(1, clob), null);
connection.commit();
// Update row without changing clob
connection.execute("UPDATE dbz4321 set data2=10 where id = 1");
final SourceRecords sourceRecords = consumeRecordsByTopic(2);
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ4321");
assertThat(tableRecords).hasSize(2);
SourceRecord update = tableRecords.get(1);
VerifyRecord.isValidUpdate(update, "ID", 1);
Struct after = ((Struct) update.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo(clobData);
assertThat(after.get("DATA2")).isEqualTo(10);
}
finally {
TestHelper.dropTable(connection, "dbz4321");
}
}
@Test
@FixFor("DBZ-4321")
public void testBlobReselectedWhenValueIsUnavailable() throws Exception {
TestHelper.dropTable(connection, "dbz4321");
try {
connection.execute("CREATE TABLE dbz4321 (id numeric(9,0) primary key, data blob, data2 numeric(9,0))");
TestHelper.streamTable(connection, "dbz4321");
Configuration config = getConfigurationBuilder().with(OracleConnectorConfig.LOB_ENABLED, "true").build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingStarted();
// Insert will always include the data
final byte[] blobData = RandomStringUtils.random(10000).getBytes(StandardCharsets.UTF_8);
final Blob blob = connection.connection().createBlob();
blob.setBytes(1, blobData);
connection.prepareQuery("INSERT INTO dbz4321 (id,data,data2) values (1,?,1)", ps -> ps.setBlob(1, blob), null);
connection.commit();
// Update row without changing blob
connection.execute("UPDATE dbz4321 set data2=10 where id = 1");
final SourceRecords sourceRecords = consumeRecordsByTopic(2);
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ4321");
assertThat(tableRecords).hasSize(2);
SourceRecord update = tableRecords.get(1);
VerifyRecord.isValidUpdate(update, "ID", 1);
Struct after = ((Struct) update.value()).getStruct(Envelope.FieldName.AFTER);
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo(ByteBuffer.wrap(blobData));
assertThat(after.get("DATA2")).isEqualTo(10);
}
finally {
TestHelper.dropTable(connection, "dbz4321");
}
}
}

View File

@@ -103,14 +103,17 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final TypeRegistry typeRegistry = jdbcConnection.getTypeRegistry();
final PostgresDefaultValueConverter defaultValueConverter = jdbcConnection.getDefaultValueConverter();
final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry);
schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy);
final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
connectorConfig.postProcessorRegistry().injectDependencies(valueConverter, connectionFactory.newConnection(), schema, connectorConfig);
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
// Print out the server information

View File

@@ -797,6 +797,11 @@ public void setQueryColumnValue(PreparedStatement statement, Column column, int
}
}
@Override
public TableId createTableId(String databaseName, String schemaName, String tableName) {
return new TableId(null, schemaName, tableName);
}
@FunctionalInterface
public interface PostgresValueConverterBuilder {
PostgresValueConverter build(TypeRegistry registry);

View File

@@ -0,0 +1,151 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.processors.AbstractReselectProcessorTest;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
/**
* Postgres' integration tests for {@link ReselectColumnsPostProcessor}.
*
* @author Chris Cranford
*/
public class PostgresReselectColumnsProcessorIT extends AbstractReselectProcessorTest<PostgresConnector> {
public static final String CREATE_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" +
"CREATE SCHEMA s1; ";
private PostgresConnection connection;
@Before
public void beforeEach() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.execute(CREATE_STMT);
connection = TestHelper.create();
super.beforeEach();
}
@After
public void afterEach() throws Exception {
super.afterEach();
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropPublication();
}
@Override
protected Class<PostgresConnector> getConnectorClass() {
return PostgresConnector.class;
}
@Override
protected JdbcConnection databaseConnection() {
return connection;
}
@Override
protected Configuration.Builder getConfigurationBuilder() {
return TestHelper.defaultConfig()
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz4321")
.with(PostgresConnectorConfig.CUSTOM_POST_PROCESSORS, "reselect")
.with("reselect.type", ReselectColumnsPostProcessor.class.getName());
}
@Override
protected String topicName() {
return "test_server.s1.dbz4321";
}
@Override
protected String tableName() {
return "s1.dbz4321";
}
@Override
protected String reselectColumnsList() {
return "s1.dbz4321:data";
}
@Override
protected void createTable() throws Exception {
TestHelper.execute("CREATE TABLE s1.dbz4321 (id int primary key, data varchar(50), data2 int);");
TestHelper.execute("ALTER TABLE s1.dbz4321 REPLICA IDENTITY FULL;");
}
@Override
protected void dropTable() throws Exception {
}
@Override
protected String getInsertWithValue() {
return "INSERT INTO s1.dbz4321 (id,data,data2) values (1,'one',1);";
}
@Override
protected String getInsertWithNullValue() {
return "INSERT INTO s1.dbz4321 (id,data,data2) values (1,null,1);";
}
@Override
protected void waitForStreamingStarted() throws InterruptedException {
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
}
@Test
@FixFor("DBZ-4321")
public void testToastColumnReselectedWhenValueIsUnavailable() throws Exception {
TestHelper.execute("CREATE TABLE s1.dbz4321_toast (id int primary key, data text, data2 int);");
TestHelper.execute("ALTER TABLE s1.dbz4321_toast REPLICA IDENTITY FULL;");
Configuration config = getConfigurationBuilder()
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz4321_toast")
.build();
start(PostgresConnector.class, config.build());
waitForStreamingStarted();
final String text = RandomStringUtils.randomAlphabetic(10000);
TestHelper.execute("INSERT INTO s1.dbz4321_toast (id,data,data2) values (1,'" + text + "',1);",
"UPDATE s1.dbz4321_toast SET data2 = 2 where id = 1;");
final SourceRecords sourceRecords = consumeRecordsByTopic(2);
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic("test_server.s1.dbz4321_toast");
// Check insert
SourceRecord record = tableRecords.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidInsert(record, "id", 1);
assertThat(after.get("id")).isEqualTo(1);
assertThat(after.get("data")).isEqualTo(text);
assertThat(after.get("data2")).isEqualTo(1);
// Check update
record = tableRecords.get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidUpdate(record, "id", 1);
assertThat(after.get("id")).isEqualTo(1);
assertThat(after.get("data")).isEqualTo(text);
assertThat(after.get("data2")).isEqualTo(2);
}
}

View File

@@ -92,6 +92,8 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
schema.recover(offsets);
connectorConfig.postProcessorRegistry().injectDependencies(valueConverters, metadataConnection, schema, connectorConfig);
taskContext = new SqlServerTaskContext(connectorConfig, schema);
// Set up the task record queue ...

View File

@@ -0,0 +1,112 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import org.junit.After;
import org.junit.Before;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.processors.AbstractReselectProcessorTest;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import io.debezium.util.Testing;
/**
* SQL Server integration tests for {@link ReselectColumnsPostProcessor}.
*
* @author Chris Cranford
*/
public class SqlServerReselectColumnsProcessorIT extends AbstractReselectProcessorTest<SqlServerConnector> {
private SqlServerConnection connection;
@Before
public void beforeEach() throws Exception {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
connection.setAutoCommit(false);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
super.beforeEach();
}
@After
public void afterEach() throws Exception {
super.afterEach();
if (connection != null) {
connection.close();
}
}
@Override
protected Class<SqlServerConnector> getConnectorClass() {
return SqlServerConnector.class;
}
@Override
protected JdbcConnection databaseConnection() {
return connection;
}
@Override
protected Configuration.Builder getConfigurationBuilder() {
return TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo\\.dbz4321")
.with(SqlServerConnectorConfig.CUSTOM_POST_PROCESSORS, "reselect")
.with("reselect.type", ReselectColumnsPostProcessor.class.getName());
}
@Override
protected String topicName() {
return "server1.testDB1.dbo.dbz4321";
}
@Override
protected String tableName() {
return "dbo.dbz4321";
}
@Override
protected String reselectColumnsList() {
return tableName() + ":data";
}
@Override
protected void createTable() throws Exception {
connection.execute("CREATE TABLE dbz4321 (id int identity(1,1) primary key, data varchar(50), data2 int)");
TestHelper.enableTableCdc(connection, "dbz4321");
}
@Override
protected void dropTable() throws Exception {
}
@Override
protected String getInsertWithValue() {
return "INSERT INTO dbo.dbz4321 (data,data2) values ('one',1)";
}
@Override
protected String getInsertWithNullValue() {
return "INSERT INTO dbo.dbz4321 (data,data2) values (null,1)";
}
@Override
protected void waitForStreamingStarted() throws InterruptedException {
TestHelper.waitForStreamingStarted();
}
@Override
protected SourceRecords consumeRecordsByTopicReselectWhenNullSnapshot() throws InterruptedException {
// The second one is because the change gets captured by the table CDC
// since the table's CDC was enabled before the snapshot.
return consumeRecordsByTopic(3);
}
}

View File

@@ -44,6 +44,8 @@
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.processors.PostProcessorRegistry;
import io.debezium.processors.spi.PostProcessor;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.schema.SchemaTopicNamingStrategy;
@@ -440,6 +442,7 @@ public static WatermarkStrategy parse(String value, String defaultValue) {
public static final String DATABASE_CONFIG_PREFIX = "database.";
public static final String DRIVER_CONFIG_PREFIX = "driver.";
private static final String CONVERTER_TYPE_SUFFIX = ".type";
private static final String POST_PROCESSOR_TYPE_SUFFIX = ".type";
public static final long DEFAULT_RETRIABLE_RESTART_WAIT = 10000L;
public static final long DEFAULT_MAX_QUEUE_SIZE_IN_BYTES = 0; // In case we don't want to pass max.queue.size.in.bytes;
public static final String NOTIFICATION_CONFIGURATION_FIELD_PREFIX_STRING = "notification.";
@@ -606,6 +609,15 @@ public static WatermarkStrategy parse(String value, String defaultValue) {
.withDescription("Optional list of custom converters that would be used instead of default ones. "
+ "The converters are defined using '<converter.prefix>.type' config option and configured using options '<converter.prefix>.<option>'");
public static final Field CUSTOM_POST_PROCESSORS = Field.create("post.processors")
.withDisplayName("List of change event post processors.")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 998))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("Optional list of post processors. "
+ "The processors are defined using '<post.processor.prefix>.type' config option and configured using options '<post.processor.prefix.<option>'");
public static final Field SKIPPED_OPERATIONS = Field.create("skipped.operations")
.withDisplayName("Skipped Operations")
.withType(Type.LIST)
@@ -799,6 +811,7 @@ public static WatermarkStrategy parse(String value, String defaultValue) {
INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY)
.events(
CUSTOM_CONVERTERS,
CUSTOM_POST_PROCESSORS,
TOMBSTONES_ON_DELETE,
Heartbeat.HEARTBEAT_INTERVAL,
Heartbeat.HEARTBEAT_TOPICS_PREFIX,
@@ -831,6 +844,7 @@ public static WatermarkStrategy parse(String value, String defaultValue) {
private final boolean shouldProvideTransactionMetadata;
private final EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode;
private final CustomConverterRegistry customConverterRegistry;
private final PostProcessorRegistry postProcessorRegistry;
private final BinaryHandlingMode binaryHandlingMode;
private final SchemaNameAdjustmentMode schemaNameAdjustmentMode;
private final FieldNameAdjustmentMode fieldNameAdjustmentMode;
@@ -871,6 +885,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
this.eventProcessingFailureHandlingMode = EventProcessingFailureHandlingMode.parse(config.getString(EVENT_PROCESSING_FAILURE_HANDLING_MODE));
this.customConverterRegistry = new CustomConverterRegistry(getCustomConverters());
this.postProcessorRegistry = new PostProcessorRegistry(getPostProcessors());
this.binaryHandlingMode = BinaryHandlingMode.parse(config.getString(BINARY_HANDLING_MODE));
this.signalingDataCollection = config.getString(SIGNAL_DATA_COLLECTION);
this.signalPollInterval = Duration.ofMillis(config.getLong(SIGNAL_POLL_INTERVAL_MS));
@@ -1006,6 +1021,10 @@ public CustomConverterRegistry customConverterRegistry() {
return customConverterRegistry;
}
public PostProcessorRegistry postProcessorRegistry() {
return postProcessorRegistry;
}
/**
 * Whether a particular connector supports an optimized way for implementing operation skipping, or not.
 */
@@ -1053,6 +1072,20 @@ private List<CustomConverter<SchemaBuilder, ConvertedField>> getCustomConverters
.collect(Collectors.toList());
}
@SuppressWarnings("rawtypes")
private List<PostProcessor> getPostProcessors() {
final String postProcessorNameList = config.getString(CUSTOM_POST_PROCESSORS);
final List<String> postProcessorNames = Strings.listOf(postProcessorNameList, x -> x.split(","), String::trim);
return postProcessorNames.stream()
.map(name -> {
PostProcessor processor = config.getInstance(name + POST_PROCESSOR_TYPE_SUFFIX, PostProcessor.class);
processor.configure(config.subset(name, true).asMap());
return processor;
})
.collect(Collectors.toList());
}
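To illustrate the resolution logic above, a minimal configuration sketch (the property names are taken from this commit's integration tests; any other prefix works the same way):

post.processors=reselect
reselect.type=io.debezium.processors.reselect.ReselectColumnsPostProcessor
reselect.columns.list=s1.dbz4321:data

Each comma-separated name is resolved by instantiating the class named by '<name>.type' and passing the '<name>.'-prefixed subset of the configuration (prefix stripped) to PostProcessor#configure, so the processor above sees columns.list=s1.dbz4321:data.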
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStructMaker() { public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStructMaker() {
return (SourceInfoStructMaker<T>) sourceInfoStructMaker; return (SourceInfoStructMaker<T>) sourceInfoStructMaker;

View File

@@ -44,7 +44,9 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1587,4 +1589,37 @@ public KeyStore loadKeyStore(String filePath, char[] passwordArray) {
throw new DebeziumException("Error loading keystore", e);
}
}
public TableId createTableId(String databaseName, String schemaName, String tableName) {
return new TableId(databaseName, schemaName, tableName);
}
public String getQualifiedTableName(TableId tableId) {
return tableId.schema() + "." + tableId.table();
}
public String buildReselectColumnQuery(TableId tableId, List<String> columns, List<String> keyColumns, Struct source) {
return String.format("SELECT %s FROM %s WHERE %s",
columns.stream().map(this::quotedColumnIdString).collect(Collectors.joining(",")),
quotedTableIdString(tableId),
keyColumns.stream().map(key -> key + "=?").collect(Collectors.joining(" AND ")));
}
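As a rough illustration: with reselect column data and key column id on table s1.dbz4321 (the Postgres test fixture in this commit), the base implementation builds approximately SELECT data FROM s1.dbz4321 WHERE id=? (subject to the connection's quoting rules). The Oracle override shown earlier wraps the FROM clause in an AS OF SCN flashback subquery instead.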
public Map<String, Object> reselectColumns(String query, TableId tableId, List<String> columns, List<Object> bindValues) throws SQLException {
final Map<String, Object> results = new HashMap<>();
prepareQuery(query, bindValues, (params, rs) -> {
if (!rs.next()) {
LOGGER.warn("No data found for re-selection on table {}.", tableId);
return;
}
for (String columnName : columns) {
results.put(columnName, rs.getObject(columnName));
}
if (rs.next()) {
LOGGER.warn("Re-selection detected multiple rows for the same key in table {}, using first.", tableId);
}
});
return results;
}
}

View File

@@ -335,6 +335,8 @@ public synchronized void stop() throws InterruptedException {
notificationService.stop();
}
eventDispatcher.close();
connectorConfig.postProcessorRegistry().close();
}
finally {
snapshotMetrics.unregister();

View File

@@ -40,6 +40,7 @@
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.processors.spi.PostProcessor;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.schema.DataCollectionFilters.DataCollectionFilter;
@@ -473,6 +474,8 @@ public void changeRecord(P partition,
: dataCollectionSchema.keySchema();
String topicName = topicNamingStrategy.dataChangeTopic((T) dataCollectionSchema.id());
doPostProcessing(key, value);
SourceRecord record = new SourceRecord(partition.getSourcePartition(),
offsetContext.getOffset(),
topicName, null,
@@ -516,9 +519,9 @@ public void changeRecord(P partition,
LOGGER.trace("Received change record for {} operation on key {}", operation, key);
doPostProcessing(key, value);
SourceRecord record = new SourceRecord(
partition.getSourcePartition(),
offsetContext.getOffset(),
topicNamingStrategy.dataChangeTopic((T) dataCollectionSchema.id()),
@@ -528,7 +531,11 @@ public void changeRecord(P partition,
dataCollectionSchema.getEnvelopeSchema().schema(),
value,
null,
headers);
BufferedDataChangeEvent nextBufferedEvent = new BufferedDataChangeEvent();
nextBufferedEvent.offsetContext = offsetContext;
nextBufferedEvent.dataChangeEvent = new DataChangeEvent(record);
queue.enqueue(bufferedEventRef.getAndSet(nextBufferedEvent).dataChangeEvent);
}
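Note the reordering in the buffered dispatch path above: doPostProcessing now runs first and the SourceRecord is constructed before being wrapped in the BufferedDataChangeEvent, so post processors observe the still-mutable value Struct before the event is queued.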
@@ -587,6 +594,8 @@ public void changeRecord(P partition,
Schema keySchema = dataCollectionSchema.keySchema();
String topicName = topicNamingStrategy.dataChangeTopic((T) dataCollectionSchema.id());
doPostProcessing(key, value);
SourceRecord record = new SourceRecord(
partition.getSourcePartition(),
offsetContext.getOffset(),
@@ -684,4 +693,11 @@ public void close() {
heartbeat.close();
}
}
@SuppressWarnings("resource")
protected void doPostProcessing(Object key, Struct value) {
for (PostProcessor processor : connectorConfig.postProcessorRegistry().getProcessors()) {
processor.apply(key, value);
}
}
}

View File

@@ -0,0 +1,85 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.processors;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.Immutable;
import io.debezium.annotation.ThreadSafe;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.processors.spi.ConnectorConfigurationAware;
import io.debezium.processors.spi.JdbcConnectionAware;
import io.debezium.processors.spi.PostProcessor;
import io.debezium.processors.spi.RelationalDatabaseSchemaAware;
import io.debezium.processors.spi.ValueConverterAware;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.ValueConverterProvider;
/**
* Registry of all post processors that are provided by the connector configuration.
*
* @author Chris Cranford
*/
@ThreadSafe
public class PostProcessorRegistry implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(PostProcessorRegistry.class);
@Immutable
private final List<PostProcessor> processors;
public PostProcessorRegistry(List<PostProcessor> processors) {
if (processors == null) {
this.processors = Collections.emptyList();
}
else {
this.processors = Collections.unmodifiableList(processors);
}
LOGGER.info("Registered {} post processors.", this.processors.size());
}
@Override
public void close() {
for (PostProcessor processor : processors) {
processor.close();
}
}
public List<PostProcessor> getProcessors() {
return this.processors;
}
/**
 * Set the optional dependencies needed by a post processor.
 *
 * @param valueConverterProvider the value converter
 * @param connection the JDBC connection
 * @param schema the relational database schema
 * @param connectorConfig the relational connector configuration
 */
public void injectDependencies(ValueConverterProvider valueConverterProvider, JdbcConnection connection, RelationalDatabaseSchema schema,
RelationalDatabaseConnectorConfig connectorConfig) {
for (PostProcessor processor : processors) {
if (processor instanceof JdbcConnectionAware) {
((JdbcConnectionAware) processor).setDatabaseConnection(connection);
}
if (processor instanceof ValueConverterAware) {
((ValueConverterAware) processor).setValueConverter(valueConverterProvider);
}
if (processor instanceof RelationalDatabaseSchemaAware) {
((RelationalDatabaseSchemaAware) processor).setDatabaseSchema(schema);
}
if (processor instanceof ConnectorConfigurationAware) {
((ConnectorConfigurationAware) processor).setConnectorConfig(connectorConfig);
}
}
}
}
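In this commit, each relational connector task (MySQL, Oracle, PostgreSQL, SQL Server) calls injectDependencies once its value converters and schema are available, and the registry is closed from the coordinator's stop() method shown earlier.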

View File

@@ -0,0 +1,240 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.processors.reselect;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.function.Predicates;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.processors.spi.ConnectorConfigurationAware;
import io.debezium.processors.spi.JdbcConnectionAware;
import io.debezium.processors.spi.PostProcessor;
import io.debezium.processors.spi.RelationalDatabaseSchemaAware;
import io.debezium.processors.spi.ValueConverterAware;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.ValueConverter;
import io.debezium.relational.ValueConverterProvider;
import io.debezium.util.Strings;
/**
* An implementation of the Debezium {@link PostProcessor} contract that allows for the re-selection of
* columns that are populated with the unavailable value placeholder or that the user wishes to have
* re-queried with the latest state if the column's value happens to be {@code null}.
*
* This post-processor also implements a variety of injection-aware contracts to have the necessary
* Debezium internal components provided at runtime so that various steps can be taken by this
* post-processor.
*
* @author Chris Cranford
*/
@Incubating
public class ReselectColumnsPostProcessor implements PostProcessor, ConnectorConfigurationAware, JdbcConnectionAware, RelationalDatabaseSchemaAware, ValueConverterAware {
private static final Logger LOGGER = LoggerFactory.getLogger(ReselectColumnsPostProcessor.class);
private static final String RESELECT_COLUMNS_LIST = "columns.list";
private Predicate<String> selector = x -> true;
private JdbcConnection jdbcConnection;
private ValueConverterProvider valueConverterProvider;
private String unavailableValuePlaceholder;
private byte[] unavailableValuePlaceholderBytes;
private RelationalDatabaseSchema schema;
@Override
public void configure(Map<String, ?> properties) {
final Configuration config = Configuration.from(properties);
if (config.hasKey(RESELECT_COLUMNS_LIST)) {
final String reselectColumnNames = config.getString(RESELECT_COLUMNS_LIST);
if (!Strings.isNullOrEmpty(reselectColumnNames)) {
this.selector = Predicates.includes(reselectColumnNames, Pattern.CASE_INSENSITIVE);
}
}
}
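The columns.list property (surfaced as reselect.columns.list in this commit's integration tests, since configure receives the prefix-stripped subset of the connector configuration) is a comma-separated list of case-insensitive regular expressions matched against <qualified table name>:<column name>, e.g. s1.dbz4321:data or DEBEZIUM.DBZ4321:DATA.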
@Override
public void close() {
// nothing to do
}
@Override
public void apply(Object messageKey, Struct value) {
if (value == null) {
LOGGER.debug("Value is null, no re-selection possible.");
return;
}
if (!(messageKey instanceof Struct)) {
LOGGER.debug("Key is not a Struct, no re-selection possible.");
return;
}
final Struct key = (Struct) messageKey;
final Struct after = value.getStruct(Envelope.FieldName.AFTER);
if (after == null) {
LOGGER.debug("Value has no after field, no re-selection possible.");
return;
}
final Struct source = value.getStruct(Envelope.FieldName.SOURCE);
if (source == null) {
LOGGER.debug("Value has no source field, no re-selection possible.");
return;
}
final TableId tableId = getTableIdFromSource(source);
if (tableId == null) {
return;
}
final Table table = schema.tableFor(tableId);
if (table == null) {
LOGGER.debug("Unable to locate table {} in relational model.", tableId);
return;
}
final List<String> requiredColumnSelections = getRequiredColumnSelections(tableId, after);
if (requiredColumnSelections.isEmpty()) {
LOGGER.debug("No columns require re-selection.");
return;
}
final List<String> keyColumns = new ArrayList<>();
final List<Object> keyValues = new ArrayList<>();
for (org.apache.kafka.connect.data.Field field : key.schema().fields()) {
keyColumns.add(field.name());
keyValues.add(key.get(field));
}
Map<String, Object> selections = null;
try {
final String reselectQuery = jdbcConnection.buildReselectColumnQuery(tableId, requiredColumnSelections, keyColumns, source);
selections = jdbcConnection.reselectColumns(reselectQuery, tableId, requiredColumnSelections, keyValues);
if (selections.isEmpty()) {
LOGGER.warn("Failed to find row in table {} with key {}.", tableId, key);
return;
}
}
catch (SQLException e) {
LOGGER.warn("Failed to re-select row for table {} and key {}", tableId, key, e);
return;
}
// Iterate re-selection columns and override old values
for (Map.Entry<String, Object> selection : selections.entrySet()) {
final String columnName = selection.getKey();
final Column column = table.columnWithName(columnName);
final org.apache.kafka.connect.data.Field field = after.schema().field(columnName);
final Object convertedValue = getConvertedValue(column, field, selection.getValue());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Replaced field {} value {} with {}", field.name(), value.get(field), convertedValue);
}
after.put(field.name(), convertedValue);
}
}
@Override
public void setConnectorConfig(CommonConnectorConfig connectorConfig) {
if (connectorConfig instanceof RelationalDatabaseConnectorConfig) {
final RelationalDatabaseConnectorConfig config = (RelationalDatabaseConnectorConfig) connectorConfig;
this.unavailableValuePlaceholder = new String(config.getUnavailableValuePlaceholder());
this.unavailableValuePlaceholderBytes = config.getUnavailableValuePlaceholder();
}
}
@Override
public void setValueConverter(ValueConverterProvider valueConverter) {
this.valueConverterProvider = valueConverter;
}
@Override
public void setDatabaseConnection(JdbcConnection connection) {
this.jdbcConnection = connection;
}
@Override
public void setDatabaseSchema(RelationalDatabaseSchema schema) {
this.schema = schema;
}
private List<String> getRequiredColumnSelections(TableId tableId, Struct after) {
final List<String> columnSelections = new ArrayList<>();
for (org.apache.kafka.connect.data.Field field : after.schema().fields()) {
final Object value = after.get(field);
if (isUnavailableValueHolder(field, value)) {
LOGGER.debug("Adding column {} for table {} to re-select list due to unavailable value placeholder.",
field.name(), tableId);
columnSelections.add(field.name());
}
else {
final String fullyQualifiedName = jdbcConnection.getQualifiedTableName(tableId) + ":" + field.name();
if (value == null && selector.test(fullyQualifiedName)) {
LOGGER.debug("Adding empty column {} for table {} to re-select list.", field.name(), tableId);
columnSelections.add(field.name());
}
}
}
return columnSelections;
}
private boolean isUnavailableValueHolder(org.apache.kafka.connect.data.Field field, Object value) {
if (field.schema().type() == Schema.Type.BYTES && this.unavailableValuePlaceholderBytes != null) {
return ByteBuffer.wrap(unavailableValuePlaceholderBytes).equals(value);
}
return unavailableValuePlaceholder != null && unavailableValuePlaceholder.equals(value);
}
private Object getConvertedValue(Column column, org.apache.kafka.connect.data.Field field, Object value) {
final ValueConverter converter = valueConverterProvider.converter(column, field);
if (converter != null) {
return converter.convert(value);
}
return value;
}
private TableId getTableIdFromSource(Struct source) {
final String databaseName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
if (Strings.isNullOrEmpty(databaseName)) {
LOGGER.debug("Database name is not available, no re-selection possible.");
return null;
}
final String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
if (Strings.isNullOrEmpty(tableName)) {
LOGGER.debug("Table name is not available, no re-selection possible.");
return null;
}
// Schema name can be optional in the case of certain connectors
String schemaName = null;
if (source.schema().field(AbstractSourceInfo.SCHEMA_NAME_KEY) != null) {
schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
}
return jdbcConnection.createTableId(databaseName, schemaName, tableName);
}
}

View File

@@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.processors.spi;
import io.debezium.config.CommonConnectorConfig;
/**
* Contract that allows injecting a {@link CommonConnectorConfig}.
*
* @author Chris Cranford
*/
public interface ConnectorConfigurationAware {
void setConnectorConfig(CommonConnectorConfig connectorConfig);
}

View File

@@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.processors.spi;
import io.debezium.jdbc.JdbcConnection;
/**
* Contract that allows injecting a {@link JdbcConnection}.
*
* @author Chris Cranford
*/
public interface JdbcConnectionAware {
void setDatabaseConnection(JdbcConnection connection);
}

View File

@@ -0,0 +1,55 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.processors.spi;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import io.debezium.common.annotation.Incubating;
/**
* A contract that defines a post-processing step that can be applied to any outgoing event
* before it is added to the change event queue.
*
* While this may seem similar to a Kafka Transformation, the difference is that a post processor
* operates on the raw {@link Struct} objects which are still mutable, allowing for a variety of
* use cases that wish to have the full data set of a row for the post-processing step.
*
 * In addition, there are several injection-aware contracts that can be used in conjunction with
* the post processor to automatically have specific Debezium internal objects injected into the
* post processor automatically at connector start-up.
*
* @author Chris Cranford
*
* @see ConnectorConfigurationAware
* @see JdbcConnectionAware
* @see RelationalDatabaseSchemaAware
* @see ValueConverterAware
*/
@Incubating
public interface PostProcessor extends AutoCloseable {
/**
* Configure the post processor.
*
* @param properties map of configurable properties
*/
void configure(Map<String, ?> properties);
/**
* Apply the post processor to the supplied key and value.
*
* @param key the event's key, may be {@code null}
* @param value the event's value, may be {@code null}
*/
void apply(Object key, Struct value);
/**
* Close any resources
*/
@Override
void close();
}
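As a minimal sketch of implementing this contract (not part of this commit; the class name, the "ssn" column, and the "replacement" option are hypothetical), a processor that redacts a column in the after state might look like:

import java.util.Map;

import org.apache.kafka.connect.data.Struct;

import io.debezium.data.Envelope;
import io.debezium.processors.spi.PostProcessor;

/**
 * Hypothetical example: overwrites the "ssn" column of the after state, if present.
 */
public class RedactSsnPostProcessor implements PostProcessor {

    private String replacement;

    @Override
    public void configure(Map<String, ?> properties) {
        // Receives the '<prefix>.'-stripped subset of the connector configuration
        final Object configured = properties.get("replacement");
        this.replacement = configured != null ? configured.toString() : "***";
    }

    @Override
    public void apply(Object key, Struct value) {
        // Tombstones carry no value to mutate
        if (value == null || value.schema().field(Envelope.FieldName.AFTER) == null) {
            return;
        }
        final Struct after = value.getStruct(Envelope.FieldName.AFTER);
        if (after != null && after.schema().field("ssn") != null) {
            // The Struct is still mutable at this point in the pipeline
            after.put("ssn", replacement);
        }
    }

    @Override
    public void close() {
        // no resources to release
    }
}

Such a processor would be registered with post.processors=redact, redact.type set to the fully-qualified class name, and optionally redact.replacement=###, following the same pattern as the reselect processor in this commit's tests.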

View File

@@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.processors.spi;
import io.debezium.relational.RelationalDatabaseSchema;
/**
* Contract that allows injecting a {@link RelationalDatabaseSchema}.
*
* @author Chris Cranford
*/
public interface RelationalDatabaseSchemaAware {
void setDatabaseSchema(RelationalDatabaseSchema schema);
}

View File

@@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.processors.spi;
import io.debezium.relational.ValueConverterProvider;
/**
* Contract that allows injecting a {@link ValueConverterProvider}.
*
* @author Chris Cranford
*/
public interface ValueConverterAware {
void setValueConverter(ValueConverterProvider valueConverter);
}

View File

@@ -0,0 +1,236 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.processors;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import ch.qos.logback.classic.Level;
/**
* @author Chris Cranford
*/
public abstract class AbstractReselectProcessorTest<T extends SourceConnector> extends AbstractConnectorTest {
protected abstract Class<T> getConnectorClass();
protected abstract JdbcConnection databaseConnection();
protected abstract Configuration.Builder getConfigurationBuilder();
protected abstract String topicName();
protected abstract String tableName();
protected abstract String reselectColumnsList();
protected abstract void createTable() throws Exception;
protected abstract void dropTable() throws Exception;
protected abstract String getInsertWithValue();
protected abstract String getInsertWithNullValue();
protected abstract void waitForStreamingStarted() throws InterruptedException;
@Before
@SuppressWarnings("resource")
public void beforeEach() throws Exception {
createTable();
databaseConnection().setAutoCommit(false);
}
@After
public void afterEach() throws Exception {
stopConnector();
assertNoRecordsToConsume();
dropTable();
}
@Test
@FixFor("DBZ-4321")
@SuppressWarnings("resource")
public void testNoColumnsReselectedWhenNotNullSnapshot() throws Exception {
LogInterceptor interceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
interceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
databaseConnection().execute(getInsertWithValue());
Configuration config = getConfigurationBuilder()
.with("snapshot.mode", "initial")
.with("reselect.columns.list", reselectColumnsList()).build();
start(getConnectorClass(), config);
assertConnectorIsRunning();
waitForStreamingStarted();
final SourceRecords sourceRecords = consumeRecordsByTopicReselectWhenNotNullSnapshot();
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(topicName());
// Check read
SourceRecord record = tableRecords.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidRead(record, fieldName("id"), 1);
assertThat(after.get(fieldName("id"))).isEqualTo(1);
assertThat(after.get(fieldName("data"))).isEqualTo("one");
assertThat(after.get(fieldName("data2"))).isEqualTo(1);
assertThat(interceptor.containsMessage("No columns require re-selection.")).isTrue();
}
@Test
@FixFor("DBZ-4321")
@SuppressWarnings("resource")
public void testNoColumnsReselectedWhenNotNullStreaming() throws Exception {
LogInterceptor interceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
interceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
Configuration config = getConfigurationBuilder().with("reselect.columns.list", reselectColumnsList()).build();
start(getConnectorClass(), config);
assertConnectorIsRunning();
waitForStreamingStarted();
databaseConnection().execute(getInsertWithValue());
databaseConnection().execute(String.format("UPDATE %s SET data = 'two' where id = 1", tableName()));
databaseConnection().execute(String.format("DELETE FROM %s WHERE id = 1", tableName()));
final SourceRecords sourceRecords = consumeRecordsByTopicReselectWhenNotNullStreaming();
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(topicName());
// Check insert
SourceRecord record = tableRecords.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidInsert(record, fieldName("id"), 1);
assertThat(after.get(fieldName("id"))).isEqualTo(1);
assertThat(after.get(fieldName("data"))).isEqualTo("one");
assertThat(after.get(fieldName("data2"))).isEqualTo(1);
// Check update
record = tableRecords.get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidUpdate(record, fieldName("id"), 1);
assertThat(after.get(fieldName("id"))).isEqualTo(1);
assertThat(after.get(fieldName("data"))).isEqualTo("two");
assertThat(after.get(fieldName("data2"))).isEqualTo(1);
// Check delete
record = tableRecords.get(2);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidDelete(record, fieldName("id"), 1);
assertThat(after).isNull();
// Check tombstone
record = tableRecords.get(3);
VerifyRecord.isValidTombstone(record, fieldName("id"), 1);
assertThat(record.value()).isNull();
assertThat(interceptor.containsMessage("No columns require re-selection.")).isTrue();
}
@Test
@FixFor("DBZ-4321")
@SuppressWarnings("resource")
public void testColumnsReselectedWhenValueIsNullSnapshot() throws Exception {
databaseConnection().execute(getInsertWithNullValue());
databaseConnection().execute(String.format("UPDATE %s SET data = 'two' where id = 1", tableName()));
Configuration config = getConfigurationBuilder()
.with("snapshot.mode", "initial")
.with("reselect.columns.list", reselectColumnsList())
.build();
start(getConnectorClass(), config);
assertConnectorIsRunning();
waitForStreamingStarted();
final SourceRecords sourceRecords = consumeRecordsByTopicReselectWhenNullSnapshot();
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(topicName());
// Check insert
SourceRecord record = tableRecords.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidRead(record, fieldName("id"), 1);
assertThat(after.get(fieldName("id"))).isEqualTo(1);
assertThat(after.get(fieldName("data"))).isEqualTo("two");
assertThat(after.get(fieldName("data2"))).isEqualTo(1);
}
@Test
@FixFor("DBZ-4321")
@SuppressWarnings("resource")
public void testColumnsReselectedWhenValueIsNullStreaming() throws Exception {
Configuration config = getConfigurationBuilder().with("reselect.columns.list", reselectColumnsList()).build();
start(getConnectorClass(), config);
assertConnectorIsRunning();
waitForStreamingStarted();
databaseConnection().executeWithoutCommitting(getInsertWithNullValue());
databaseConnection().executeWithoutCommitting(String.format("UPDATE %s SET data = 'two' where id = 1", tableName()));
databaseConnection().commit();
final SourceRecords sourceRecords = consumeRecordsByTopicReselectWhenNullStreaming();
final List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(topicName());
// Check insert
SourceRecord record = tableRecords.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidInsert(record, fieldName("id"), 1);
assertThat(after.get(fieldName("id"))).isEqualTo(1);
assertThat(after.get(fieldName("data"))).isEqualTo("two");
assertThat(after.get(fieldName("data2"))).isEqualTo(1);
// Check update
record = tableRecords.get(1);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
VerifyRecord.isValidUpdate(record, fieldName("id"), 1);
assertThat(after.get(fieldName("id"))).isEqualTo(1);
assertThat(after.get(fieldName("data"))).isEqualTo("two");
assertThat(after.get(fieldName("data2"))).isEqualTo(1);
}
protected SourceRecords consumeRecordsByTopicReselectWhenNotNullSnapshot() throws InterruptedException {
return consumeRecordsByTopic(1);
}
protected SourceRecords consumeRecordsByTopicReselectWhenNotNullStreaming() throws InterruptedException {
return consumeRecordsByTopic(4);
}
protected SourceRecords consumeRecordsByTopicReselectWhenNullSnapshot() throws InterruptedException {
return consumeRecordsByTopic(1);
}
protected SourceRecords consumeRecordsByTopicReselectWhenNullStreaming() throws InterruptedException {
return consumeRecordsByTopic(2);
}
protected String fieldName(String fieldName) {
return fieldName;
}
}