DBZ-777 Further fixes
This commit is contained in:
parent
63250d0c13
commit
5ffe8123f1
@ -70,12 +70,11 @@ public void start(Configuration config) {
|
||||
final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
|
||||
final TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(connectorConfig);
|
||||
final Snapshotter snapshotter = connectorConfig.getSnapshotter();
|
||||
|
||||
if (snapshotter == null) {
|
||||
LOGGER.error("Unable to load snapshotter, if using custom snapshot mode, double check your settings");
|
||||
throw new ConnectException("Unable to load snapshotter, if using custom snapshot mode, double check your settings");
|
||||
}
|
||||
|
||||
|
||||
jdbcConnection = new PostgresConnection(connectorConfig.jdbcConfig());
|
||||
final TypeRegistry typeRegistry = jdbcConnection.getTypeRegistry();
|
||||
final Charset databaseCharset = jdbcConnection.getDatabaseCharset();
|
||||
@ -97,7 +96,7 @@ public void start(Configuration config) {
|
||||
slotInfo = connection.getReplicationSlotState(connectorConfig.slotName(), connectorConfig.plugin().getPostgresPluginName());
|
||||
}
|
||||
catch (SQLException e) {
|
||||
LOGGER.warn("unable to load info of replication slot, debezium will try to create the slot");
|
||||
LOGGER.warn("unable to load info of replication slot, Debezium will try to create the slot");
|
||||
}
|
||||
|
||||
if (previousOffset == null) {
|
||||
@ -253,4 +252,4 @@ protected Iterable<Field> getAllConfigurationFields() {
|
||||
public PostgresTaskContext getTaskContext() {
|
||||
return taskContext;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
import io.debezium.connector.postgresql.spi.OffsetState;
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.schema.DataCollectionId;
|
||||
import io.debezium.time.Conversions;
|
||||
import io.debezium.util.Clock;
|
||||
|
||||
@ -173,7 +174,7 @@ public OffsetContext load(Map<String, ?> offset) {
|
||||
final Instant useconds = Conversions.toInstantFromMicros((Long) offset.get(SourceInfo.TIMESTAMP_USEC_KEY));
|
||||
final boolean snapshot = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE);
|
||||
final boolean lastSnapshotRecord = (boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, Boolean.FALSE);
|
||||
return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, txId, useconds, snapshot, lastSnapshotRecord);
|
||||
return new PostgresOffsetContext(connectorConfig, lsn, lastCompletelyProcessedLsn, txId, useconds, snapshot, lastSnapshotRecord);
|
||||
}
|
||||
}
|
||||
|
||||
@ -219,7 +220,7 @@ public void markLastSnapshotRecord() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void event(TableId tableId, Instant instant) {
|
||||
sourceInfo.update(instant, tableId);
|
||||
public void event(DataCollectionId tableId, Instant instant) {
|
||||
sourceInfo.update(instant, (TableId) tableId);
|
||||
}
|
||||
}
|
||||
|
@ -46,8 +46,6 @@ public class PostgresSchema extends RelationalDatabaseSchema {
|
||||
protected final static String PUBLIC_SCHEMA_NAME = "public";
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(PostgresSchema.class);
|
||||
|
||||
private final Filters filters;
|
||||
|
||||
private final TypeRegistry typeRegistry;
|
||||
|
||||
private final Map<TableId, List<String>> tableIdToToastableColumns;
|
||||
@ -64,7 +62,6 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
|
||||
super(config, topicSelector, new Filters(config).tableFilter(),
|
||||
new Filters(config).columnFilter(), getTableSchemaBuilder(config, typeRegistry, databaseCharset), false);
|
||||
|
||||
this.filters = new Filters(config);
|
||||
this.typeRegistry = typeRegistry;
|
||||
this.tableIdToToastableColumns = new HashMap<>();
|
||||
this.relationIdToTableId = new HashMap<>();
|
||||
@ -88,7 +85,7 @@ private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig
|
||||
*/
|
||||
protected PostgresSchema refresh(PostgresConnection connection, boolean printReplicaIdentityInfo) throws SQLException {
|
||||
// read all the information from the DB
|
||||
connection.readSchema(tables(), null, null, filters.tableFilter(), null, true);
|
||||
connection.readSchema(tables(), null, null, getTableFilter(), null, true);
|
||||
if (printReplicaIdentityInfo) {
|
||||
// print out all the replica identity info
|
||||
tableIds().forEach(tableId -> printReplicaIdentityInfo(connection, tableId));
|
||||
@ -151,7 +148,7 @@ protected void refresh(Table table) {
|
||||
}
|
||||
|
||||
protected boolean isFilteredOut(TableId id) {
|
||||
return !filters.tableFilter().isIncluded(id);
|
||||
return !getTableFilter().isIncluded(id);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -38,7 +38,7 @@ public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor
|
||||
try {
|
||||
if (!buffer.hasArray()) {
|
||||
throw new IllegalStateException(
|
||||
"Invalid buffer received from PG server during streaming replication");
|
||||
"Invalid buffer received from Postgres server during streaming replication");
|
||||
}
|
||||
final byte[] source = buffer.array();
|
||||
final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
|
||||
|
@ -11,7 +11,7 @@
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.schema.DataCollectionId;
|
||||
|
||||
/**
|
||||
* Keeps track of the current offset within the source DB's change stream. This reflects in the offset as committed to
|
||||
@ -62,9 +62,7 @@ interface Loader {
|
||||
void postSnapshotCompletion();
|
||||
|
||||
/**
|
||||
* Records the name of the table and the timestamp of the last event
|
||||
* @param tableId
|
||||
* @param timestamp
|
||||
* Records the name of the collection and the timestamp of the last event
|
||||
*/
|
||||
void event(TableId tableId, Instant timestamp);
|
||||
void event(DataCollectionId collectionId, Instant timestamp);
|
||||
}
|
||||
|
@ -11,7 +11,6 @@
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
@ -412,7 +411,7 @@ private Timer getTableScanLogTimer() {
|
||||
* Returns a {@link ChangeRecordEmitter} producing the change records for the given table row.
|
||||
*/
|
||||
protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, TableId tableId, Object[] row) {
|
||||
snapshotContext.offset.event(tableId, Instant.ofEpochMilli(getClock().currentTimeInMillis()));
|
||||
snapshotContext.offset.event(tableId, getClock().currentTime());
|
||||
return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, getClock());
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user