From b2dad7df3a85761a56a2d9acaa33759583376dcd Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 16 Jun 2020 11:55:21 +0200 Subject: [PATCH] DBZ-1812 Tested with PostgreSQL --- debezium-e2e-benchmark/Dockerfile | 3 +- debezium-e2e-benchmark/README.md | 33 +++---- .../docker-compose-db2-tpc.yaml | 1 + .../docker-compose-postgresql-tpc.yaml | 1 + debezium-e2e-benchmark/py/runplots.py | 2 +- debezium-e2e-benchmark/py/tpc-config.json | 8 +- debezium-e2e-benchmark/py/tpc-run-test.py | 85 +++++++++++-------- debezium-e2e-benchmark/py/tpcplot.py | 6 +- 8 files changed, 75 insertions(+), 64 deletions(-) diff --git a/debezium-e2e-benchmark/Dockerfile b/debezium-e2e-benchmark/Dockerfile index b1da7ba61..954f30af2 100644 --- a/debezium-e2e-benchmark/Dockerfile +++ b/debezium-e2e-benchmark/Dockerfile @@ -12,8 +12,6 @@ RUN useradd -ms /bin/bash tpc USER tpc WORKDIR /home/tpc -ADD py/ /home/tpc - RUN mkdir /home/tpc/jdbcdriver run curl https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.0.0/jcc-11.5.0.0.jar --output /home/tpc/jdbcdriver/jcc.jar run curl https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar --output /home/tpc/jdbcdriver/mysql.jar @@ -21,5 +19,6 @@ run curl https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/8.2.0 run curl https://repo1.maven.org/maven2/postgresql/postgresql/9.1-901.jdbc4/postgresql-9.1-901.jdbc4.jar --output /home/tpc/jdbcdriver/postgresql.jar run curl https://repo1.maven.org/maven2/com/oracle/ojdbc/ojdbc10/19.3.0.0/ojdbc10-19.3.0.0.jar --output /home/tpc/jdbcdriver/ojdbc10.jar +ADD py/ /home/tpc CMD ["/usr/bin/tail","-f","/dev/null"] diff --git a/debezium-e2e-benchmark/README.md b/debezium-e2e-benchmark/README.md index eeea89326..2fd0d957b 100644 --- a/debezium-e2e-benchmark/README.md +++ b/debezium-e2e-benchmark/README.md @@ -60,7 +60,7 @@ If you have an existing up and running Debezium environment, you can do the benc ``` CREATE TABLE TPC.TEST ( USERNAME VARCHAR(32) NOT NULL, NAME VARCHAR(64), BLOOD_GROUP CHAR(3), RESIDENCE VARCHAR(200), COMPANY VARCHAR(128), ADDRESS VARCHAR(200), BIRTHDATE DATE, SEX CHAR(1), JOB VARCHAR(128), SSN CHAR(11), MAIL VARCHAR(128), ID INTEGER NOT NULL AUTO_INCREMENT, T0 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ``` - Whitelist the TPC.TEST table in your Debezium connector config JSON -``` "database.whitelist" : "TPC.TEST" ``` +``` "table.whitelist" : "TPC.TEST" ``` - Enable the table for CDC on the database - SQL for db2 @@ -104,37 +104,28 @@ yum -y install git ### Run a self-contained TPC set up for DB2 with Docker compose ``` -cd /home/ +export DEBEZIUM_VERSION=1.2 +export DEBEZIUM_DB2_VOLUME=$HOME/dockerdata +export DEBEZIUM_TPC_VOLUME=$HOME/tpcdata + +mkdir $DEBEZIUM_DB2_VOLUME +mkdir $DEBEZIUM_TPC_VOLUME +chmod 777 $DEBEZIUM_TPC_VOLUME + +cd $HOME git clone https://github.com/debezium/debezium-examples -mkdir /home//dockerdata -export DEBEZIUM_VERSION=1.1 -export DEBEZIUM_DB2_VOLUME=/home//dockerdata -git clone https://github.com/debezium/debezium-examples -mkdir tpcdata -chmod 777 /home//tpcdata -export DEBEZIUM_TPC_VOLUME=/home//tpcdata git clone https://github.com/debezium/debezium + cd debezium/debezium-e2e-benchmark # if you like do do it with an other db, adapt the docker-compose file to your prefered database and update the tpc-config.json file for appropriate database ( SQL) docker-compose -f docker-compose-db2-tpc.yaml up --build ``` -### Kafka change to enable topic deletion - -``` -docker exec -it debeziumtpc_kafka_1 /bin/bash -cd /kafka -echo 'delete.topic.enable=true' >> /kafka/config/server.properties -exit -docker stop debeziumtpc_kafka_1 -docker start debeziumtpc_kafka_1 -``` - # Run test and plots Once everything is set up, to actually run the tests: ``` -docker exec -it debeziumtpc_tpc_1 /bin/bash +docker-compose -f docker-compose-db2-tpc.yaml exec tpc bash python3 tpc-run-test.py python3 runplots.py ``` diff --git a/debezium-e2e-benchmark/docker-compose-db2-tpc.yaml b/debezium-e2e-benchmark/docker-compose-db2-tpc.yaml index 3b0884dec..0b85efdda 100644 --- a/debezium-e2e-benchmark/docker-compose-db2-tpc.yaml +++ b/debezium-e2e-benchmark/docker-compose-db2-tpc.yaml @@ -14,6 +14,7 @@ services: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_DELETE_TOPIC_ENABLE=true db2server: image: debezium/db2-cdc:${DEBEZIUM_VERSION} build: diff --git a/debezium-e2e-benchmark/docker-compose-postgresql-tpc.yaml b/debezium-e2e-benchmark/docker-compose-postgresql-tpc.yaml index ae50686f3..dd6468d16 100644 --- a/debezium-e2e-benchmark/docker-compose-postgresql-tpc.yaml +++ b/debezium-e2e-benchmark/docker-compose-postgresql-tpc.yaml @@ -14,6 +14,7 @@ services: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_DELETE_TOPIC_ENABLE=true postgres: image: debezium/example-postgres:${DEBEZIUM_VERSION} ports: diff --git a/debezium-e2e-benchmark/py/runplots.py b/debezium-e2e-benchmark/py/runplots.py index 0d24b9c27..e46e4a186 100755 --- a/debezium-e2e-benchmark/py/runplots.py +++ b/debezium-e2e-benchmark/py/runplots.py @@ -20,7 +20,7 @@ import shlex DEBEZIUM_TPC_VOLUME = '/home/tpc/tpcdata' -with open('config.json') as f: +with open('tpc-config.json') as f: tpcconfig = json.load(f) for x in range(len(tpcconfig['tpc']['commit.intervals'])): diff --git a/debezium-e2e-benchmark/py/tpc-config.json b/debezium-e2e-benchmark/py/tpc-config.json index 64adcd616..3e27aef67 100644 --- a/debezium-e2e-benchmark/py/tpc-config.json +++ b/debezium-e2e-benchmark/py/tpc-config.json @@ -54,12 +54,14 @@ "postgresql": { "jdbcdriver": "org.postgresql.Driver", "jar": "postgresql.jar", - "tpctable": "", + "tpctable": "CREATE TABLE INVENTORY.TEST ( USERNAME VARCHAR(32) NOT NULL, NAME VARCHAR(64), BLOOD_GROUP CHAR(3), RESIDENCE VARCHAR(200), COMPANY VARCHAR(128), ADDRESS VARCHAR(200), BIRTHDATE DATE, SEX CHAR(1), JOB VARCHAR(128), SSN CHAR(11), MAIL VARCHAR(128), ID INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, T0 TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP )", "initsql": [], - "enablecdctablesql": [] + "enablecdctablesql": [], + "table": "inventory.test", + "lowercase": "true" } }, "sql": { - "insert": "INSERT INTO TPC.TEST (USERNAME, NAME, BLOOD_GROUP ,RESIDENCE, COMPANY, ADDRESS, SEX) VALUES ( 'USERNAMEDAT', 'NAME', 'AB+', 'WELCOME TO EARTH', 'WORK TO WORK GMBH', 'HELP WORKING', 'M' )" + "insert": "INSERT INTO INVENTORY.TEST (USERNAME, NAME, BLOOD_GROUP ,RESIDENCE, COMPANY, ADDRESS, SEX) VALUES ( 'USERNAMEDAT', 'NAME', 'AB+', 'WELCOME TO EARTH', 'WORK TO WORK GMBH', 'HELP WORKING', 'M' )" } } diff --git a/debezium-e2e-benchmark/py/tpc-run-test.py b/debezium-e2e-benchmark/py/tpc-run-test.py index 67de50b7d..72e0bc6af 100755 --- a/debezium-e2e-benchmark/py/tpc-run-test.py +++ b/debezium-e2e-benchmark/py/tpc-run-test.py @@ -16,7 +16,9 @@ import threading import jpype -tpchomedir = "/home/tpc" +tpchomedir = '/home/tpc' +table = '' +lowercase = 'false' def initsql(conn, config, tpcconfig): @@ -66,9 +68,9 @@ def enablecdctablesql(conn, config, tpcconfig): return 0 -def topicexport(bootstrapserver, count, commitinterval): +def topicexport(bootstrapserver, topicname, count, commitinterval): global tpchomedir - consumer = KafkaConsumer('db2server.TPC.TEST', + consumer = KafkaConsumer(topicname, bootstrap_servers=bootstrapserver, auto_offset_reset='smallest', enable_auto_commit=True, @@ -85,8 +87,13 @@ def topicexport(bootstrapserver, count, commitinterval): message.value)) d = json.loads(message.value) - file.write(str(message.timestamp) + "000;" + str(d['payload']['after']['T0']) + - ";" + d['payload']['op'] + ";" + str(i) + ";" + str(d['payload']['after']['ID']) + "\n") + idf = 'ID' + t0f = 'T0' + if lowercase: + idf = 'id' + t0f = 't0' + file.write(str(message.timestamp) + "000;" + str(d['payload']['after'][t0f]) + + ";" + d['payload']['op'] + ";" + str(i) + ";" + str(d['payload']['after'][idf]) + "\n") i = i + 1 print(i) print(count) @@ -95,9 +102,8 @@ def topicexport(bootstrapserver, count, commitinterval): file.close() -def getjdbcconnection(config, tpcconfig): +def getjdbcconnection(config, tpcconfig, connectiontype): global tpchomedir - connectiontype = config['config']['connector.class'].split('.')[3] jdbctype = 'jdbc:' + connectiontype + '://' if connectiontype == 'oracle': jdbctype = 'jdbc:oracle:thin:@' @@ -112,23 +118,35 @@ def getjdbcconnection(config, tpcconfig): def main(argv): - with open('../register.json') as f: + with open('register.json') as f: config = json.load(f) with open('tpc-config.json') as f: tpcconfig = json.load(f) + if (len(argv) > 0): + bootstrapserver = argv[0] print(config['config']['connector.class']) print(config['name']) config['name'] = 'tpc-connector' print(config['name']) - config['config']['database.whitelist'] = 'TPC.TEST' config['config']['database.history.kafka.topic'] = 'tpc-test' databasetype = config['config']['connector.class'] - print(databasetype.split('.')[3]) + connectiontype = config['config']['connector.class'].split('.')[3] print(databasetype) - conn = getjdbcconnection(config, tpcconfig) + print(connectiontype) + + table = tpcconfig['jdbc'][connectiontype].get('table') + if table == None: + table = 'TPC.TEST' + config['config']['table.whitelist'] = table + + lowercase = tpcconfig['jdbc'][connectiontype].get('lowercase') + if lowercase == None: + lowercase = False + + conn = getjdbcconnection(config, tpcconfig, connectiontype) initsql(conn, config, tpcconfig) createTPCTable(conn, config, tpcconfig) @@ -150,41 +168,41 @@ def main(argv): 'http://' + tpcconfig['debezium.connect.server'] + '/connectors/tpc-connector', verify=False) print(resp.content) print(resp.status_code) - # retvalue = json.loads(resp.content) - # print(retvalue) if (resp.status_code == 404): print('tpc-connector not exists') else: print('tpc-connector deleted') pass - dockerbootstrapserver = config['config']['database.history.kafka.bootstrap.servers'] - bootstrapserver = config['config']['database.history.kafka.bootstrap.servers'].split( - ",") + databaseservername = config['config']['database.server.name'] + topicname = databaseservername + '.' + table + historybootstrapserver = config['config'].get('database.history.kafka.bootstrap.servers') + if historybootstrapserver != None: + bootstrapserver = historybootstrapserver.split(",") + # check integrated test ( all in one docker) - if dockerbootstrapserver == 'kafka:9092': + if bootstrapserver == 'kafka:9092': print(bootstrapserver) kafkaadmin = KafkaAdminClient(bootstrap_servers=bootstrapserver) try: kafkaadmin.delete_topics( - [config['config']['database.server.name'] + '.TPC.TEST'], 30) + [topicname], 30) except: - print(config['config']['database.server.name'] + - '.TPC.TEST TOPIC not exists') + print(topicname + ' TOPIC not exists') else: - print(config['config']['database.server.name'] + - '.TPC.TEST TOPIC deleted') - try: - kafkaadmin.delete_topics( - [config['config']['database.history.kafka.topic']], 30) - except: - print(config['config']['database.history.kafka.topic'] + - ' TOPIC not exists') - else: - print(config['config'] - ['database.history.kafka.topic'] + ' TOPIC deleted') + print(topicname + ' TOPIC deleted') + if historybootstrapserver != None: + try: + kafkaadmin.delete_topics( + [config['config']['database.history.kafka.topic']], 30) + except: + print(config['config']['database.history.kafka.topic'] + + ' TOPIC not exists') + else: + print(config['config'] + ['database.history.kafka.topic'] + ' TOPIC deleted') # start tpc connector print('start tpc connector') @@ -214,11 +232,10 @@ def main(argv): if ((y % (tpcconfig['tpc']['commit.intervals'][x])) == (tpcconfig['tpc']['commit.intervals'][x] - 1)): conn.commit() conn.commit() - topicexport(bootstrapserver, int( + topicexport(bootstrapserver, topicname, int( tpcconfig['tpc']['count']), tpcconfig['tpc']['commit.intervals'][x]) - kafkaadmin.delete_topics( - [config['config']['database.server.name'] + '.TPC.TEST'], 30) + kafkaadmin.delete_topics([topicname], 30) print('Wait 30 second for TOPIC clean up') time.sleep(30) diff --git a/debezium-e2e-benchmark/py/tpcplot.py b/debezium-e2e-benchmark/py/tpcplot.py index b7d509274..19b7eb64f 100755 --- a/debezium-e2e-benchmark/py/tpcplot.py +++ b/debezium-e2e-benchmark/py/tpcplot.py @@ -42,14 +42,14 @@ ymax=max(y) plt.subplot(3, 1, 1) plt.plot(x,y) plt.xlabel('millisecond') -plt.ylabel('delta insert db2 to topic') +plt.ylabel('delta insert database to topic') plt.title('TPC Graph\nInsert data') plt.legend() axes = plt.gca() axes.set_xlim([xmin,xmax]) axes.set_ylim([ymin,ymax]) -print('delta insert db2 to topic') +print('delta insert database to topic') print('min :',min(y)) print('max :',max(y)) print('average :',sum(y)/len(y)) @@ -78,7 +78,7 @@ axes = plt.gca() axes.set_xlim([xmin,xmax]) axes.set_ylim([ymin,40]) -print('db2 in ') +print('database in ') print('min :',min(y)) print('max :',max(y)) print('average :',sum(y)/len(y))