DBZ-1440 Supports exported snapshots for Pg 9.4+

This commit is contained in:
Cyril Scetbon 2019-08-18 10:18:32 -04:00 committed by Chris Cranford
parent 4ae105bdce
commit 4b5475391c

View File

@ -203,17 +203,16 @@ else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
} }
private boolean useTemporarySlot() throws SQLException { private boolean useTemporarySlot() throws SQLException {
return dropSlotOnClose && isPg10OrGreater(); return dropSlotOnClose && isPg94OrGreater();
} }
private boolean exportSnapshotOnCreate() throws SQLException { private boolean exportSnapshotOnCreate() throws SQLException {
return exportSnapshot && isPg10OrGreater(); return exportSnapshot && isPg94OrGreater();
} }
private boolean isPg10OrGreater() throws SQLException { private boolean isPg94OrGreater() throws SQLException {
return pgConnection().getServerMajorVersion() >= 10; return pgConnection().haveMinimumServerVersion(90400);
} }
/** /**
* creating a replication connection and starting to stream involves a few steps: * creating a replication connection and starting to stream involves a few steps:
* 1. we create the connection and ensure that * 1. we create the connection and ensure that
@ -270,37 +269,34 @@ public void initConnection() throws SQLException, InterruptedException {
@Override @Override
public Optional<SlotCreationResult> createReplicationSlot() throws SQLException { public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
// note that some of these options are only supported in pg10 or newer, additionally // note that some of these options are only supported in pg94+, additionally
// the options are not yet exported by the jdbc api wrapper, therefore, we just do this ourselves // the options are not yet exported by the jdbc api wrapper, therefore, we just do this ourselves
// but eventually this should be moved back to the jdbc API // but eventually this should be moved back to the jdbc API
// see https://github.com/pgjdbc/pgjdbc/issues/1305 // see https://github.com/pgjdbc/pgjdbc/issues/1305
LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin); LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
String tempPart = ""; String tempPart = "";
String exportPart = ""; Boolean canExportSnapshot = isPg94OrGreater();
Boolean isPg10 = isPg10OrGreater(); if ((dropSlotOnClose || exportSnapshot) && !canExportSnapshot) {
if ((dropSlotOnClose || exportSnapshot) && !isPg10OrGreater()) { LOGGER.warn("A slot marked as temporary or with an exported snapshot was created, " +
LOGGER.warn("A slot marked as temporary or with an exported snapshot was created, but not on a supported version of Postgres, ignoring!"); "but not on a supported version of Postgres, ignoring!");
} }
if (useTemporarySlot()) { if (useTemporarySlot()) {
tempPart = "TEMPORARY"; tempPart = "TEMPORARY";
} }
if (exportSnapshotOnCreate()) {
exportPart = "EXPORT_SNAPSHOT";
}
try (Statement stmt = pgConnection().createStatement()) { try (Statement stmt = pgConnection().createStatement()) {
String createCommand = String.format( String createCommand = String.format(
"CREATE_REPLICATION_SLOT %s %s LOGICAL %s %s", "CREATE_REPLICATION_SLOT %s %s LOGICAL %s",
slotName, slotName,
tempPart, tempPart,
plugin.getPostgresPluginName(), plugin.getPostgresPluginName()
exportPart
); );
LOGGER.info("Creating replication slot with command {}", createCommand); LOGGER.info("Creating replication slot with command {}", createCommand);
stmt.execute(createCommand); stmt.execute(createCommand);
// when we are in pg10, we can parse the slot creation info, otherwise, it returns // when we are in pg94+, we can parse the slot creation info, otherwise, it returns
// nothing // nothing
if (isPg10) { if (canExportSnapshot) {
this.slotCreationInfo = parseSlotCreation(stmt.getResultSet()); this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
} }