diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index 15a5bca24..a0288dcff 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -28,10 +28,13 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; +import io.debezium.connector.SnapshotRecord; import io.debezium.connector.mysql.RecordMakers.RecordsForTable; +import io.debezium.data.Envelope; import io.debezium.function.BufferedBlockingConsumer; import io.debezium.function.Predicates; import io.debezium.heartbeat.Heartbeat; @@ -601,7 +604,7 @@ protected void execute() { source.markLastSnapshot(context.config()); long stop = clock.currentTimeInMillis(); try { - bufferedRecordQueue.close(this::replaceOffset); + bufferedRecordQueue.close(this::replaceOffsetAndSource); if (logger.isInfoEnabled()) { logger.info("Step {}: scanned {} rows in {} tables in {}", step, totalRowCount, tableIds.size(), Strings.duration(stop - startScan)); @@ -867,17 +870,22 @@ private void logRolesForCurrentUser(JdbcConnection mysql) { } /** - * Utility method to replace the offset in the given record with the latest. This is used on the last record produced + * Utility method to replace the offset and the source in the given record with the latest. This is used on the last record produced * during the snapshot. * * @param record the record * @return the updated record */ - protected SourceRecord replaceOffset(SourceRecord record) { + protected SourceRecord replaceOffsetAndSource(SourceRecord record) { if (record == null) { return null; } Map newOffset = context.source().offset(); + final Struct envelope = (Struct) record.value(); + final Struct source = (Struct) envelope.get(Envelope.FieldName.SOURCE); + if (SnapshotRecord.fromSource(source) == SnapshotRecord.TRUE) { + SnapshotRecord.LAST.toSource(source); + } return new SourceRecord(record.sourcePartition(), newOffset, record.topic(), diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java index 3a2da2082..e567d5592 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java @@ -21,6 +21,7 @@ import io.debezium.annotation.NotThreadSafe; import io.debezium.config.Configuration; import io.debezium.connector.AbstractSourceInfo; +import io.debezium.connector.SnapshotRecord; import io.debezium.connector.SourceInfoStructMaker; import io.debezium.data.Envelope; import io.debezium.document.Document; @@ -842,8 +843,11 @@ protected Instant timestamp() { } @Override - protected boolean snapshot() { - return isLastSnapshot(); + protected SnapshotRecord snapshot() { + if (isSnapshotInEffect()) { + return SnapshotRecord.TRUE; + } + return SnapshotRecord.FALSE; } @Override diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 0231d7066..7917ade7d 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -341,8 +341,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio SourceRecord secondToLast = allRecords.get(allRecords.size() - 2); assertThat(secondToLast.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isTrue(); assertThat(last.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isFalse(); // not snapshot - assertThat(((Struct) secondToLast.value()).getStruct(Envelope.FieldName.SOURCE).getBoolean(SourceInfo.SNAPSHOT_KEY)).isTrue(); - assertThat(((Struct) last.value()).getStruct(Envelope.FieldName.SOURCE).getBoolean(SourceInfo.SNAPSHOT_KEY)).isTrue(); + assertThat(((Struct) secondToLast.value()).getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isEqualTo("true"); + assertThat(((Struct) last.value()).getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isEqualTo("last"); // --------------------------------------------------------------------------------------------------------------- // Stopping the connector does not lose events recorded when connector is not running diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java index 40d74d3bf..5a52dfe09 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java @@ -206,7 +206,6 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti VerifyRecord.hasNoSourceQuery(record); store.add(record); schemaChanges.add(record); - System.out.println(record); }); } // The last poll should always return null ... diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java index aa2d679f8..0692cf703 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SourceInfoTest.java @@ -24,6 +24,7 @@ import io.confluent.connect.avro.AvroData; import io.debezium.config.Configuration; +import io.debezium.connector.AbstractSourceInfoStructMaker; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; import io.debezium.document.Document; @@ -650,7 +651,7 @@ public void schemaIsCorrect() { .field("connector", Schema.STRING_SCHEMA) .field("name", Schema.STRING_SCHEMA) .field("ts_ms", Schema.INT64_SCHEMA) - .field("snapshot", SchemaBuilder.bool().optional().defaultValue(false).build()) + .field("snapshot", AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA) .field("db", Schema.STRING_SCHEMA) .field("table", Schema.OPTIONAL_STRING_SCHEMA) .field("server_id", Schema.INT64_SCHEMA) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java index 16de4d5a3..5ba126e0a 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java @@ -300,7 +300,7 @@ private void takeSnapshot(BlockingConsumer consumer) { private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) { final Struct envelope = (Struct) currentRecord.value(); - final Struct source = (Struct) envelope.get("source"); + final Struct source = (Struct) envelope.get(Envelope.FieldName.SOURCE); if (source.schema().field(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null && source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) { source.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true); } diff --git a/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java b/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java index 8d21fd327..45135778f 100644 --- a/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java +++ b/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java @@ -24,7 +24,7 @@ public enum SnapshotRecord { LAST; public static SnapshotRecord fromSource(Struct source) { - if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null && io.debezium.data.Enum.LOGICAL_NAME.equals(source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY).name())) { + if (source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY) != null && io.debezium.data.Enum.LOGICAL_NAME.equals(source.schema().field(AbstractSourceInfo.SNAPSHOT_KEY).schema().name())) { final String snapshotString = source.getString(AbstractSourceInfo.SNAPSHOT_KEY); if (snapshotString != null) { return SnapshotRecord.valueOf(snapshotString.toUpperCase());