DBZ-812 Column added during streaming

This commit is contained in:
Jiri Pechanec 2018-10-08 14:38:12 +02:00
parent 25bc23a14e
commit ed9706657c
6 changed files with 324 additions and 56 deletions

View File

@ -8,21 +8,28 @@
import io.debezium.relational.TableId;
public class ChangeTable {
private final String captureInstance;
private final TableId tableId;
private final Lsn startLsn;
private final Lsn stopLsn;
private static final String CDC_SCHEMA = "cdc";
public ChangeTable(TableId tableId, String captureInstance, Lsn startLsn, Lsn stopLsn) {
private final String captureInstance;
private final TableId sourceTableId;
private final TableId changeTableId;
private final Lsn startLsn;
private final int changeTableObjectId;
private Lsn stopLsn;
private ChangeTable nextVersionOfTable;
public ChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
super();
this.sourceTableId = sourceTableId;
this.captureInstance = captureInstance;
this.changeTableObjectId = changeTableObjectId;
this.startLsn = startLsn;
this.stopLsn = stopLsn;
this.tableId = tableId;
this.changeTableId = sourceTableId != null ? new TableId(sourceTableId.catalog(), CDC_SCHEMA, captureInstance + "_CT") : null;
}
public ChangeTable(String captureInstance, Lsn startLsn, Lsn stopLsn) {
this(null, captureInstance, startLsn, stopLsn);
public ChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
this(null, captureInstance, changeTableObjectId, startLsn, stopLsn);
}
public String getCaptureInstance() {
@ -35,13 +42,34 @@ public Lsn getStopLsn() {
return stopLsn;
}
public TableId getTableId() {
return tableId;
public void setStopLsn(Lsn stopLsn) {
this.stopLsn = stopLsn;
}
public TableId getSourceTableId() {
return sourceTableId;
}
public TableId getChangeTableId() {
return changeTableId;
}
public int getChangeTableObjectId() {
return changeTableObjectId;
}
public ChangeTable getNextVersionOfTable() {
return nextVersionOfTable;
}
public void setNextVersionOfTable(ChangeTable nextVersionOfTable) {
this.nextVersionOfTable = nextVersionOfTable;
}
@Override
public String toString() {
return "ChangeTable [captureInstance=" + captureInstance + ", tableId=" + tableId + ", startLsn=" + startLsn
+ ", stopLsn=" + stopLsn + "]";
return "ChangeTable [captureInstance=" + captureInstance + ", sourceTableId=" + sourceTableId
+ ", changeTableId=" + changeTableId + ", startLsn=" + startLsn + ", changeTableObjectId="
+ changeTableObjectId + ", stopLsn=" + stopLsn + "]";
}
}

View File

@ -147,4 +147,16 @@ public int compareTo(Lsn o) {
}
return 0;
}
/**
* Verifies whether the LSN falls into a LSN interval
*
* @param from start of the interval (included)
* @param to end of the interval (excluded)
*
* @return true if the LSN falls into the interval
*/
public boolean isBetween(Lsn from, Lsn to) {
return this.compareTo(from) >= 0 && this.compareTo(to) < 0;
}
}

View File

@ -11,6 +11,7 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -49,6 +50,7 @@ public class SqlServerConnection extends JdbcConnection {
+ "EXEC sys.sp_cdc_disable_db";
private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n"
+ "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
private static final String ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE = "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'%s', @capture_instance = N'%s', @role_name = NULL, @supports_net_changes = 0";
private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
private static final String CDC_WRAPPERS_DML;
private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
@ -58,7 +60,9 @@ public class SqlServerConnection extends JdbcConnection {
private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.fn_cdc_get_all_changes_#(ISNULL(?,sys.fn_cdc_get_min_lsn('#')), ?, N'all update old')";
private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "EXEC sys.sp_cdc_help_change_data_capture";
private static final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "SELECT * FROM cdc.change_tables WHERE start_lsn BETWEEN ? AND ?";
private static final String CDC_SCHEMA = "cdc";
private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT * FROM cdc.index_columns WHERE object_id=?";
private static final int CHANGE_TABLE_DATA_COLUMN_OFFSET = 5;
private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "};databaseName=${" + JdbcConfiguration.DATABASE + "}";
private static final ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(URL_PATTERN,
@ -130,6 +134,21 @@ public void enableTableCdc(String name) throws SQLException {
execute(enableCdcForTableStmt, generateWrapperFunctionsStmts);
}
/**
* Enables CDC for a table with a custom capture name
* functions for that table.
*
* @param name
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public void enableTableCdc(String tableName, String captureName) throws SQLException {
Objects.requireNonNull(tableName);
Objects.requireNonNull(captureName);
String enableCdcForTableStmt = String.format(ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE, tableName, captureName);
execute(enableCdcForTableStmt);
}
/**
* Disables CDC for a table for which it was enabled before.
*
@ -300,7 +319,9 @@ public Set<ChangeTable> listOfChangeTables() throws SQLException {
while (rs.next()) {
changeTables.add(
new ChangeTable(
new TableId(database(), rs.getString(1), rs.getString(2)), rs.getString(3),
new TableId(database(), rs.getString(1), rs.getString(2)),
rs.getString(3),
rs.getInt(4),
Lsn.valueOf(rs.getBytes(6)),
Lsn.valueOf(rs.getBytes(7))
)
@ -321,7 +342,12 @@ public Set<ChangeTable> listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws SQL
rs -> {
final Set<ChangeTable> changeTables = new HashSet<>();
while (rs.next()) {
changeTables.add(new ChangeTable(rs.getString(4), Lsn.valueOf(rs.getBytes(5)), Lsn.valueOf(rs.getBytes(6))));
changeTables.add(new ChangeTable(
rs.getString(4),
rs.getInt(1),
Lsn.valueOf(rs.getBytes(5)),
Lsn.valueOf(rs.getBytes(6))
));
}
return changeTables;
}
@ -329,18 +355,55 @@ public Set<ChangeTable> listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws SQL
}
public Table getTableSchemaFromTable(ChangeTable changeTable) throws SQLException {
DatabaseMetaData metadata = connection().getMetaData();
final DatabaseMetaData metadata = connection().getMetaData();
List<Column> cols;
try (ResultSet rs = metadata.getColumns(database(), changeTable.getTableId().schema(), changeTable.getTableId().table(), null)) {
cols = rs.next() ? readTableColumns(rs, changeTable.getTableId(), null).stream().map(ColumnEditor::create).collect(Collectors.toList()) : Collections.emptyList();
List<Column> columns = new ArrayList<>();
try (ResultSet rs = metadata.getColumns(
database(),
changeTable.getSourceTableId().schema(),
changeTable.getSourceTableId().table(),
null)
) {
while (rs.next()) {
readTableColumn(rs, changeTable.getSourceTableId(), null).ifPresent(ce -> columns.add(ce.create()));
}
}
List<String> pkColumnNames = readPrimaryKeyNames(metadata, changeTable.getTableId());
Collections.sort(cols);
final List<String> pkColumnNames = readPrimaryKeyNames(metadata, changeTable.getSourceTableId());
Collections.sort(columns);
return Table.editor()
.tableId(changeTable.getTableId())
.addColumns(cols)
.tableId(changeTable.getSourceTableId())
.addColumns(columns)
.setPrimaryKeyNames(pkColumnNames)
.create();
}
public Table getTableSchemaFromChangeTable(ChangeTable changeTable) throws SQLException {
final DatabaseMetaData metadata = connection().getMetaData();
final TableId changeTableId = changeTable.getChangeTableId();
List<ColumnEditor> columnEditors = new ArrayList<>();
try (ResultSet rs = metadata.getColumns(database(), changeTableId.schema(), changeTableId.table(), null)) {
while (rs.next()) {
readTableColumn(rs, changeTableId, null).ifPresent(columnEditors::add);
}
}
// The first 5 columns and the last column of the change table are CDC metadata
final List<Column> columns = columnEditors.subList(CHANGE_TABLE_DATA_COLUMN_OFFSET, columnEditors.size() - 1).stream()
.map(c -> c.position(c.position() - CHANGE_TABLE_DATA_COLUMN_OFFSET).create())
.collect(Collectors.toList());
final List<String> pkColumnNames = new ArrayList<>();
prepareQuery(GET_LIST_OF_KEY_COLUMNS, ps -> ps.setInt(1, changeTable.getChangeTableObjectId()), rs -> {
while (rs.next()) {
pkColumnNames.add(rs.getString(2));
}
});
Collections.sort(columns);
return Table.editor()
.tableId(changeTable.getSourceTableId())
.addColumns(columns)
.setPrimaryKeyNames(pkColumnNames)
.create();
}
@ -350,4 +413,8 @@ public synchronized void rollback() throws SQLException {
connection().rollback();
}
}
public String getNameOfChangeTable(String captureName) {
return captureName + "_CT";
}
}

View File

@ -34,7 +34,7 @@ public SqlServerSchemaChangeEventEmitter(SqlServerOffsetContext offsetContext, C
@Override
public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException {
final SchemaChangeEvent event = new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), changeTable.getTableId().catalog(), changeTable.getTableId().schema(), "N/A", tableSchema, SchemaChangeEventType.CREATE, false);
final SchemaChangeEvent event = new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), changeTable.getSourceTableId().catalog(), changeTable.getSourceTableId().schema(), "N/A", tableSchema, SchemaChangeEventType.CREATE, false);
receiver.schemaChangeEvent(event);
}
}

View File

@ -8,16 +8,18 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,6 +70,7 @@ public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorCon
@Override
public void execute(ChangeEventSourceContext context) throws InterruptedException {
final Metronome metronome = Metronome.sleeper(pollInterval, clock);
final Queue<ChangeTable> schemaChangeCheckpoints = new PriorityQueue<>((x, y) -> x.getStopLsn().compareTo(y.getStopLsn()));
try {
final AtomicReference<ChangeTable[]> tablesSlot = new AtomicReference<ChangeTable[]>(getCdcTablesToQuery());
@ -96,8 +99,16 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final Lsn fromLsn = lastProcessedLsn.isAvailable() ? connection.incrementLsn(lastProcessedLsn)
: lastProcessedLsn;
schemaChangeCheckpoints.clear();
if (!connection.listOfNewChangeTables(fromLsn, currentMaxLsn).isEmpty()) {
tablesSlot.set(getCdcTablesToQuery());
final ChangeTable[] tables = getCdcTablesToQuery();
tablesSlot.set(tables);
for (ChangeTable table: tables) {
if (table.getStopLsn().isBetween(fromLsn, currentMaxLsn)) {
LOGGER.info("Schema will be changed for {}", table);
schemaChangeCheckpoints.add(table);
}
}
}
try {
connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> {
@ -107,14 +118,13 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final ChangeTable[] tables = tablesSlot.get();
for (int i = 0; i < tableCount; i++) {
changeTables[i] = new ChangeTablePointer(tables[i].getTableId(), resultSets[i]);
changeTables[i] = new ChangeTablePointer(tables[i], resultSets[i]);
changeTables[i].next();
}
for (;;) {
ChangeTablePointer tableSmallestLsn = null;
for (int i = 0; i < tableCount; i++) {
final ChangeTablePointer changeTable = changeTables[i];
for (ChangeTablePointer changeTable: changeTables) {
if (changeTable.isCompleted()) {
continue;
}
@ -132,8 +142,22 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
tableSmallestLsn.next();
continue;
}
if (tableSmallestLsn.getChangeTable().getStopLsn().isAvailable() &&
tableSmallestLsn.getChangeTable().getStopLsn().compareTo(tableSmallestLsn.getRowLsn()) <= 0) {
LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableSmallestLsn, tableSmallestLsn.getRowLsn());
tableSmallestLsn.next();
continue;
}
LOGGER.trace("Processing change {}", tableSmallestLsn);
final TableId tableId = tableSmallestLsn.getTableId();
if (!schemaChangeCheckpoints.isEmpty()) {
if (tableSmallestLsn.getRowLsn().compareTo(schemaChangeCheckpoints.peek().getStopLsn()) > 0) {
final ChangeTable oldTable = schemaChangeCheckpoints.poll();
final ChangeTable newTable = oldTable.getNextVersionOfTable();
LOGGER.info("Migrating schema from {} to {}", oldTable, newTable);
dispatcher.dispatchSchemaChangeEvent(oldTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(offsetContext, newTable, connection.getTableSchemaFromChangeTable(newTable)));
}
}
final TableId tableId = tableSmallestLsn.getChangeTable().getSourceTableId();
final Lsn commitLsn = tableSmallestLsn.getCommitLsn();
final Lsn rowLsn = tableSmallestLsn.getRowLsn();
final int operation = tableSmallestLsn.getOperation();
@ -178,7 +202,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
}
catch (Exception e) {
throw new ConnectException(e);
errorHandler.setProducerThrowable(e);
}
}
@ -196,24 +220,45 @@ private ChangeTable[] processErrorFromChangeTableQuery(SQLException exception, C
private ChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedException {
final Set<ChangeTable> cdcEnabledTables = connection.listOfChangeTables();
final Set<ChangeTable> newTables = new HashSet<>();
final Set<ChangeTable> whitelistedCdcEnabledTables = cdcEnabledTables.stream().filter(changeTable -> {
if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(changeTable.getTableId())) {
if (schema.tableFor(changeTable.getTableId()) == null) {
newTables.add(changeTable);
final Map<TableId, List<ChangeTable>> whitelistedCdcEnabledTables = cdcEnabledTables.stream()
.filter(changeTable -> {
if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(changeTable.getSourceTableId())) {
return true;
}
else {
LOGGER.info("CDC is enabled for table {} but the table is not whitelisted by connector");
return false;
}
})
.collect(Collectors.groupingBy(x -> x.getSourceTableId()));
final List<ChangeTable> tables = new ArrayList<>();
for (List<ChangeTable> captures: whitelistedCdcEnabledTables.values()) {
ChangeTable changeTable = captures.get(0);
if (captures.size() > 1) {
if (captures.get(0).getStartLsn().compareTo(captures.get(1).getStartLsn()) < 0) {
captures.get(0).setStopLsn(captures.get(1).getStartLsn());
captures.get(0).setNextVersionOfTable(captures.get(1));
tables.add(captures.get(1));
}
return true;
else {
captures.get(1).setStopLsn(captures.get(0).getStartLsn());
captures.get(1).setNextVersionOfTable(captures.get(0));
changeTable = captures.get(1);
tables.add(captures.get(0));
}
LOGGER.info("Multiple capture instances {} and {} present for the same table", changeTable, captures.get(1));
}
return false;
}).collect(Collectors.toSet());
for (ChangeTable changeTable: newTables) {
LOGGER.info("Table {} is new to be monitored", changeTable);
// We need to read the source table schema - primary key information cannot be obtained from change table
dispatcher.dispatchSchemaChangeEvent(changeTable.getTableId(), new SqlServerSchemaChangeEventEmitter(offsetContext, changeTable, connection.getTableSchemaFromTable(changeTable)));
if (schema.tableFor(changeTable.getSourceTableId()) == null) {
LOGGER.info("Table {} is new to be monitored by capture instance {}", changeTable.getSourceTableId(), changeTable.getCaptureInstance());
// We need to read the source table schema - primary key information cannot be obtained from change table
dispatcher.dispatchSchemaChangeEvent(changeTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(offsetContext, changeTable, connection.getTableSchemaFromTable(changeTable)));
}
tables.add(changeTable);
}
final ChangeTable[] tables = whitelistedCdcEnabledTables.toArray(new ChangeTable[whitelistedCdcEnabledTables.size()]);
return tables;
return tables.toArray(new ChangeTable[tables.size()]);
}
@Override
@ -222,18 +267,18 @@ public void commitOffset(Map<String, ?> offset) {
private static class ChangeTablePointer {
private final TableId tableId;
private final ChangeTable changeTable;
private final ResultSet resultSet;
private boolean completed = false;
private Lsn currentChangeLsn;
public ChangeTablePointer(TableId tableId, ResultSet resultSet) {
this.tableId = tableId;
public ChangeTablePointer(ChangeTable changeTable, ResultSet resultSet) {
this.changeTable = changeTable;
this.resultSet = resultSet;
}
public TableId getTableId() {
return tableId;
public ChangeTable getChangeTable() {
return changeTable;
}
public Lsn getCommitLsn() throws SQLException {
@ -261,7 +306,7 @@ public boolean next() throws SQLException {
completed = !resultSet.next();
currentChangeLsn = completed ? Lsn.NULL : Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN));
if (completed) {
LOGGER.trace("Closing result set of change table for table {}", tableId);
LOGGER.trace("Closing result set of change table for table {}", changeTable);
resultSet.close();
}
return !completed;
@ -277,7 +322,7 @@ public int compareTo(ChangeTablePointer o) throws SQLException {
@Override
public String toString() {
return "ChangeTable [tableId=" + tableId + ", resultSet=" + resultSet + ", completed=" + completed + "]";
return "ChangeTable [changeTable=" + changeTable + ", resultSet=" + resultSet + ", completed=" + completed + "]";
}
}
}

View File

@ -8,8 +8,11 @@
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.Assertions;
import org.fest.util.Collections;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -100,6 +103,28 @@ public void addTable() throws Exception {
records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablec")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tabled")).hasSize(RECORDS_PER_TABLE);
records.recordsForTopic("server1.dbo.tablec").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tablec.Value")
.field("id", Schema.INT32_SCHEMA)
.field("colc", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
});
records.recordsForTopic("server1.dbo.tabled").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tabled.Value")
.field("id", Schema.INT32_SCHEMA)
.field("cold", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
});
}
@Test
@ -146,7 +171,98 @@ public void removeTable() throws Exception {
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).isNullOrEmpty();
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
@Test
public void addColumnToTable() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START_1 = 10;
final int ID_START_2 = 100;
final int ID_START_3 = 1000;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b')"
);
}
SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tableb.Value")
.field("id", Schema.INT32_SCHEMA)
.field("colb", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
});
// Enable a second capture instance
connection.execute("ALTER TABLE dbo.tableb ADD newcol INT");
connection.enableTableCdc("tableb", "after_change");
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_2 + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a2')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b2', 2)"
);
}
records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
// TODO - Optional flag is lost here as it is not carrie dover to the CDC table
records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tableb.Value")
.field("id", Schema.OPTIONAL_INT32_SCHEMA)
.field("colb", Schema.OPTIONAL_STRING_SCHEMA)
.field("newcol", Schema.OPTIONAL_INT32_SCHEMA)
.build()
);
});
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_3 + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a3')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b3', 3)"
);
}
records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.tableb.Value")
.field("id", Schema.OPTIONAL_INT32_SCHEMA)
.field("colb", Schema.OPTIONAL_STRING_SCHEMA)
.field("newcol", Schema.OPTIONAL_INT32_SCHEMA)
.build()
);
});
}
}
}