DBZ-406 Misc improvements;

* Making isReplayingEventsBeyondBufferCapacity() a "query method" only
* Using constant for buffer default size
* Misc. typo and doc fixes
* Removing unused variable
This commit is contained in:
Gunnar Morling 2017-12-13 10:19:24 +01:00
parent 22dca4e498
commit 06e72308d9
6 changed files with 49 additions and 34 deletions

View File

@ -18,7 +18,7 @@ class BinlogReaderMetrics extends Metrics implements BinlogReaderMetricsMXBean {
private final BinaryLogClient client;
private final BinaryLogClientStatistics stats;
private final AtomicLong numberOfCommitedTransactions = new AtomicLong();
private final AtomicLong numberOfCommittedTransactions = new AtomicLong();
private final AtomicLong numberOfRolledBackTransactions = new AtomicLong();
private final AtomicLong numberOfNotWellFormedTransactions = new AtomicLong();
private final AtomicLong numberOfLargeTransactions = new AtomicLong();
@ -82,15 +82,15 @@ public long getNumberOfDisconnects() {
@Override
public void reset() {
this.stats.reset();
numberOfCommitedTransactions.set(0);
numberOfCommittedTransactions.set(0);
numberOfRolledBackTransactions.set(0);
numberOfNotWellFormedTransactions.set(0);
numberOfLargeTransactions.set(0);
}
@Override
public long getNumberOfCommitedTransactions() {
return numberOfCommitedTransactions.get();
public long getNumberOfCommittedTransactions() {
return numberOfCommittedTransactions.get();
}
@Override
@ -109,7 +109,7 @@ public long getNumberOfLargeTransactions() {
}
public void onCommittedTransaction() {
numberOfCommitedTransactions.incrementAndGet();
numberOfCommittedTransactions.incrementAndGet();
}
public void onRolledBackTransaction() {

View File

@ -24,7 +24,7 @@ public interface BinlogReaderMetricsMXBean {
long getNumberOfDisconnects();
void reset();
long getNumberOfCommitedTransactions();
long getNumberOfCommittedTransactions();
long getNumberOfRolledBackTransactions();
long getNumberOfNotWellFormedTransactions();
long getNumberOfLargeTransactions();

View File

@ -19,25 +19,30 @@
/**
* This class represents a look-ahead buffer that allows Debezium to accumulate binlog events and decide
* if the last event in transaction is either {@code ROLLBACK} or {@code COMMIT}. The incoming evets are either
* if the last event in transaction is either {@code ROLLBACK} or {@code COMMIT}. The incoming events are either
* supposed to be in transaction or out-of-transaction. When out-of-transaction they are sent directly into
* the destination handler. When in transaction the event goes through the buffering.
*
* <p>
* The reason for the buffering is that the binlog contains rolled back transactions in some cases. E.g. that's
* the case when a temporary table is dropped (see DBZ-390). For rolled back transactions we may not propagate
* any of the contained events, hence the buffering is applied.
* <p>
* The transaction start is identified by a {@code BEGIN} event. Transaction is ended either by {@code COMMIT}
* event or by {@code XID} an event.
*
* If there are more events that can fit to the buffer then
* <p>
* If there are more events that can fit to the buffer then:
* <ul>
* <li>Binlog position is recorded for the first event not fitting into the buffer</li>
* <li>Binlog position is recorded for the commit event</li>
* <li>Buffer content is sent to the final handler</li>
* <li>Binlog position is rewound and all events between the above recorded positions are sent to the final handler</li>
* </ul>
*
*
* @author Jiri Pechanec
*
*/
class EventBuffer {
private static final Logger LOGGER = LoggerFactory.getLogger(EventBuffer.class);
private final int capacity;
@ -51,7 +56,7 @@ class EventBuffer {
private BinlogPosition largeTxNotBufferedPosition;
/**
* Contains the position of the last event in belonging to the transaction that has not fit into
* Contains the position of the last event belonging to the transaction that has not fit into
* the buffer.
*/
private BinlogPosition forwardTillPosition;
@ -67,10 +72,18 @@ public EventBuffer(int capacity, BinlogReader reader) {
* @param event to be stored in the buffer
*/
public void add(Event event) {
if (event == null) return;
if (isReplayingEventsNotFittedInBuffer(event)) {
if (event == null) {
return;
}
// we're reprocessing events of the current TX between the position where the
// buffer was full and the end of the TX; in this case there's nothing to do
// besides directly emitting the events
if (isReplayingEventsBeyondBufferCapacity()) {
reader.handleEvent(event);
return;
}
if (event.getHeader().getEventType() == EventType.QUERY) {
QueryEventData command = reader.unwrapData(event);
LOGGER.debug("Received query command: {}", event);
@ -92,17 +105,13 @@ public void add(Event event) {
}
/**
* Process the event if we are replaying TX events from binlog that has not fit into the buffer.
*
* @param event
* @return true if event was handled
* Whether we are replaying TX events from binlog that have not fit into the buffer before
*/
private boolean isReplayingEventsNotFittedInBuffer(Event event) {
private boolean isReplayingEventsBeyondBufferCapacity() {
if (forwardTillPosition != null) {
if (forwardTillPosition.equals(reader.getCurrentBinlogPosition())) {
forwardTillPosition = null;
}
reader.handleEvent(event);
return true;
}
return false;
@ -160,7 +169,7 @@ private void beginTransaction(Event event) {
/**
* Sends all events from the buffer int a final handler. For large transactions it executes rewind
* of binlog reader back to the first event that was not stored in the buffer.
*
*
* @param wellFormed
* @param event
*/

View File

@ -360,7 +360,13 @@ public static EventDeserializationFailureHandlingMode parse(String value) {
private static final String TABLE_WHITELIST_NAME = "table.whitelist";
private static final String TABLE_IGNORE_BUILTIN_NAME = "table.ignore.builtin";
private static final int DEFAULT_BINLOG_BUFFER_SIZE = 10_000;
/**
* Default size of the binlog buffer used for examining transactions and
* deciding whether to propagate them or not. A size of 0 disables the buffer,
* all events will be passed on directly as they are passed by the binlog
* client.
*/
private static final int DEFAULT_BINLOG_BUFFER_SIZE = 0;
public static final Field HOSTNAME = Field.create("database.hostname")
.withDisplayName("Hostname")
@ -651,10 +657,11 @@ public static EventDeserializationFailureHandlingMode parse(String value) {
.withType(Type.INT)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The size of a look-ahead buffer used by the binlog reader to decide whether"
.withDescription("The size of a look-ahead buffer used by the binlog reader to decide whether "
+ "the transaction in progress is going to be committed or rolled back. "
+ "Defaults to " + DEFAULT_BINLOG_BUFFER_SIZE + ". Use 0 to disable look-ahead buffering.")
.withDefault(0)
+ "Use 0 to disable look-ahead buffering. "
+ "Defaults to " + DEFAULT_BINLOG_BUFFER_SIZE + " (i.e. buffering is disabled).")
.withDefault(DEFAULT_BINLOG_BUFFER_SIZE)
.withValidation(Field::isNonNegativeInteger);
/**

View File

@ -10,7 +10,6 @@
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import org.apache.kafka.connect.data.Struct;
@ -163,7 +162,7 @@ public void shouldProcessSavepoint() throws SQLException, InterruptedException {
connection.setAutoCommit(false);
final Statement statement = jdbc.createStatement();
statement.executeUpdate("INSERT INTO customers VALUES(default, 'first', 'first', 'first')");
final Savepoint savepoint = jdbc.setSavepoint();
jdbc.setSavepoint();
statement.executeUpdate("INSERT INTO customers VALUES(default, 'second', 'second', 'second')");
jdbc.commit();
connection.query("SELECT * FROM customers", rs -> {
@ -239,7 +238,7 @@ public void shouldProcessLargeTransaction() throws SQLException, InterruptedExce
// All records should be present only once
records = consumeRecordsByTopic(numRecords);
int recordIndex = 0;
int recordIndex = 0;
for (SourceRecord r: records.allRecordsInOrder()) {
Struct envelope = (Struct)r.value();
assertThat(envelope.getString("op")).isEqualTo(("c"));

View File

@ -28,7 +28,7 @@
* in <code>src/test/resources/ddl/&lt;database_name&gt;.sql</code>.
* The database name is enriched with a unique suffix that guarantees complete isolation between runs
* <code>&lt;database_name&gt_&lt;suffix&gt</code>
*
*
* @author jpechane
*
*/
@ -44,7 +44,7 @@ public class UniqueDatabase {
private final String templateName;
private final String serverName;
private Path dbHistoryPath;
private String identifier;
private final String identifier;
private UniqueDatabase(final String serverName, final String databaseName, final String identifier) {
this.identifier = identifier;
@ -55,8 +55,8 @@ private UniqueDatabase(final String serverName, final String databaseName, final
/**
* Creates an instance with given Debezium logical name and database name
*
* @param serverName - logical Debezium server name
*
* @param serverName - logical Debezium server name
* @param databaseName - the name of the database (prix)
*/
public UniqueDatabase(final String serverName, final String databaseName) {
@ -68,7 +68,7 @@ public UniqueDatabase(final String serverName, final String databaseName) {
* as another database. This is handy for tests that need multpli databases and can use regex
* based whitelisting.
* @param serverName - logical Debezium server name
* @param serverName - logical Debezium server name
* @param databaseName - the name of the database (prix)
* @param sibling - a database whose unique suffix will be used
*/
@ -168,7 +168,7 @@ public Configuration.Builder defaultConfig() {
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, getDatabaseName())
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10000);
.with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10_000);
if (dbHistoryPath != null) {
builder.with(FileDatabaseHistory.FILE_PATH, dbHistoryPath);
}