DBZ-777 Misc. adjustments

* Avoiding repeated parsing of table id
* Making 'inconsistentSchemaHandler' field final in EventDispatcher
* Not exposing synchronizeTableSchema() on SPI level
Gunnar Morling 2019-08-15 11:41:01 +02:00
parent 780545d0d2
commit 63250d0c13
7 changed files with 29 additions and 50 deletions
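
The first two bullets follow two common refactoring patterns: a value that used to be re-parsed on every call is parsed once in the constructor and kept in a final field, and a collaborator that used to be injected via a setter becomes a final constructor argument with a default fallback. The third bullet simply narrows visibility: synchronizeTableSchema() becomes a private helper of the Postgres emitter, invoked from its emitChangeRecords() override instead of being part of the ChangeRecordEmitter SPI. Below is a minimal, self-contained sketch of the two patterns; the simplified types (SketchTableId, SketchEmitter, SketchDispatcher) and the SchemaHandler interface are illustrative stand-ins, not Debezium's actual API.

import java.util.Objects;
import java.util.Optional;

public class Dbz777Sketch {

    /** Illustrative stand-in for a parsed table identifier. */
    static final class SketchTableId {
        final String name;
        SketchTableId(String name) {
            this.name = name;
        }
        static SketchTableId parse(String raw) {
            return new SketchTableId(raw.trim());
        }
        @Override
        public String toString() {
            return name;
        }
    }

    /** Stand-in for the handler invoked when no schema is known for a table. */
    interface SchemaHandler {
        Optional<String> handle(SketchTableId tableId);
    }

    /**
     * Pattern 1: parse once in the constructor and keep the result in a final
     * field, instead of re-parsing the raw table name in every getter.
     */
    static final class SketchEmitter {
        private final SketchTableId tableId;

        SketchEmitter(String rawTableName) {
            this.tableId = SketchTableId.parse(rawTableName);
            Objects.requireNonNull(tableId);
        }

        SketchTableId tableId() {
            return tableId;
        }
    }

    /**
     * Pattern 2: the handler is a final field; the no-argument constructor keeps
     * the previous default behaviour, so no setter is needed and the dispatcher
     * is fully initialized after construction.
     */
    static final class SketchDispatcher {
        private final SchemaHandler inconsistentSchemaHandler;

        SketchDispatcher() {
            this(null);
        }

        SketchDispatcher(SchemaHandler handler) {
            this.inconsistentSchemaHandler = handler != null ? handler : this::errorOnMissingSchema;
        }

        private Optional<String> errorOnMissingSchema(SketchTableId tableId) {
            throw new IllegalArgumentException("No metadata registered for captured table " + tableId);
        }

        Optional<String> handleMissingSchema(SketchTableId tableId) {
            return inconsistentSchemaHandler.handle(tableId);
        }
    }

    public static void main(String[] args) {
        SketchEmitter emitter = new SketchEmitter("inventory.customers");
        SketchDispatcher dispatcher = new SketchDispatcher(tableId -> Optional.of("refreshed schema for " + tableId));
        System.out.println(dispatcher.handleMissingSchema(emitter.tableId()));
    }
}

In the actual commit, EventDispatcher gains an equivalent secondary constructor and the connector task passes PostgresChangeRecordEmitter::updateSchema directly, as shown in the diffs below.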

PostgresChangeRecordEmitter.java

@@ -43,6 +43,7 @@ public class PostgresChangeRecordEmitter extends RelationalChangeRecordEmitter {
private final PostgresSchema schema;
private final PostgresConnectorConfig connectorConfig;
private final PostgresConnection connection;
private final TableId tableId;
public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresConnectorConfig connectorConfig, PostgresSchema schema, PostgresConnection connection, ReplicationMessage message) {
super(offset, clock);
@@ -51,6 +52,9 @@ public PostgresChangeRecordEmitter(OffsetContext offset, Clock clock, PostgresCo
this.message = message;
this.connectorConfig = connectorConfig;
this.connection = connection;
this.tableId = PostgresSchema.parse(message.getTable());
Objects.requireNonNull(tableId);
}
@Override
@@ -68,10 +72,13 @@ protected Operation getOperation() {
}
@Override
protected Object[] getOldColumnValues() {
final TableId tableId = PostgresSchema.parse(message.getTable());
Objects.requireNonNull(tableId);
public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
schema = synchronizeTableSchema(schema);
super.emitChangeRecords(schema, receiver);
}
@Override
protected Object[] getOldColumnValues() {
try {
switch (getOperation()) {
case CREATE:
@@ -89,9 +96,6 @@ protected Object[] getOldColumnValues() {
@Override
protected Object[] getNewColumnValues() {
final TableId tableId = PostgresSchema.parse(message.getTable());
Objects.requireNonNull(tableId);
try {
switch (getOperation()) {
case CREATE:
@@ -107,8 +111,7 @@ protected Object[] getNewColumnValues() {
}
}
@Override
public DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSchema) {
private DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSchema) {
final boolean metadataInMessage = message.hasTypeMetadata();
final TableId tableId = (TableId) tableSchema.id();
final Table table = schema.tableFor(tableId);

PostgresConnectorTask.java

@@ -145,8 +145,9 @@ public void start(Configuration config) {
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new);
dispatcher.setInconsistentSchemaHandler(PostgresChangeRecordEmitter::updateSchema);
DataChangeEvent::new,
PostgresChangeRecordEmitter::updateSchema
);
coordinator = new ChangeEventSourceCoordinator(
previousOffset,

ExportedSnapshotter.java

@@ -6,7 +6,6 @@
package io.debezium.connector.postgresql.snapshot;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -22,11 +21,8 @@
*/
public class ExportedSnapshotter implements Snapshotter {
private Map<TableId, String> snapshotOverrides;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
this.snapshotOverrides = config.getSnapshotSelectOverridesByTable();
}
@Override
@@ -46,12 +42,7 @@ public boolean exportSnapshot() {
@Override
public Optional<String> buildSnapshotQuery(TableId tableId) {
if (snapshotOverrides.containsKey(tableId)) {
return Optional.of(snapshotOverrides.get(tableId));
}
else {
return Optional.of("select * from " + tableId.toDoubleQuotedString());
}
return Optional.of("select * from " + tableId.toDoubleQuotedString());
}
@Override

QueryingSnapshotter.java

@@ -5,7 +5,6 @@
*/
package io.debezium.connector.postgresql.snapshot;
import java.util.Map;
import java.util.Optional;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
@@ -16,26 +15,16 @@
public abstract class QueryingSnapshotter implements Snapshotter {
private Map<TableId, String> snapshotOverrides;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
this.snapshotOverrides = config.getSnapshotSelectOverridesByTable();
}
@Override
public Optional<String> buildSnapshotQuery(TableId tableId) {
if (snapshotOverrides.containsKey(tableId)) {
return Optional.of(snapshotOverrides.get(tableId));
}
else {
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
StringBuilder q = new StringBuilder();
q.append("SELECT * FROM ");
q.append(tableId.toDoubleQuotedString());
return Optional.of(q.toString());
}
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
StringBuilder q = new StringBuilder();
q.append("SELECT * FROM ");
q.append(tableId.toDoubleQuotedString());
return Optional.of(q.toString());
}
}

EventDispatcher.java

@@ -57,7 +57,7 @@ public class EventDispatcher<T extends DataCollectionId> {
private final Heartbeat heartbeat;
private DataChangeEventListener eventListener = DataChangeEventListener.NO_OP;
private final boolean emitTombstonesOnDelete;
private InconsistentSchemaHandler<T> inconsistentSchemaHandler = this::errorOnMissingSchema;
private final InconsistentSchemaHandler<T> inconsistentSchemaHandler;
/**
* Change event receiver for events dispatched from a streaming change event source.
@@ -67,6 +67,11 @@ public class EventDispatcher<T extends DataCollectionId> {
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null);
}
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<T> inconsistentSchemaHandler) {
this.topicSelector = topicSelector;
this.schema = schema;
this.historizedSchema = schema instanceof HistorizedDatabaseSchema
@@ -77,6 +82,7 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
this.changeEventCreator = changeEventCreator;
this.streamingReceiver = new StreamingChangeRecordReceiver();
this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete();
this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema;
heartbeat = Heartbeat.create(connectorConfig.getConfig(), topicSelector.getHeartbeatTopic(),
connectorConfig.getLogicalName());
@@ -125,13 +131,12 @@ public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter chan
// TODO handle as per inconsistent schema info option
if (dataCollectionSchema == null) {
final Optional<DataCollectionSchema> replacementSchema = inconsistentSchemaHandler.handle(dataCollectionId, changeRecordEmitter);
final Optional<DataCollectionSchema> replacementSchema = inconsistentSchemaHandler.handle(dataCollectionId, changeRecordEmitter);
if (!replacementSchema.isPresent()) {
return;
}
dataCollectionSchema = replacementSchema.get();
}
dataCollectionSchema = changeRecordEmitter.synchronizeTableSchema(dataCollectionSchema);
changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
@@ -151,10 +156,6 @@ public void changeRecord(DataCollectionSchema schema, Operation operation, Objec
);
}
public void setInconsistentSchemaHandler(InconsistentSchemaHandler<T> inconsistentSchemaHandler) {
this.inconsistentSchemaHandler = inconsistentSchemaHandler;
}
public Optional<DataCollectionSchema> errorOnMissingSchema(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) {
eventListener.onErroneousEvent("source = " + dataCollectionId);
throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);

ChangeRecordEmitter.java

@@ -29,10 +29,6 @@ public interface ChangeRecordEmitter {
*/
OffsetContext getOffset();
default DataCollectionSchema synchronizeTableSchema(DataCollectionSchema dataCollectionSchema) {
return dataCollectionSchema;
}
public interface Receiver {
void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException;
}

RelationalSnapshotChangeEventSource.java

@@ -51,8 +51,6 @@
*
* @author Gunnar Morling
*/
// TODO Mostly, this should be usable for Postgres as well; only the aspect of managing the schema history will have to
// be made optional based on the connector
public abstract class RelationalSnapshotChangeEventSource implements SnapshotChangeEventSource {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class);
@@ -228,7 +226,7 @@ private void delaySnapshotIfNeeded(ChangeEventSourceContext context) throws Inte
*/
protected void connectionCreated(SnapshotContext snapshotContext) throws Exception {
}
private Stream<TableId> toTableIds(Set<TableId> tableIds, Pattern pattern) {
return tableIds
.stream()