DBZ-298 Formatting

Gunnar Morling 2017-07-28 12:15:36 +02:00
parent b4a0ac0c35
commit 564d90dc09
6 changed files with 143 additions and 142 deletions


@ -54,7 +54,7 @@ public class PostgresSchema {
/**
* Create a schema component given the supplied {@link PostgresConnectorConfig Postgres connector configuration}.
*
* @param config the connector configuration, which is presumed to be valid
*/
protected PostgresSchema(PostgresConnectorConfig config) {
@ -130,7 +130,7 @@ protected void refresh(PostgresConnection connection, TableId tableId) throws SQ
/**
* Get the {@link Filters database and table filters} defined by the configuration.
*
* @return the filters; never null
*/
public Filters filters() {
@ -140,7 +140,7 @@ public Filters filters() {
/**
* Get the {@link TableSchema Schema information} for the table with the given identifier, if that table exists and is
* included by the {@link #filters() filter}.
*
* @param id the fully-qualified table identifier; may be null
* @return the current table definition, or null if there is no table with the given identifier, if the identifier is null,
* or if the table has been excluded by the filters

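A short caller-side sketch of the lookup contract documented above (illustrative only, not part of the commit): the helper name and the table identifier are made up, and the schema instance is assumed to have already been refreshed from the database. filters() never returns null, while the per-table lookup returns null for unknown, excluded, or null identifiers.

// Illustrative sketch: how a caller is expected to handle the nullable schema lookup.
TableSchema lookupOrSkip(PostgresSchema schema, String fullyQualifiedName) {
    TableId tableId = PostgresSchema.parse(fullyQualifiedName); // e.g. "public.test_table"
    TableSchema tableSchema = schema.schemaFor(tableId);
    if (tableSchema == null) {
        // the table does not exist, the id was null, or it is excluded by schema.filters()
    }
    return tableSchema;
}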

@ -21,6 +21,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
@ -40,19 +41,19 @@
/**
* Producer of {@link org.apache.kafka.connect.source.SourceRecord source records} from a database snapshot. Once completed,
* this producer can optionally continue streaming records, using another {@link RecordsStreamProducer} instance.
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
@ThreadSafe
public class RecordsSnapshotProducer extends RecordsProducer {
private static final String CONTEXT_NAME = "records-snapshot-producer";
private final ExecutorService executorService;
private final Optional<RecordsStreamProducer> streamProducer;
private AtomicReference<SourceRecord> currentRecord;
public RecordsSnapshotProducer(PostgresTaskContext taskContext,
SourceInfo sourceInfo,
boolean continueStreamingAfterCompletion) {
@ -60,14 +61,14 @@ public RecordsSnapshotProducer(PostgresTaskContext taskContext,
executorService = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, CONTEXT_NAME + "-thread"));
currentRecord = new AtomicReference<>();
if (continueStreamingAfterCompletion) {
// we need to create the stream producer here to make sure it creates the replication connection;
// otherwise we can't stream back changes happening while the snapshot is taking place
streamProducer = Optional.of(new RecordsStreamProducer(taskContext, sourceInfo));
} else {
streamProducer = Optional.empty();
}
}
@Override
protected void start(Consumer<SourceRecord> recordConsumer) {
// MDC should be inherited from parent to child threads
@ -80,7 +81,7 @@ protected void start(Consumer<SourceRecord> recordConsumer) {
previousContext.restore();
}
}
private Void handleException(Throwable t) {
logger.error("unexpected exception", t.getCause() != null ? t.getCause() : t);
// always stop to clean up data
@ -88,26 +89,26 @@ private Void handleException(Throwable t) {
taskContext.failTask(t);
return null;
}
private void startStreaming(Consumer<SourceRecord> consumer) {
try {
// and then start streaming if necessary
streamProducer.ifPresent(producer -> {
logger.info("Snapshot finished, continuing streaming changes from {}", ReplicationConnection.format(sourceInfo.lsn()));
producer.start(consumer);
});
} finally {
// always cleanup our local data
cleanup();
}
}
@Override
protected void commit() {
streamProducer.ifPresent(RecordsStreamProducer::commit);
}
@Override
protected void stop() {
try {
@ -116,34 +117,34 @@ protected void stop() {
cleanup();
}
}
private void cleanup() {
currentRecord.set(null);
executorService.shutdownNow();
}
private void takeSnapshot(Consumer<SourceRecord> consumer) {
long snapshotStart = clock().currentTimeInMillis();
Connection jdbcConnection = null;
try (PostgresConnection connection = taskContext.createConnection()) {
jdbcConnection = connection.connection();
String lineSeparator = System.lineSeparator();
logger.info("Step 0: disabling autocommit");
connection.setAutoCommit(false);
long lockTimeoutMillis = taskContext.config().snapshotLockTimeoutMillis();
logger.info("Step 1: starting transaction and refreshing the DB schemas for database '{}' and user '{}'",
logger.info("Step 1: starting transaction and refreshing the DB schemas for database '{}' and user '{}'",
connection.database(), connection.username());
// we're using the same isolation level that pg_backup uses
StringBuilder statements = new StringBuilder("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;");
connection.executeWithoutCommitting(statements.toString());
statements.delete(0, statements.length());
//next refresh the schema which will load all the tables taking the filters into account
PostgresSchema schema = schema();
schema.refresh(connection, false);
logger.info("Step 2: locking each of the database tables, waiting a maximum of '{}' seconds for each lock",
lockTimeoutMillis / 1000d);
statements.append("SET lock_timeout = ").append(lockTimeoutMillis).append(";").append(lineSeparator);
@ -154,21 +155,21 @@ private void takeSnapshot(Consumer<SourceRecord> consumer) {
.append(" IN SHARE UPDATE EXCLUSIVE MODE;")
.append(lineSeparator));
connection.executeWithoutCommitting(statements.toString());
//now that we have the locks, refresh the schema
schema.refresh(connection, false);
// get the current position in the log, from which we'll continue streaming once the snapshot is finished
// If rows are being inserted while we're doing the snapshot, the xlog pos should increase and so when
// we start streaming, we should get back those changes
long xlogStart = connection.currentXLogLocation();
int txId = connection.currentTransactionId().intValue();
logger.info("\t read xlogStart at '{}' from transaction '{}'", ReplicationConnection.format(xlogStart), txId);
// and mark the start of the snapshot
sourceInfo.startSnapshot();
sourceInfo.update(xlogStart, clock().currentTimeInMicros(), txId);
logger.info("Step 3: reading and exporting the contents of each table");
AtomicInteger rowsCounter = new AtomicInteger(0);
schema.tables().forEach(tableId -> {
@ -189,15 +190,15 @@ private void takeSnapshot(Consumer<SourceRecord> consumer) {
throw new ConnectException(e);
}
});
// finally commit the transaction to release all the locks...
logger.info("Step 4: committing transaction '{}'", txId);
jdbcConnection.commit();
// process and send the last record after marking it as such
logger.info("Step 5: sending the last snapshot record");
SourceRecord currentRecord = this.currentRecord.get();
if (currentRecord != null) {
sourceInfo.markLastSnapshotRecord();
this.currentRecord.set(new SourceRecord(currentRecord.sourcePartition(), sourceInfo.offset(),
currentRecord.topic(), currentRecord.kafkaPartition(),
@ -205,7 +206,7 @@ private void takeSnapshot(Consumer<SourceRecord> consumer) {
currentRecord.valueSchema(), currentRecord.value()));
sendCurrentRecord(consumer);
}
// and complete the snapshot
sourceInfo.completeSnapshot();
logger.info("Snapshot completed in '{}'", Strings.duration(clock().currentTimeInMillis() - snapshotStart));
@ -220,16 +221,16 @@ private void takeSnapshot(Consumer<SourceRecord> consumer) {
throw new ConnectException(e);
}
}
private Statement readTableStatement(Connection conn) throws SQLException {
int rowsFetchSize = taskContext.config().rowsFetchSize();
Statement statement = conn.createStatement(); // the default cursor is FORWARD_ONLY
statement.setFetchSize(rowsFetchSize);
return statement;
}
private void readTable(TableId tableId, ResultSet rs,
Consumer<SourceRecord> consumer,
AtomicInteger rowsCounter) throws SQLException {
Table table = schema().tableFor(tableId);
assert table != null;
@ -245,7 +246,7 @@ private void readTable(TableId tableId, ResultSet rs,
generateReadRecord(tableId, row);
}
}
private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaData) throws SQLException {
try {
int jdbcSqlType = metaData.getColumnType(colIdx);
@ -270,7 +271,7 @@ private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaDa
return rs.getObject(colIdx);
}
}
protected void generateReadRecord(TableId tableId, Object[] rowData) {
if (rowData.length == 0) {
return;
@ -291,7 +292,7 @@ protected void generateReadRecord(TableId tableId, Object[] rowData) {
currentRecord.set(new SourceRecord(partition, offset, topicName, null, keySchema, key, envelope.schema(),
envelope.read(value, sourceInfo.source(), clock().currentTimeInMillis())));
}
private void sendCurrentRecord(Consumer<SourceRecord> consumer) {
SourceRecord record = currentRecord.get();
if (record == null) {

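A minimal usage sketch for this producer (illustrative only, not part of the commit; the shape mirrors the integration tests further below). The method name is made up, the task context and record consumer are assumed to be supplied by the connector task, and "test_server" stands in for the configured logical server name.

// Illustrative sketch: snapshot first, then keep streaming by passing 'true' for
// continueStreamingAfterCompletion.
void snapshotThenStream(PostgresTaskContext taskContext, Consumer<SourceRecord> consumer) throws Exception {
    SourceInfo sourceInfo = new SourceInfo("test_server");
    RecordsSnapshotProducer producer = new RecordsSnapshotProducer(taskContext, sourceInfo, true);
    producer.start(consumer);   // takes the snapshot, then hands off to a RecordsStreamProducer
    // ... when the task is asked to shut down ...
    producer.stop();            // stops streaming and releases the executor and connections
}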

@ -26,6 +26,8 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.PgArray;
import org.postgresql.jdbc.PgConnection;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
@ -37,20 +39,18 @@
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.LoggingContext;
/**
* A {@link RecordsProducer} which creates {@link org.apache.kafka.connect.source.SourceRecord records} from a Postgres
* streaming replication connection and {@link io.debezium.connector.postgresql.proto.PgProto messages}.
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
@ThreadSafe
public class RecordsStreamProducer extends RecordsProducer {
private static final String CONTEXT_NAME = "records-stream-producer";
private final ExecutorService executorService;
private final ReplicationConnection replicationConnection;
private final AtomicReference<ReplicationStream> replicationStream;
@ -72,7 +72,7 @@ public RecordsStreamProducer(PostgresTaskContext taskContext,
throw new ConnectException(e);
}
}
@Override
protected void start(Consumer<SourceRecord> recordConsumer) {
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
@ -88,7 +88,7 @@ protected void start(Consumer<SourceRecord> recordConsumer) {
logger.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
replicationStream.compareAndSet(null, replicationConnection.startStreaming());
}
// refresh the schema so we have the latest view of the DB tables
taskContext.refreshSchema(true);
// the new thread will inherit its parent's MDC
@ -99,7 +99,7 @@ protected void start(Consumer<SourceRecord> recordConsumer) {
previousContext.restore();
}
}
private void streamChanges(Consumer<SourceRecord> consumer) {
ReplicationStream stream = this.replicationStream.get();
// run while we haven't been requested to stop
@ -121,7 +121,7 @@ private void streamChanges(Consumer<SourceRecord> consumer) {
}
}
}
@Override
protected synchronized void commit() {
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
@ -140,7 +140,7 @@ protected synchronized void commit() {
previousContext.restore();
}
}
@Override
protected synchronized void stop() {
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
@ -164,7 +164,7 @@ protected synchronized void stop() {
previousContext.restore();
}
}
private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord> consumer) throws SQLException {
if (message == null) {
// in some cases we can get null if PG gives us back a message earlier than the latest reported flushed LSN
@ -173,7 +173,7 @@ private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord
TableId tableId = PostgresSchema.parse(message.getTable());
assert tableId != null;
// update the source info with the coordinates for this message
long commitTimeNs = message.getCommitTime();
int txId = message.getTransactionId();
@ -181,7 +181,7 @@ private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord
if (logger.isDebugEnabled()) {
logger.debug("received new message at position {}\n{}", ReplicationConnection.format(lsn), message);
}
TableSchema tableSchema = tableSchemaFor(tableId);
if (tableSchema == null) {
return;
@ -189,7 +189,7 @@ private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord
if (tableSchema.keySchema() == null) {
logger.warn("ignoring message for table '{}' because it does not have a primary key defined", tableId);
}
PgProto.Op operation = message.getOp();
switch (operation) {
case INSERT: {
@ -213,7 +213,7 @@ private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord
}
}
}
protected void generateCreateRecord(TableId tableId, Object[] rowData, Consumer<SourceRecord> recordConsumer) {
if (rowData == null || rowData.length == 0) {
logger.warn("no new values found for table '{}' from update message at '{}';skipping record" , tableId, sourceInfo);
@ -231,7 +231,7 @@ protected void generateCreateRecord(TableId tableId, Object[] rowData, Consumer<
Map<String, ?> offset = sourceInfo.offset();
String topicName = topicSelector().topicNameFor(tableId);
Envelope envelope = createEnvelope(tableSchema, topicName);
SourceRecord record = new SourceRecord(partition, offset, topicName, null, keySchema, key, envelope.schema(),
envelope.create(value, sourceInfo.source(), clock().currentTimeInMillis()));
if (logger.isDebugEnabled()) {
@ -239,7 +239,7 @@ protected void generateCreateRecord(TableId tableId, Object[] rowData, Consumer<
}
recordConsumer.accept(record);
}
protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object[] newRowData,
Consumer<SourceRecord> recordConsumer) {
if (newRowData == null || newRowData.length == 0) {
@ -249,29 +249,29 @@ protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object
Schema oldKeySchema = null;
Struct oldValue = null;
Object oldKey = null;
TableSchema tableSchema = schema().schemaFor(tableId);
assert tableSchema != null;
if (oldRowData != null && oldRowData.length > 0) {
oldKey = tableSchema.keyFromColumnData(oldRowData);
oldKeySchema = tableSchema.keySchema();
oldValue = tableSchema.valueFromColumnData(oldRowData);
}
Object newKey = tableSchema.keyFromColumnData(newRowData);
Struct newValue = tableSchema.valueFromColumnData(newRowData);
Schema newKeySchema = tableSchema.keySchema();
Map<String, ?> partition = sourceInfo.partition();
Map<String, ?> offset = sourceInfo.offset();
String topicName = topicSelector().topicNameFor(tableId);
Envelope envelope = createEnvelope(tableSchema, topicName);
Struct source = sourceInfo.source();
if (oldKey != null && !Objects.equals(oldKey, newKey)) {
// the primary key has changed, so we need to send a DELETE followed by a CREATE
// then send a delete event for the old key ...
SourceRecord record = new SourceRecord(partition, offset, topicName, null, oldKeySchema, oldKey, envelope.schema(),
envelope.delete(oldValue, source, clock().currentTimeInMillis()));
@ -279,14 +279,14 @@ protected void generateUpdateRecord(TableId tableId, Object[] oldRowData, Object
logger.debug("sending delete event '{}' to topic '{}'", record, topicName);
}
recordConsumer.accept(record);
// send a tombstone event (null value) for the old key so it can be removed from the Kafka log eventually...
record = new SourceRecord(partition, offset, topicName, null, oldKeySchema, oldKey, null, null);
if (logger.isDebugEnabled()) {
logger.debug("sending tombstone event '{}' to topic '{}'", record, topicName);
}
recordConsumer.accept(record);
// then send a create event for the new key...
record = new SourceRecord(partition, offset, topicName, null, newKeySchema, newKey, envelope.schema(),
envelope.create(newValue, source, clock().currentTimeInMillis()));
@ -301,7 +301,7 @@ record = new SourceRecord(partition, offset, topicName, null, newKeySchema, newK
recordConsumer.accept(record);
}
}
protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Consumer<SourceRecord> recordConsumer) {
if (oldRowData == null || oldRowData.length == 0) {
logger.warn("no values found for table '{}' from update message at '{}';skipping record" , tableId, sourceInfo);
@ -319,7 +319,7 @@ protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Consum
Map<String, ?> offset = sourceInfo.offset();
String topicName = topicSelector().topicNameFor(tableId);
Envelope envelope = createEnvelope(tableSchema, topicName);
// create the regular delete record
SourceRecord record = new SourceRecord(partition, offset, topicName, null,
keySchema, key, envelope.schema(),
@ -328,7 +328,7 @@ protected void generateDeleteRecord(TableId tableId, Object[] oldRowData, Consum
logger.debug("sending delete event '{}' to topic '{}'", record, topicName);
}
recordConsumer.accept(record);
// And send a tombstone event (null value) for the old key so it can be removed from the Kafka log eventually...
record = new SourceRecord(partition, offset, topicName, null, keySchema, key, null, null);
if (logger.isDebugEnabled()) {
@ -336,7 +336,7 @@ record = new SourceRecord(partition, offset, topicName, null, keySchema, key, nu
}
recordConsumer.accept(record);
}
private Object[] columnValues(List<PgProto.DatumMessage> messageList, TableId tableId, boolean refreshSchemaIfChanged)
throws SQLException {
if (messageList == null || messageList.isEmpty()) {
@ -344,13 +344,13 @@ private Object[] columnValues(List<PgProto.DatumMessage> messageList, TableId ta
}
Table table = schema().tableFor(tableId);
assert table != null;
// check if we need to refresh our local schema due to DB schema changes for this table
if (refreshSchemaIfChanged && schemaChanged(messageList, table)) {
schema().refresh(taskContext.createConnection(), tableId);
table = schema().tableFor(tableId);
}
// based on the schema columns, create the values on the same position as the columns
List<String> columnNames = table.columnNames();
Object[] values = new Object[messageList.size()];
@ -361,7 +361,7 @@ private Object[] columnValues(List<PgProto.DatumMessage> messageList, TableId ta
});
return values;
}
private boolean schemaChanged(List<PgProto.DatumMessage> messageList, Table table) {
List<String> columnNames = table.columnNames();
int messagesCount = messageList.size();
@ -370,7 +370,7 @@ private boolean schemaChanged(List<PgProto.DatumMessage> messageList, Table tabl
// so we need to trigger a refresh...
return true;
}
// go through the list of columns from the message to figure out if any of them are new or have changed their type based
// on what we have in the table metadata....
return messageList.stream().filter(message -> {
@ -387,7 +387,7 @@ private boolean schemaChanged(List<PgProto.DatumMessage> messageList, Table tabl
return false;
}).findFirst().isPresent();
}
private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
PostgresSchema schema = schema();
if (schema.isFilteredOut(tableId)) {
@ -410,7 +410,7 @@ private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
return tableSchema;
}
}
/**
* Converts the Protobuf value for a {@link io.debezium.connector.postgresql.proto.PgProto.DatumMessage plugin message} to
* a Java value based on the type of the column from the message. This value will be converted later on if necessary by the

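To make the delete handling above easier to follow, here is a condensed sketch (illustrative only, not part of the commit) of the delete-plus-tombstone sequence emitted by generateDeleteRecord and by the primary-key-change branch of generateUpdateRecord. The helper signature is made up; its arguments are assumed to be prepared exactly as in those methods, using the types already imported above.

// Illustrative sketch: the first record carries the delete envelope with the old row image,
// the second is a tombstone (null value schema and null value) so log compaction can
// eventually drop the key from the Kafka topic.
void emitDeleteAndTombstone(Map<String, ?> partition, Map<String, ?> offset, String topicName,
                            Schema keySchema, Object key, Envelope envelope, Struct oldValue,
                            Struct source, long timestampMillis, Consumer<SourceRecord> recordConsumer) {
    SourceRecord delete = new SourceRecord(partition, offset, topicName, null,
            keySchema, key, envelope.schema(),
            envelope.delete(oldValue, source, timestampMillis));
    recordConsumer.accept(delete);
    SourceRecord tombstone = new SourceRecord(partition, offset, topicName, null, keySchema, key, null, null);
    recordConsumer.accept(tombstone);
}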

@ -28,79 +28,79 @@
/**
* Integration test for {@link RecordsSnapshotProducer}
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class RecordsSnapshotProducerIT extends AbstractRecordsProducerTest {
private RecordsSnapshotProducer snapshotProducer;
private PostgresTaskContext context;
@Before
public void before() throws SQLException {
TestHelper.dropAllSchemas();
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
context = new PostgresTaskContext(config, new PostgresSchema(config));
}
@After
public void after() throws Exception {
if (snapshotProducer != null) {
snapshotProducer.stop();
}
}
@Test
public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER), false);
TestHelper.executeDDL("postgres_create_tables.ddl");
TestConsumer consumer = testConsumer(ALL_STMTS.size());
// insert data for each of the different supported types
String statementsBuilder = ALL_STMTS.stream().collect(Collectors.joining(";" + System.lineSeparator())) + ";";
TestHelper.execute(statementsBuilder);
//then start the producer and validate all records are there
snapshotProducer.start(consumer);
consumer.await(2, TimeUnit.SECONDS);
Map<String, List<SchemaAndValueField>> expectedValuesByTableName = super.schemaAndValuesByTableName();
consumer.process(record -> assertReadRecord(record, expectedValuesByTableName));
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffset(record, true, consumer.isEmpty());
}
}
@Test
public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
String insertStmt = "INSERT INTO s1.a (aa) VALUES (1);" +
"INSERT INTO s2.a (aa) VALUES (1);";
String statements = "CREATE SCHEMA s1; " +
"CREATE SCHEMA s2; " +
"CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
"CREATE TABLE s2.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
insertStmt;
TestHelper.execute(statements);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER), true);
TestConsumer consumer = testConsumer(2);
snapshotProducer.start(consumer);
// first make sure we get the initial records from both schemas...
consumer.await(2, TimeUnit.SECONDS);
consumer.clear();
// then insert some more data and check that we get it back
TestHelper.execute(insertStmt);
consumer.expects(2);
consumer.await(2, TimeUnit.SECONDS);
SourceRecord first = consumer.remove();
VerifyRecord.isValidInsert(first, PK_FIELD, 2);
assertEquals(topicName("s1.a"), first.topic());
@ -110,11 +110,11 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
VerifyRecord.isValidInsert(second, PK_FIELD, 2);
assertEquals(topicName("s2.a"), second.topic());
assertRecordOffset(second, false, false);
// now shut down the producers and insert some more records
snapshotProducer.stop();
TestHelper.execute(insertStmt);
// start a new producer back up, take a new snapshot (we expect all the records to be read back)
int expectedRecordsCount = 6;
consumer = testConsumer(expectedRecordsCount);
@ -130,16 +130,16 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
assertRecordOffset(record, true, counterVal == (expectedRecordsCount - 1));
});
consumer.clear();
// now insert two more records and check that we only get those back from the stream
TestHelper.execute(insertStmt);
consumer.expects(2);
consumer.await(2, TimeUnit.SECONDS);
first = consumer.remove();
VerifyRecord.isValidInsert(first, PK_FIELD, 4);
assertRecordOffset(first, false, false);
second = consumer.remove();
VerifyRecord.isValidInsert(second, PK_FIELD, 4);
assertRecordOffset(second, false, false);


@ -27,18 +27,18 @@
/**
* Integration test for the {@link RecordsStreamProducer} class. This also tests indirectly the PG plugin functionality for
* different use cases.
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class RecordsStreamProducerIT extends AbstractRecordsProducerTest {
private RecordsStreamProducer recordsProducer;
private TestConsumer consumer;
@Before
public void before() throws Exception {
TestHelper.dropAllSchemas();
String statements = "CREATE SCHEMA public;" +
String statements = "CREATE SCHEMA public;" +
"DROP TABLE IF EXISTS test_table;" +
"CREATE TABLE test_table (pk SERIAL, text TEXT, PRIMARY KEY(pk));" +
"INSERT INTO test_table(text) VALUES ('insert');";
@ -47,44 +47,44 @@ public void before() throws Exception {
PostgresTaskContext context = new PostgresTaskContext(config, new PostgresSchema(config));
recordsProducer = new RecordsStreamProducer(context, new SourceInfo(config.serverName()));
}
@After
public void after() throws Exception {
if (recordsProducer != null) {
recordsProducer.stop();
}
}
@Test
public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl");
consumer = testConsumer(1);
recordsProducer.start(consumer);
//numerical types
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericType());
// string types
consumer.expects(1);
assertInsert(INSERT_STRING_TYPES_STMT, schemasAndValuesForStringTypes());
// monetary types
consumer.expects(1);
assertInsert(INSERT_CASH_TYPES_STMT, schemaAndValuesForMoneyTypes());
// bits and bytes
consumer.expects(1);
assertInsert(INSERT_BIN_TYPES_STMT, schemaAndValuesForBinTypes());
//date and time
consumer.expects(1);
assertInsert(INSERT_DATE_TIME_TYPES_STMT, schemaAndValuesForDateTimeTypes());
// text
consumer.expects(1);
assertInsert(INSERT_TEXT_TYPES_STMT, schemasAndValuesForTextTypes());
// geom types
consumer.expects(1);
assertInsert(INSERT_GEOM_TYPES_STMT, schemaAndValuesForGeomTypes());
@ -115,7 +115,7 @@ public void shouldReceiveChangesForNewTable() throws Exception {
executeAndWait(statement);
assertRecordInserted("s1.a", PK_FIELD, 1);
}
@Test
public void shouldReceiveChangesForRenamedTable() throws Exception {
String statement = "DROP TABLE IF EXISTS renamed_test_table;" +
@ -132,13 +132,13 @@ public void shouldReceiveChangesForUpdates() throws Exception {
consumer = testConsumer(1);
recordsProducer.start(consumer);
executeAndWait("UPDATE test_table set text='update' WHERE pk=1");
// the update record should be the last record
SourceRecord updatedRecord = consumer.remove();
String topicName = topicName("public.test_table");
assertEquals(topicName, updatedRecord.topic());
VerifyRecord.isValidUpdate(updatedRecord, PK_FIELD, 1);
// default replica identity only fires previous values for PK changes
List<SchemaAndValueField> expectedAfter = Collections.singletonList(
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update"));
@ -148,96 +148,96 @@ public void shouldReceiveChangesForUpdates() throws Exception {
consumer.expects(1);
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL");
executeAndWait("UPDATE test_table set text='update2' WHERE pk=1");
updatedRecord = consumer.remove();
assertEquals(topicName, updatedRecord.topic());
VerifyRecord.isValidUpdate(updatedRecord, PK_FIELD, 1);
// now we should get both old and new values
List<SchemaAndValueField> expectedBefore = Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA,
"update"));
assertRecordSchemaAndValues(expectedBefore, updatedRecord, Envelope.FieldName.BEFORE);
expectedAfter = Collections.singletonList(new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update2"));
assertRecordSchemaAndValues(expectedAfter, updatedRecord, Envelope.FieldName.AFTER);
}
@Test
public void shouldReceiveChangesForUpdatesWithColumnChanges() throws Exception {
// add a new column
String statements = "ALTER TABLE test_table ADD COLUMN uvc VARCHAR(2);" +
"ALTER TABLE test_table REPLICA IDENTITY FULL;" +
"UPDATE test_table SET uvc ='aa' WHERE pk = 1;";
consumer = testConsumer(1);
recordsProducer.start(consumer);
executeAndWait(statements);
// the update should be the last record
SourceRecord updatedRecord = consumer.remove();
String topicName = topicName("public.test_table");
assertEquals(topicName, updatedRecord.topic());
VerifyRecord.isValidUpdate(updatedRecord, PK_FIELD, 1);
// now check we got the updated value (the old value should be null, the new one whatever we set)
List<SchemaAndValueField> expectedBefore = Collections.singletonList(new SchemaAndValueField("uvc", null, null));
assertRecordSchemaAndValues(expectedBefore, updatedRecord, Envelope.FieldName.BEFORE);
List<SchemaAndValueField> expectedAfter = Collections.singletonList(new SchemaAndValueField("uvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA,
"aa"));
assertRecordSchemaAndValues(expectedAfter, updatedRecord, Envelope.FieldName.AFTER);
// rename a column
statements = "ALTER TABLE test_table RENAME COLUMN uvc to xvc;" +
"UPDATE test_table SET xvc ='bb' WHERE pk = 1;";
consumer.expects(1);
executeAndWait(statements);
updatedRecord = consumer.remove();
VerifyRecord.isValidUpdate(updatedRecord, PK_FIELD, 1);
// now check we got the updated value (the old value should be null, the new one whatever we set)
expectedBefore = Collections.singletonList(new SchemaAndValueField("xvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "aa"));
assertRecordSchemaAndValues(expectedBefore, updatedRecord, Envelope.FieldName.BEFORE);
expectedAfter = Collections.singletonList(new SchemaAndValueField("xvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "bb"));
assertRecordSchemaAndValues(expectedAfter, updatedRecord, Envelope.FieldName.AFTER);
// drop a column
statements = "ALTER TABLE test_table DROP COLUMN xvc;" +
"UPDATE test_table SET text ='update' WHERE pk = 1;";
consumer.expects(1);
executeAndWait(statements);
updatedRecord = consumer.remove();
VerifyRecord.isValidUpdate(updatedRecord, PK_FIELD, 1);
}
@Test
public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
consumer = testConsumer(3);
recordsProducer.start(consumer);
executeAndWait("UPDATE test_table SET text = 'update', pk = 2");
String topicName = topicName("public.test_table");
// first should be a delete of the old pk
SourceRecord deleteRecord = consumer.remove();
assertEquals(topicName, deleteRecord.topic());
VerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1);
// followed by a tombstone of the old pk
SourceRecord tombstoneRecord = consumer.remove();
assertEquals(topicName, tombstoneRecord.topic());
VerifyRecord.isValidTombstone(tombstoneRecord, PK_FIELD, 1);
// and finally insert of the new value
SourceRecord insertRecord = consumer.remove();
assertEquals(topicName, insertRecord.topic());
VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 2);
}
@Test
public void shouldReceiveChangesForDefaultValues() throws Exception {
String statements = "ALTER TABLE test_table REPLICA IDENTITY FULL;" +
@ -255,7 +255,7 @@ public void shouldReceiveChangesForDefaultValues() throws Exception {
new SchemaAndValueField("default_column", SchemaBuilder.OPTIONAL_STRING_SCHEMA ,"default"));
assertRecordSchemaAndValues(expectedSchemaAndValues, insertRecord, Envelope.FieldName.AFTER);
}
@Test
public void shouldReceiveChangesForDeletes() throws Exception {
// add a new entry and remove both
@ -264,17 +264,17 @@ public void shouldReceiveChangesForDeletes() throws Exception {
consumer = testConsumer(5);
recordsProducer.start(consumer);
executeAndWait(statements);
String topicPrefix = "public.test_table";
String topicName = topicName(topicPrefix);
assertRecordInserted(topicPrefix, PK_FIELD, 2);
// first entry removed
SourceRecord record = consumer.remove();
assertEquals(topicName, record.topic());
VerifyRecord.isValidDelete(record, PK_FIELD, 1);
// followed by a tombstone
record = consumer.remove();
assertEquals(topicName, record.topic());
@ -290,7 +290,7 @@ record = consumer.remove();
assertEquals(topicName, record.topic());
VerifyRecord.isValidTombstone(record, PK_FIELD, 2);
}
private void assertInsert(String statement, List<SchemaAndValueField> expectedSchemaAndValuesByColumn) {
String tableName = tableNameFromInsertStmt(statement);
String expectedTopicName = "public." + tableName;
@ -303,7 +303,7 @@ private void assertInsert(String statement, List<SchemaAndValueField> expectedSc
throw new RuntimeException(e);
}
}
private SourceRecord assertRecordInserted(String expectedTopicName, String pkColumn, int pk) throws InterruptedException {
assertFalse("records not generated", consumer.isEmpty());
SourceRecord insertedRecord = consumer.remove();
@ -311,7 +311,7 @@ private SourceRecord assertRecordInserted(String expectedTopicName, String pkCol
VerifyRecord.isValidInsert(insertedRecord, pkColumn, pk);
return insertedRecord;
}
private void executeAndWait(String statements) throws Exception {
TestHelper.execute(statements);
consumer.await(2, TimeUnit.SECONDS);

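
A brief note on the replica-identity behaviour the update and delete tests above rely on: with PostgreSQL's default replica identity, previous column values are only emitted when the primary key changes, so the tests switch to FULL before asserting on the 'before' image. The statements below are copied from those tests and shown only to isolate that step (illustrative, not part of the commit).

// Illustrative sketch: after REPLICA IDENTITY FULL, an UPDATE event populates both
// Envelope.FieldName.BEFORE (the old row) and Envelope.FieldName.AFTER (the new row).
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL");
TestHelper.execute("UPDATE test_table SET text = 'update2' WHERE pk = 1");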

@ -229,4 +229,4 @@ protected Map<String, String> vars(String var1, String val1) {
protected Map<String, String> vars(String var1, String val1, String var2, String val2) {
return Collect.hashMapOf(var1, val1, var2, val2);
}
}