DBZ-1206 Adding new connector metric "numberOfFilteredEvents"

This commit is contained in:
Steviep 2019-04-09 21:46:54 +09:00 committed by Gunnar Morling
parent 762d908eef
commit 94f71ea96e
7 changed files with 122 additions and 13 deletions

View File

@ -769,7 +769,8 @@ else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WA
}
}
else {
logger.debug("Skipping {} event: {} for non-monitored table {}", typeToLog, event, tableId);
logger.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);
metrics.onFilteredEvent("source = " + tableId);
}
}

View File

@ -13,17 +13,62 @@
*/
public interface BinlogReaderMetricsMXBean extends StreamingChangeEventSourceMetricsMXBean {
/**
* Name of the current MySQL binlog file being read by underlying mysql-binlog-client.
*/
String getBinlogFilename();
/**
* Current MySQL binlog offset position being read by underlying mysql-binlog-client.
*/
long getBinlogPosition();
/**
* Current MySQL Gtid being read by underlying mysql-binlog-client.
*/
String getGtidSet();
/**
* Tracks the number of seconds since last MySQL binlog event was processed by underlying mysql-binlog-client.
*/
long getSecondsSinceLastEvent();
/**
* Tracks the number of seconds between "now" and when the last processed MySQL binlog event was originally written
* to the MySQL server.
*/
long getSecondsBehindMaster();
/**
* Tracks the number of events skipped by underlying mysql-binlog-client, generally due to the client
* being unable to properly deserialize the event.
*/
long getNumberOfSkippedEvents();
/**
* Tracks the number of times the underlying mysql-binlog-client has been disconnected from MySQL.
*/
long getNumberOfDisconnects();
/**
* Tracks the number of committed transactions.
*/
long getNumberOfCommittedTransactions();
/**
* Tracks the number of rolled back transactions.
*/
long getNumberOfRolledBackTransactions();
/**
* Tracks the number of transactions which are not well-formed.
* Example - The connector sees a commit TX event without a matched begin TX event.
*/
long getNumberOfNotWellFormedTransactions();
/**
* Tracks the number of transaction which contains events that contained more entries than could be contained
* within the connectors {@see io.debezium.connector.mysql.EventBuffer} instance.
*/
long getNumberOfLargeTransactions();
}

View File

@ -106,6 +106,18 @@ protected int consumeAtLeast(int minNumber, long timeout, TimeUnit unit) throws
return counter.get();
}
protected long filterAtLeast(final int minNumber, final long timeout, final TimeUnit unit) throws InterruptedException {
final BinlogReaderMetrics metrics = reader.getMetrics();
final long initialFilterCount = metrics.getNumberOfEventsFiltered();
final long targetNumber = initialFilterCount + minNumber;
long startTime = System.currentTimeMillis();
while (metrics.getNumberOfEventsFiltered() < targetNumber && (System.currentTimeMillis() - startTime) < unit.toMillis(timeout)) {
// Ignore the records polled.
reader.poll();
}
return reader.getMetrics().getNumberOfEventsFiltered() - initialFilterCount;
}
protected Configuration.Builder simpleConfig() {
return DATABASE.defaultConfig()
.with(MySqlConnectorConfig.USER, "replicator")
@ -239,6 +251,46 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
}
/**
* Setup a DATABASE_WHITELIST filter that filters all events.
* Verify all events are properly filtered.
* Verify numberOfFilteredEvents metric is incremented correctly.
*/
@Test
@FixFor( "DBZ-1206" )
public void shouldFilterAllRecordsBasedOnDatabaseWhitelistFilter() throws Exception {
// Define configuration that will ignore all events from MySQL source.
config = simpleConfig()
.with(MySqlConnectorConfig.DATABASE_WHITELIST, "db-does-not-exist")
.build();
final Filters filters = new Filters.Builder(config).build();
context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("", 0L); // start from beginning
context.initializeHistory();
reader = new BinlogReader("binlog", context, new AcceptAllPredicate());
// Start reading the binlog ...
reader.start();
// Lets wait for at least 35 events to be filtered.
final int expectedFilterCount = 35;
final long numberFiltered = filterAtLeast(expectedFilterCount, 20, TimeUnit.SECONDS);
// All events should have been filtered.
assertThat(numberFiltered).isGreaterThanOrEqualTo(expectedFilterCount);
// There should be no schema changes
assertThat(schemaChanges.recordCount()).isEqualTo(0);
// There should be no records
assertThat(store.collectionCount()).isEqualTo(0);
// There should be no skipped
assertThat(reader.getMetrics().getNumberOfSkippedEvents()).isEqualTo(0);
}
@Test
@FixFor( "DBZ-183" )
public void shouldHandleTimestampTimezones() throws Exception {

View File

@ -111,8 +111,8 @@ public SnapshotReceiver getSnapshotChangeEventReceiver() {
public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) throws InterruptedException {
if(!filter.isIncluded(dataCollectionId)) {
eventListener.onSkippedEvent("source = " + dataCollectionId);
LOGGER.trace("Skipping data change event for {}", dataCollectionId);
LOGGER.trace("Filtered data change event for {}", dataCollectionId);
eventListener.onFilteredEvent("source = " + dataCollectionId);
}
else {
DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
@ -142,7 +142,7 @@ public void changeRecord(DataCollectionSchema schema, Operation operation, Objec
public void dispatchSchemaChangeEvent(T dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
if(!filter.isIncluded(dataCollectionId)) {
LOGGER.trace("Skipping data change event for {}", dataCollectionId);
LOGGER.trace("Filtering schema change event for {}", dataCollectionId);
return;
}

View File

@ -15,9 +15,16 @@ public interface ChangeEventSourceMetricsMXBean {
String getLastEvent();
long getMilliSecondsSinceLastEvent();
long getTotalNumberOfEventsSeen();
long getNumberOfEventsSkipped();
long getNumberOfEventsFiltered();
String[] getMonitoredTables();
int getQueueTotalCapacity();
int getQueueRemainingCapacity();
void reset();
/**
* @deprecated Renamed to getNumberOfEventsFiltered(). To be removed in next major release version.
* See DBZ-1206 and DBZ-1209 for more details.
*/
@Deprecated
long getNumberOfEventsSkipped();
}

View File

@ -34,7 +34,7 @@ public abstract class Metrics implements DataChangeEventListener, ChangeEventSou
protected final EventMetadataProvider metadataProvider;
protected final AtomicLong totalNumberOfEventsSeen = new AtomicLong();
protected final AtomicLong numberOfEventsSkipped = new AtomicLong();
private final AtomicLong numberOfEventsFiltered = new AtomicLong();
protected final AtomicLong lastEventTimestamp = new AtomicLong(-1);
private volatile String lastEvent;
@ -98,8 +98,8 @@ private void updateCommonEventMetrics() {
}
@Override
public void onSkippedEvent(String event) {
numberOfEventsSkipped.incrementAndGet();
public void onFilteredEvent(String event) {
numberOfEventsFiltered.incrementAndGet();
updateCommonEventMetrics();
}
@ -118,16 +118,21 @@ public long getTotalNumberOfEventsSeen() {
return totalNumberOfEventsSeen.get();
}
@Override
public long getNumberOfEventsFiltered() {
return numberOfEventsFiltered.get();
}
@Override
public long getNumberOfEventsSkipped() {
return numberOfEventsSkipped.get();
return getNumberOfEventsFiltered();
}
@Override
public void reset() {
totalNumberOfEventsSeen.set(0);
lastEventTimestamp.set(-1);
numberOfEventsSkipped.set(0);
numberOfEventsFiltered.set(0);
lastEvent = null;
}

View File

@ -27,12 +27,11 @@ public interface DataChangeEventListener {
/**
* Invoked for events pertaining to non-whitelisted tables.
*/
void onSkippedEvent(String event);
void onFilteredEvent(String event);
static DataChangeEventListener NO_OP = new DataChangeEventListener() {
@Override
public void onSkippedEvent(String event) {
public void onFilteredEvent(String event) {
}
@Override