DBZ-3161 Refactor/Rename all references of MonitoredTables to CapturedTables

This commit is contained in:
Anisha Mohanty 2021-04-21 17:27:51 +05:30 committed by Jiri Pechanec
parent 062c1c3860
commit f0f8d5d180
25 changed files with 85 additions and 59 deletions

1
.gitignore vendored
View File

@ -28,3 +28,4 @@ generated-sources/
/state/
bin/
gen/

View File

@ -106,7 +106,7 @@ public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConve
*
* @return the array with the table names
*/
public String[] monitoredTablesAsStringArray() {
public String[] capturedTablesAsStringArray() {
final Collection<TableId> tables = tableIds();
String[] ret = new String[tables.size()];
int i = 0;
@ -180,7 +180,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
// - all DDLs if configured
// - or global SET variables
// - or DDLs for monitored objects
if (!databaseHistory.storeOnlyMonitoredTables() || isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase())
if (!databaseHistory.storeOnlyCapturedTables() || isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase())
|| schemaChange.getTables().stream().map(Table::id).anyMatch(filters.dataCollectionFilter()::isIncluded)) {
LOGGER.debug("Recorded DDL statements for database '{}': {}", schemaChange.getDatabase(), schemaChange.getDdl());
record(schemaChange, schemaChange.getTableChanges());
@ -219,7 +219,7 @@ private List<SchemaChangeEvent> parseDdl(String ddlStatements, String databaseNa
}
// No need to send schema events or store DDL if no table has changed
if (!databaseHistory.storeOnlyMonitoredTables() || isGlobalSetVariableStatement(ddlStatements, databaseName) || ddlChanges.anyMatch(filters)) {
if (!databaseHistory.storeOnlyCapturedTables() || isGlobalSetVariableStatement(ddlStatements, databaseName) || ddlChanges.anyMatch(filters)) {
// We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
// by database. Unfortunately, the databaseName on the event might not be the same database as that
@ -316,8 +316,8 @@ public boolean historyExists() {
}
@Override
public boolean storeOnlyMonitoredTables() {
return databaseHistory.storeOnlyMonitoredTables();
public boolean storeOnlyCapturedTables() {
return databaseHistory.storeOnlyCapturedTables();
}
/**

View File

@ -236,7 +236,7 @@ protected void releaseDataSnapshotLocks(RelationalSnapshotContext snapshotContex
for (Iterator<SchemaChangeEvent> i = schemaEvents.iterator(); i.hasNext();) {
final SchemaChangeEvent event = i.next();
if (databaseSchema.storeOnlyMonitoredTables() && event.getDatabase() != null && event.getDatabase().length() != 0
if (databaseSchema.storeOnlyCapturedTables() && event.getDatabase() != null && event.getDatabase().length() != 0
&& !connectorConfig.getTableFilters().databaseFilter().test(event.getDatabase())) {
LOGGER.debug("Skipping schema event as it belongs to a non-captured database: '{}'", event);
continue;
@ -315,7 +315,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Relati
delayedSchemaSnapshotTables = Collect.minus(snapshotContext.capturedSchemaTables, snapshotContext.capturedTables);
LOGGER.info("Tables for delayed schema capture: {}", delayedSchemaSnapshotTables);
}
if (databaseSchema.storeOnlyMonitoredTables()) {
if (databaseSchema.storeOnlyCapturedTables()) {
capturedSchemaTables = snapshotContext.capturedTables;
LOGGER.info("Only monitored tables schema should be captured, capturing: {}", capturedSchemaTables);
}
@ -592,7 +592,7 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
throw new InterruptedException("Interrupted while processing event " + event);
}
if (databaseSchema.storeOnlyMonitoredTables() && event.getDatabase() != null && event.getDatabase().length() != 0
if (databaseSchema.storeOnlyCapturedTables() && event.getDatabase() != null && event.getDatabase().length() != 0
&& !connectorConfig.getTableFilters().databaseFilter().test(event.getDatabase())) {
LOGGER.debug("Skipping schema event as it belongs to a non-captured database: '{}'", event);
continue;

View File

@ -42,7 +42,12 @@ public void globalLockReleased() {
@Override
public String[] getMonitoredTables() {
return schema.monitoredTablesAsStringArray();
return schema.capturedTablesAsStringArray();
}
@Override
public String[] getCapturedTables() {
return schema.capturedTablesAsStringArray();
}
@Override

View File

@ -154,7 +154,12 @@ public void setMilliSecondsBehindSource(long value) {
@Override
public String[] getMonitoredTables() {
return schema.monitoredTablesAsStringArray();
return schema.capturedTablesAsStringArray();
}
@Override
public String[] getCapturedTables() {
return schema.capturedTablesAsStringArray();
}
@Override

View File

@ -872,7 +872,7 @@ protected void handleUpdateTableMetadata(Event event) {
* {@link MySqlConnectorConfig#INCONSISTENT_SCHEMA_HANDLING_MODE} configuration.
*/
private void informAboutUnknownTableIfRequired(Event event, TableId tableId, String typeToLog) {
if (tableId != null && context.dbSchema().isTableMonitored(tableId)) {
if (tableId != null && context.dbSchema().isTableCaptured(tableId)) {
metrics.onErroneousEvent("source = " + tableId + ", event " + event);
EventHeaderV4 eventHeader = event.getHeader();

View File

@ -152,8 +152,14 @@ public void setMilliSecondsBehindSource(long value) {
}
@Override
@Deprecated
public String[] getMonitoredTables() {
return schema.monitoredTablesAsStringArray();
return schema.capturedTablesAsStringArray();
}
@Override
public String[] getCapturedTables() {
return schema.capturedTablesAsStringArray();
}
@Override

View File

@ -79,7 +79,7 @@ public class MySqlSchema extends RelationalDatabaseSchema {
private final DdlChanges ddlChanges;
private final HistoryRecordComparator historyComparator;
private final boolean skipUnparseableDDL;
private final boolean storeOnlyMonitoredTablesDdl;
private final boolean storeOnlyCapturedTablesDdl;
private boolean recoveredTables;
/**
@ -122,7 +122,7 @@ public MySqlSchema(MySqlConnectorConfig configuration,
.with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, configuration.getLogicalName())
.build();
this.skipUnparseableDDL = dbHistoryConfig.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS);
this.storeOnlyMonitoredTablesDdl = dbHistoryConfig.getBoolean(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL);
this.storeOnlyCapturedTablesDdl = dbHistoryConfig.getBoolean(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL);
this.ddlParser = new MySqlAntlrDdlParser(getValueConverters(configuration), getTableFilter());
this.ddlChanges = this.ddlParser.getDdlChanges();
@ -202,7 +202,7 @@ public Filters filters() {
*
* @return the array with the table names
*/
public String[] monitoredTablesAsStringArray() {
public String[] capturedTablesAsStringArray() {
final Collection<TableId> tables = tableIds();
String[] ret = new String[tables.size()];
int i = 0;
@ -218,7 +218,7 @@ public String[] monitoredTablesAsStringArray() {
* @param id the fully-qualified table identifier; may be null
* @return true if events from the table are captured
*/
public boolean isTableMonitored(TableId id) {
public boolean isTableCaptured(TableId id) {
return filters.tableFilter().test(id);
}
@ -338,7 +338,7 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
}
changes = tables().drainChanges();
// No need to send schema events or store DDL if no table has changed
if (!storeOnlyMonitoredTablesDdl || ddlChanges.anyMatch(filters.databaseFilter(), filters.tableFilter())) {
if (!storeOnlyCapturedTablesDdl || ddlChanges.anyMatch(filters.databaseFilter(), filters.tableFilter())) {
if (statementConsumer != null) {
// We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
@ -377,7 +377,7 @@ else if (filters.databaseFilter().test(databaseName) || databaseName == null ||
// - all DDLs if configured
// - or global SET variables
// - or DDLs for monitored objects
if (!storeOnlyMonitoredTablesDdl || isGlobalSetVariableStatement(ddlStatements, databaseName) || changes.stream().anyMatch(filters().tableFilter()::test)) {
if (!storeOnlyCapturedTablesDdl || isGlobalSetVariableStatement(ddlStatements, databaseName) || changes.stream().anyMatch(filters().tableFilter()::test)) {
dbHistory.record(source.partition(), source.offset(), databaseName, ddlStatements);
}
}
@ -403,10 +403,10 @@ public boolean isGlobalSetVariableStatement(String ddl, String databaseName) {
}
/**
* @return true if only monitored tables should be stored in database history, false if all tables should be stored
* @return true if only captured tables should be stored in database history, false if all tables should be stored
*/
public boolean isStoreOnlyMonitoredTablesDdl() {
return storeOnlyMonitoredTablesDdl;
public boolean isStoreOnlyCapturedTablesDdl() {
return storeOnlyCapturedTablesDdl;
}
@Override

View File

@ -13,5 +13,9 @@
*/
public interface ReaderMetricsMXBean {
@Deprecated
String[] getMonitoredTables();
String[] getCapturedTables();
}

View File

@ -800,7 +800,7 @@ private boolean shouldRecordTableSchema(MySqlSchema schema, Filters filters, Tab
return false;
}
return filters.tableFilter().test(id) || !schema.isStoreOnlyMonitoredTablesDdl();
return filters.tableFilter().test(id) || !schema.isStoreOnlyCapturedTablesDdl();
}
protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection mysql, AtomicReference<String> sql) throws SQLException {

View File

@ -40,7 +40,12 @@ public void globalLockReleased() {
@Override
public String[] getMonitoredTables() {
return schema.monitoredTablesAsStringArray();
return schema.capturedTablesAsStringArray();
}
@Override
public String[] getCapturedTables() {
return schema.capturedTablesAsStringArray();
}
@Override

View File

@ -775,7 +775,7 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLExc
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, DATABASE.getDatabaseName() + ".products")
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + DATABASE.getDatabaseName() + ".products",
String.format("SELECT * from %s.products where id>=108 order by id", DATABASE.getDatabaseName()))
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
@ -823,7 +823,7 @@ public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() thro
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, tables)
.with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + DATABASE.getDatabaseName() + ".products",
String.format("SELECT * from %s.products where id>=108 order by id", DATABASE.getDatabaseName()))
@ -867,7 +867,7 @@ public void shouldIgnoreAlterTableForNonCapturedTablesNotStoredInHistory() throw
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.build();
// Start the connector ...
@ -896,14 +896,14 @@ public void shouldIgnoreAlterTableForNonCapturedTablesNotStoredInHistory() throw
@Test
@FixFor("DBZ-1201")
public void shouldSaveSetCharacterSetWhenStoringOnlyMonitoredTables() throws SQLException, InterruptedException {
public void shouldSaveSetCharacterSetWhenStoringOnlyCapturededTables() throws SQLException, InterruptedException {
Testing.Files.delete(DB_HISTORY_PATH);
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "no_" + DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.build();
// Start the connector ...
@ -1016,7 +1016,7 @@ public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() thro
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.build();
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());) {

View File

@ -850,7 +850,7 @@ public void shouldConsumeDatesCorrectlyWhenClientTimezonePrecedesServerTimezoneU
// Use the DB configuration to define the connector's configuration ...
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("dbz_85_fractest"))
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
@ -1011,7 +1011,7 @@ public void shouldConsumeDecimalAsStringFromSnapshot() throws SQLException, Inte
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("dbz_147_decimalvalues"))
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING)
.build();
// Start the connector ...

View File

@ -72,7 +72,7 @@ public void testLifecycle() throws Exception {
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, Boolean.TRUE)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.build());
assertConnectorIsRunning();
@ -120,7 +120,7 @@ public void testSnapshotOnlyMetrics() throws Exception {
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, Boolean.TRUE)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.build());
assertSnapshotMetrics();
@ -151,7 +151,7 @@ public void testSnapshotAndStreamingMetrics() throws Exception {
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, Boolean.TRUE)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.build());
assertSnapshotMetrics();
@ -168,7 +168,7 @@ public void testStreamingOnlyMetrics() throws Exception {
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("simple"))
.with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, Boolean.TRUE)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, Boolean.TRUE)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
.build());
// CREATE DATABASE, CREATE TABLE, and 2 INSERT

View File

@ -149,7 +149,7 @@ public void shouldWarnOnInvalidMigrateTable() throws SQLException, InterruptedEx
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("monitored"))
.build();

View File

@ -646,7 +646,7 @@ public void dateAndTimeTest() throws InterruptedException {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("DATE_TIME_TABLE"))
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.build();
start(MySqlConnector.class, config);
@ -715,7 +715,7 @@ public void timeTypeWithConnectMode() throws Exception {
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("DATE_TIME_TABLE"))
.with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.build();
start(MySqlConnector.class, config);
@ -858,7 +858,7 @@ public void alterDateAndTimeTest() throws Exception {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("ALTER_DATE_TIME"))
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.build();
start(MySqlConnector.class, config);

View File

@ -108,7 +108,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLock() throws Excep
}
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyMonitoredTables() throws Exception {
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyCapturedTables() throws Exception {
snapshotOfSingleDatabase(false, true, true);
}
@ -123,11 +123,11 @@ public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockNoData() throws
}
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyMonitoredTablesNoData() throws Exception {
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyCapturedTablesNoData() throws Exception {
snapshotOfSingleDatabase(false, true, false);
}
private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMonitoredTables, boolean data) throws Exception {
private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyCapturedTables, boolean data) throws Exception {
final LogInterceptor logInterceptor = new LogInterceptor();
final Builder builder = simpleConfig()
@ -138,7 +138,7 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo
.with(MySqlConnectorConfig.PASSWORD, "cloudpass")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.TEST_DISABLE_GLOBAL_LOCKING, "true")
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, storeOnlyMonitoredTables);
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedTables);
}
if (!data) {
builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY);
@ -153,7 +153,7 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo
// Testing.Print.enable();
KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(DATABASE.getServerName() + ".");
SchemaChangeHistory schemaChanges = new SchemaChangeHistory(DATABASE.getServerName());
final int schemaEventsCount = !useGlobalLock ? (storeOnlyMonitoredTables ? 8 : 14) : 0;
final int schemaEventsCount = !useGlobalLock ? (storeOnlyCapturedTables ? 8 : 14) : 0;
SourceRecords sourceRecords = consumeRecordsByTopic(schemaEventsCount + (data ? 9 + 4 : 0));
for (Iterator<SourceRecord> i = sourceRecords.allRecordsInOrder().iterator(); i.hasNext();) {
final SourceRecord record = i.next();

View File

@ -525,7 +525,7 @@ public void shouldAcceptTls12() throws Exception {
private void inconsistentSchema(EventProcessingFailureHandlingMode mode) throws InterruptedException, SQLException {
Configuration.Builder builder = simpleConfig()
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true)
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("orders"));
if (mode == null) {

View File

@ -118,18 +118,18 @@ public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLock() throws Excep
}
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyMonitoredTables() throws Exception {
public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyCapturedTables() throws Exception {
snapshotOfSingleDatabase(false, true);
}
private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMonitoredTables) throws Exception {
private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyCapturedTables) throws Exception {
final Builder builder = simpleConfig();
if (!useGlobalLock) {
builder
.with(MySqlConnectorConfig.USER, "cloud")
.with(MySqlConnectorConfig.PASSWORD, "cloudpass")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, storeOnlyMonitoredTables);
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedTables);
}
config = builder.build();
context = new MySqlTaskContext(config, new Filters.Builder(config).build());

View File

@ -2551,7 +2551,7 @@ public void initializeStorage() {
}
@Override
public boolean storeOnlyMonitoredTables() {
public boolean storeOnlyCapturedTables() {
return false;
}

View File

@ -276,7 +276,7 @@ public Optional<DataCollectionSchema> ignoreMissingSchema(T dataCollectionId, Ch
public void dispatchSchemaChangeEvent(T dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
if (dataCollectionId != null && !filter.isIncluded(dataCollectionId)) {
if (historizedSchema == null || historizedSchema.storeOnlyMonitoredTables()) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
LOGGER.trace("Filtering schema change event for {}", dataCollectionId);
return;
}
@ -298,7 +298,7 @@ public void dispatchSchemaChangeEvent(Collection<T> dataCollectionIds, SchemaCha
}
}
if (!anyNonfilteredEvent) {
if (historizedSchema == null || historizedSchema.storeOnlyMonitoredTables()) {
if (historizedSchema == null || historizedSchema.storeOnlyCapturedTables()) {
LOGGER.trace("Filtering schema change event for {}", dataCollectionIds);
return;
}

View File

@ -94,7 +94,7 @@ public boolean tableInformationComplete() {
return recoveredTables;
}
public boolean storeOnlyMonitoredTables() {
return databaseHistory.storeOnlyMonitoredTables();
public boolean storeOnlyCapturedTables() {
return databaseHistory.storeOnlyCapturedTables();
}
}

View File

@ -49,7 +49,7 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory {
protected Configuration config;
private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE;
private boolean skipUnparseableDDL;
private boolean storeOnlyMonitoredTablesDdl;
private boolean storeOnlyCapturedTablesDdl;
private Function<String, Optional<Pattern>> ddlFilter = (x -> Optional.empty());
private DatabaseHistoryListener listener = DatabaseHistoryListener.NOOP;
private boolean useCatalogBeforeSchema;
@ -64,7 +64,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
this.config = config;
this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE;
this.skipUnparseableDDL = config.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS);
this.storeOnlyMonitoredTablesDdl = config.getBoolean(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL);
this.storeOnlyCapturedTablesDdl = config.getBoolean(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL);
final String ddlFilter = config.getString(DatabaseHistory.DDL_FILTER);
this.ddlFilter = (ddlFilter != null) ? Predicates.matchedBy(ddlFilter) : this.ddlFilter;
@ -165,8 +165,8 @@ public void initializeStorage() {
}
@Override
public boolean storeOnlyMonitoredTables() {
return storeOnlyMonitoredTablesDdl;
public boolean storeOnlyCapturedTables() {
return storeOnlyCapturedTablesDdl;
}
@Override

View File

@ -158,7 +158,7 @@ public interface DatabaseHistory {
*/
void initializeStorage();
boolean storeOnlyMonitoredTables();
boolean storeOnlyCapturedTables();
boolean skipUnparseableDdlStatements();

View File

@ -36,7 +36,7 @@ public static interface SchemaChangeEventConsumer {
void initializeStorage();
default boolean storeOnlyMonitoredTables() {
default boolean storeOnlyCapturedTables() {
return false;
}
}