DBZ-7845 [CHORE] pr review changes

This commit is contained in:
Gaurav Miglani 2024-05-06 22:47:26 +05:30 committed by Jiri Pechanec
parent 155f41cd77
commit adcf101c24
4 changed files with 28 additions and 17 deletions

View File

@ -14,9 +14,23 @@
*/
public interface Buffer {
/**
* to add a {@link SinkRecordDescriptor} to the internal buffer and
* call the {@link Buffer#flush()} when buffer size >= {@link JdbcSinkConnectorConfig#getBatchSize()}
* @param recordDescriptor the Sink record descriptor
* @return the buffer records
*/
List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor);
/**
* to clear and flush the internal buffer
* @return {@link SinkRecordDescriptor} the flushed buffer records.
*/
List<SinkRecordDescriptor> flush();
/**
* to check whether buffer is empty or not.
* @return true if empty else false.
*/
boolean isEmpty();
}

View File

@ -127,13 +127,7 @@ public void execute(Collection<SinkRecord> records) {
flushBuffer(tableId, updateBufferByTable.get(tableId).flush());
}
Buffer tableIdBuffer;
if (config.isUseReductionBuffer()) {
tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
}
else {
tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
}
Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId);
List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);
@ -151,13 +145,7 @@ public void execute(Collection<SinkRecord> records) {
Stopwatch updateBufferStopwatch = Stopwatch.reusable();
updateBufferStopwatch.start();
Buffer tableIdBuffer;
if (config.isUseReductionBuffer()) {
tableIdBuffer = updateBufferByTable.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
}
else {
tableIdBuffer = updateBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
}
Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId);
List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);
updateBufferStopwatch.stop();
@ -187,6 +175,15 @@ private static boolean isSchemaChange(SinkRecord record) {
&& record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
}
private Buffer resolveBuffer(Map<TableId, Buffer> bufferMap, TableId tableId) {
if (config.isUseReductionBuffer()) {
return bufferMap.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
}
else {
return bufferMap.computeIfAbsent(tableId, k -> new RecordBuffer(config));
}
}
private SinkRecordDescriptor buildRecordSinkDescriptor(SinkRecord record) {
SinkRecordDescriptor sinkRecordDescriptor;

View File

@ -310,13 +310,13 @@ public class JdbcSinkConnectorConfig {
+ "should be excluded from change events. The field names must be delimited by the format <topic>:<field> ");
public static final Field USE_REDUCTION_BUFFER_FIELD = Field.create(USE_REDUCTION_BUFFER)
.withDisplayName("Controls whether to use reduction buffer by the connector to reduce the SQL load when duplicates are found")
.withDisplayName("Specifies whether to use the reduction buffer.")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(false)
.withDescription("Whether to use reduced buffer or not.");
.withDescription("A reduction buffer consolidates the execution of SQL statements by primary key to reduce the SQL load on the target database. When set to false (the default), each incoming event is applied as a logical SQL change. When set to true, incoming events that refer to the same row will be reduced to a single logical change based on the most recent row state.");
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(

View File

@ -33,7 +33,7 @@ public String[] getRegistrationKeys() {
@Override
public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
LOGGER.trace("Cannot create enum types automatically, please create the table by hand. Using STRING fallback.");
LOGGER.warn("Cannot create enum types automatically, please create the table by hand. Using STRING fallback.");
return ConnectStringType.INSTANCE.getTypeName(dialect, schema, key);
}