DBZ-1381 Introduce permanent metadata connection
This commit is contained in:
parent
d9e5aa0955
commit
c07c994434
@ -64,13 +64,11 @@ protected PostgresConnectorConfig config() {
|
||||
return config;
|
||||
}
|
||||
|
||||
protected void refreshSchema(boolean printReplicaIdentityInfo) throws SQLException {
|
||||
try (final PostgresConnection connection = createConnection()) {
|
||||
schema.refresh(connection, printReplicaIdentityInfo);
|
||||
}
|
||||
protected void refreshSchema(PostgresConnection connection, boolean printReplicaIdentityInfo) throws SQLException {
|
||||
schema.refresh(connection, printReplicaIdentityInfo);
|
||||
}
|
||||
|
||||
Long getSlotXmin() throws SQLException {
|
||||
Long getSlotXmin(PostgresConnection connection) throws SQLException {
|
||||
// when xmin fetch is set to 0, we don't track it to ignore any performance of querying the
|
||||
// slot periodically
|
||||
if (config.xminFetchInterval().toMillis() <= 0) {
|
||||
@ -79,7 +77,7 @@ Long getSlotXmin() throws SQLException {
|
||||
assert(this.refreshXmin != null);
|
||||
|
||||
if (this.refreshXmin.hasElapsed()) {
|
||||
lastXmin = getCurrentSlotState().slotCatalogXmin();
|
||||
lastXmin = getCurrentSlotState(connection).slotCatalogXmin();
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("Fetched new xmin from slot of {}", lastXmin);
|
||||
}
|
||||
@ -93,10 +91,8 @@ Long getSlotXmin() throws SQLException {
|
||||
return lastXmin;
|
||||
}
|
||||
|
||||
private SlotState getCurrentSlotState() throws SQLException {
|
||||
try (final PostgresConnection connection = createConnection()) {
|
||||
return connection.getReplicationSlotState(config.slotName(), config.plugin().getPostgresPluginName());
|
||||
}
|
||||
private SlotState getCurrentSlotState(PostgresConnection connection) throws SQLException {
|
||||
return connection.getReplicationSlotState(config.slotName(), config.plugin().getPostgresPluginName());
|
||||
}
|
||||
|
||||
protected ReplicationConnection createReplicationConnection(boolean exportSnapshot) throws SQLException {
|
||||
|
@ -69,7 +69,7 @@ public class RecordsStreamProducer extends RecordsProducer {
|
||||
private ReplicationConnection replicationConnection;
|
||||
private final AtomicReference<ReplicationStream> replicationStream;
|
||||
private final AtomicBoolean cleanupExecuted = new AtomicBoolean();
|
||||
private PgConnection typeResolverConnection = null;
|
||||
private final PostgresConnection metadataConnection;
|
||||
private Long lastCompletelyProcessedLsn;
|
||||
|
||||
/**
|
||||
@ -108,6 +108,7 @@ public RecordsStreamProducer(PostgresTaskContext taskContext,
|
||||
heartbeat = Heartbeat.create(taskContext.config().getConfig(), taskContext.topicSelector().getHeartbeatTopic(),
|
||||
taskContext.config().getLogicalName());
|
||||
pauseNoMessage = Metronome.sleeper(taskContext.getConfig().getPollInterval(), Clock.SYSTEM);
|
||||
metadataConnection = taskContext.createConnection();
|
||||
}
|
||||
|
||||
// this maybe should only be used for testing?
|
||||
@ -146,7 +147,7 @@ protected synchronized void start(BlockingConsumer<ChangeEvent> eventConsumer, C
|
||||
// so we need to start a background thread that just responds to keep alive
|
||||
replicationStream.get().startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, taskContext.config().getLogicalName(), CONTEXT_NAME + "-keep-alive"));
|
||||
// refresh the schema so we have a latest view of the DB tables
|
||||
taskContext.refreshSchema(true);
|
||||
taskContext.refreshSchema(metadataConnection, true);
|
||||
taskContext.schema().assureNonEmptySchema();
|
||||
|
||||
this.lastCompletelyProcessedLsn = sourceInfo.lsn();
|
||||
@ -273,8 +274,8 @@ private void closeConnections() {
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
if (typeResolverConnection != null) {
|
||||
typeResolverConnection.close();
|
||||
if (metadataConnection != null) {
|
||||
metadataConnection.close();
|
||||
}
|
||||
}
|
||||
catch(Exception e) {
|
||||
@ -310,7 +311,7 @@ private void process(ReplicationMessage message, Long lsn, BlockingConsumer<Chan
|
||||
// update the source info with the coordinates for this message
|
||||
Instant commitTime = message.getCommitTime();
|
||||
long txId = message.getTransactionId();
|
||||
sourceInfo.update(lsn, commitTime, txId, tableId, taskContext.getSlotXmin());
|
||||
sourceInfo.update(lsn, commitTime, txId, tableId, taskContext.getSlotXmin(metadataConnection));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("received new message at position {}\n{}", ReplicationConnection.format(lsn), message);
|
||||
}
|
||||
@ -496,15 +497,13 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
|
||||
|
||||
// check if we need to refresh our local schema due to DB schema changes for this table
|
||||
if (refreshSchemaIfChanged && schemaChanged(columns, table, metadataInMessage)) {
|
||||
try (final PostgresConnection connection = taskContext.createConnection()) {
|
||||
// Refresh the schema so we get information about primary keys
|
||||
schema().refresh(connection, tableId, taskContext.config().skipRefreshSchemaOnMissingToastableData());
|
||||
// Update the schema with metadata coming from decoder message
|
||||
if (metadataInMessage) {
|
||||
schema().refresh(tableFromFromMessage(columns, schema().tableFor(tableId)));
|
||||
}
|
||||
table = schema().tableFor(tableId);
|
||||
// Refresh the schema so we get information about primary keys
|
||||
schema().refresh(metadataConnection, tableId, taskContext.config().skipRefreshSchemaOnMissingToastableData());
|
||||
// Update the schema with metadata coming from decoder message
|
||||
if (metadataInMessage) {
|
||||
schema().refresh(tableFromFromMessage(columns, schema().tableFor(tableId)));
|
||||
}
|
||||
table = schema().tableFor(tableId);
|
||||
}
|
||||
|
||||
// based on the schema columns, create the values on the same position as the columns
|
||||
@ -643,9 +642,7 @@ private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
|
||||
}
|
||||
// we don't have a schema registered for this table, even though the filters would allow it...
|
||||
// which means that is a newly created table; so refresh our schema to get the definition for this table
|
||||
try (final PostgresConnection connection = taskContext.createConnection()) {
|
||||
schema.refresh(connection, tableId, taskContext.config().skipRefreshSchemaOnMissingToastableData());
|
||||
}
|
||||
schema.refresh(metadataConnection, tableId, taskContext.config().skipRefreshSchemaOnMissingToastableData());
|
||||
tableSchema = schema.schemaFor(tableId);
|
||||
if (tableSchema == null) {
|
||||
logger.warn("cannot load schema for table '{}'", tableId);
|
||||
@ -657,10 +654,7 @@ private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
|
||||
}
|
||||
|
||||
private synchronized PgConnection typeResolverConnection() throws SQLException {
|
||||
if (typeResolverConnection == null) {
|
||||
typeResolverConnection = (PgConnection) taskContext.createConnection().connection();
|
||||
}
|
||||
return typeResolverConnection;
|
||||
return (PgConnection) metadataConnection.connection();
|
||||
}
|
||||
|
||||
private Table tableFromFromMessage(List<ReplicationMessage.Column> columns, Table table) {
|
||||
|
Loading…
Reference in New Issue
Block a user