DBZ-1519 Avoid erroneously logging there are no monitored tables
This commit is contained in:
parent
eb27cc5c54
commit
2b9fbcd5f6
@ -74,6 +74,7 @@ public class MySqlSchema extends RelationalDatabaseSchema {
|
||||
private final HistoryRecordComparator historyComparator;
|
||||
private final boolean skipUnparseableDDL;
|
||||
private final boolean storeOnlyMonitoredTablesDdl;
|
||||
private boolean recoveredTables;
|
||||
|
||||
/**
|
||||
* Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector configuration}.
|
||||
@ -131,7 +132,6 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
|
||||
}
|
||||
};
|
||||
this.dbHistory.configure(dbHistoryConfig, historyComparator, new DatabaseHistoryMetrics(configuration), true); // validates
|
||||
|
||||
}
|
||||
|
||||
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) {
|
||||
@ -248,6 +248,7 @@ protected void appendCreateTableStatement(StringBuilder sb, Table table) {
|
||||
public void loadHistory(SourceInfo startingPoint) {
|
||||
tables().clear();
|
||||
dbHistory.recover(startingPoint.partition(), startingPoint.offset(), tables(), ddlParser);
|
||||
recoveredTables = !tableIds().isEmpty();
|
||||
refreshSchemas();
|
||||
}
|
||||
|
||||
@ -376,4 +377,9 @@ else if (filters.databaseFilter().test(databaseName) || databaseName == null ||
|
||||
public boolean isStoreOnlyMonitoredTablesDdl() {
|
||||
return storeOnlyMonitoredTablesDdl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tableInformationComplete() {
|
||||
return recoveredTables;
|
||||
}
|
||||
}
|
||||
|
@ -285,4 +285,10 @@ public Table tableFor(int relationId) {
|
||||
LOGGER.debug("Relation '{}' resolved to table '{}'", relationId, tableId);
|
||||
return tableFor(tableId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tableInformationComplete() {
|
||||
// PostgreSQL does not support HistorizedDatabaseSchema - so no tables are recovered
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -1285,6 +1285,30 @@ public void shouldRewriteIdentityKey() throws InterruptedException {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1519")
|
||||
public void shouldNotIssueWarningForNoMonitoredTablesAfterApplyingFilters() throws Exception {
|
||||
final LogInterceptor logInterceptor = new LogInterceptor();
|
||||
|
||||
TestHelper.execute(SETUP_TABLES_STMT);
|
||||
TestHelper.execute(INSERT_STMT);
|
||||
Configuration config = TestHelper.defaultConfig().with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s2").build();
|
||||
|
||||
// Start connector, verify that it does not log no monitored tables warning
|
||||
start(PostgresConnector.class, config);
|
||||
waitForSnapshotToBeCompleted();
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
assertThat(logInterceptor.containsMessage(NO_MONITORED_TABLES_WARNING)).isFalse();
|
||||
stopConnector();
|
||||
|
||||
// Restart connector, verify it does not log no monitored tables warning
|
||||
start(PostgresConnector.class, config);
|
||||
waitForStreamingRunning();
|
||||
assertThat(logInterceptor.containsMessage(NO_MONITORED_TABLES_WARNING)).isFalse();
|
||||
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> batchInsertRecords(long recordsCount, int batchSize) {
|
||||
String insertStmt = "INSERT INTO text_table(j, jb, x, u) " +
|
||||
"VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " +
|
||||
|
@ -87,9 +87,11 @@ public synchronized <T extends CdcSourceTaskContext> void start(T taskContext, C
|
||||
SnapshotResult snapshotResult = snapshotSource.execute(context);
|
||||
LOGGER.info("Snapshot ended with {}", snapshotResult);
|
||||
|
||||
if (snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED || schema.tableInformationComplete()) {
|
||||
schema.assureNonEmptySchema();
|
||||
}
|
||||
|
||||
if (running && snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED) {
|
||||
if (running && snapshotResult.isCompletedOrSkipped()) {
|
||||
streamingSource = changeEventSourceFactory.getStreamingChangeEventSource(snapshotResult.getOffset());
|
||||
eventDispatcher.setEventListener(streamingMetrics);
|
||||
streamingMetrics.connected(true);
|
||||
|
@ -23,6 +23,14 @@ public static SnapshotResult aborted() {
|
||||
return new SnapshotResult(SnapshotResultStatus.ABORTED, null);
|
||||
}
|
||||
|
||||
public static SnapshotResult skipped(OffsetContext offset) {
|
||||
return new SnapshotResult(SnapshotResultStatus.SKIPPED, offset);
|
||||
}
|
||||
|
||||
public boolean isCompletedOrSkipped() {
|
||||
return this.status == SnapshotResultStatus.SKIPPED || this.status == SnapshotResultStatus.COMPLETED;
|
||||
}
|
||||
|
||||
public SnapshotResultStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
@ -33,7 +41,8 @@ public OffsetContext getOffset() {
|
||||
|
||||
public static enum SnapshotResultStatus {
|
||||
COMPLETED,
|
||||
ABORTED;
|
||||
ABORTED,
|
||||
SKIPPED
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -28,6 +28,7 @@ public abstract class HistorizedRelationalDatabaseSchema extends RelationalDatab
|
||||
implements HistorizedDatabaseSchema<TableId> {
|
||||
|
||||
private final DatabaseHistory databaseHistory;
|
||||
private boolean recoveredTables;
|
||||
|
||||
protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnectorConfig config, TopicSelector<TableId> topicSelector,
|
||||
TableFilter tableFilter, ColumnNameFilter columnFilter, TableSchemaBuilder schemaBuilder,
|
||||
@ -41,6 +42,7 @@ protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnect
|
||||
@Override
|
||||
public void recover(OffsetContext offset) {
|
||||
databaseHistory.recover(offset.getPartition(), offset.getOffset(), tables(), getDdlParser());
|
||||
recoveredTables = !tableIds().isEmpty();
|
||||
for (TableId tableId : tableIds()) {
|
||||
buildAndRegisterSchema(tableFor(tableId));
|
||||
}
|
||||
@ -81,4 +83,9 @@ protected void record(SchemaChangeEvent schemaChange, TableChanges tableChanges)
|
||||
databaseHistory.record(schemaChange.getPartition(), schemaChange.getOffset(), schemaChange.getDatabase(),
|
||||
schemaChange.getSchema(), schemaChange.getDdl(), tableChanges);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tableInformationComplete() {
|
||||
return recoveredTables;
|
||||
}
|
||||
}
|
||||
|
@ -176,4 +176,9 @@ private TableId toLowerCaseIfNeeded(TableId tableId) {
|
||||
protected TableFilter getTableFilter() {
|
||||
return tableFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tableInformationComplete() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ public SnapshotResult execute(ChangeEventSourceContext context) throws Interrupt
|
||||
// Neither schema nor data require snapshotting
|
||||
if (!snapshottingTask.snapshotSchema() && !snapshottingTask.snapshotData()) {
|
||||
LOGGER.debug("Skipping snapshotting");
|
||||
return SnapshotResult.completed(previousOffset);
|
||||
return SnapshotResult.skipped(previousOffset);
|
||||
}
|
||||
|
||||
delaySnapshotIfNeeded(context);
|
||||
|
@ -18,4 +18,12 @@ public interface DatabaseSchema<I extends DataCollectionId> {
|
||||
void close();
|
||||
|
||||
DataCollectionSchema schemaFor(I id);
|
||||
|
||||
/**
|
||||
* Indicates whether or not table names are guaranteed to be fully present, regardless of whether or not a
|
||||
* snapshot has been performed.
|
||||
*
|
||||
* @return boolean indicating if table names are present
|
||||
*/
|
||||
boolean tableInformationComplete();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user