DBZ-1812 Tested with PostgreSQL

Jiri Pechanec 2020-06-16 11:55:21 +02:00
parent 3fc4312ff5
commit b2dad7df3a
8 changed files with 75 additions and 64 deletions


@@ -12,8 +12,6 @@ RUN useradd -ms /bin/bash tpc
USER tpc
WORKDIR /home/tpc
ADD py/ /home/tpc
RUN mkdir /home/tpc/jdbcdriver
run curl https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.0.0/jcc-11.5.0.0.jar --output /home/tpc/jdbcdriver/jcc.jar
run curl https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar --output /home/tpc/jdbcdriver/mysql.jar
@@ -21,5 +19,6 @@ run curl https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/8.2.0
run curl https://repo1.maven.org/maven2/postgresql/postgresql/9.1-901.jdbc4/postgresql-9.1-901.jdbc4.jar --output /home/tpc/jdbcdriver/postgresql.jar
run curl https://repo1.maven.org/maven2/com/oracle/ojdbc/ojdbc10/19.3.0.0/ojdbc10-19.3.0.0.jar --output /home/tpc/jdbcdriver/ojdbc10.jar
ADD py/ /home/tpc
CMD ["/usr/bin/tail","-f","/dev/null"]


@@ -60,7 +60,7 @@ If you have an existing up and running Debezium environment, you can do the benc
``` CREATE TABLE TPC.TEST ( USERNAME VARCHAR(32) NOT NULL, NAME VARCHAR(64), BLOOD_GROUP CHAR(3), RESIDENCE VARCHAR(200), COMPANY VARCHAR(128), ADDRESS VARCHAR(200), BIRTHDATE DATE, SEX CHAR(1), JOB VARCHAR(128), SSN CHAR(11), MAIL VARCHAR(128), ID INTEGER NOT NULL AUTO_INCREMENT, T0 TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ```
- Whitelist the TPC.TEST table in your Debezium connector config JSON (a fuller registration sketch follows this list)
``` "database.whitelist" : "TPC.TEST" ```
``` "table.whitelist" : "TPC.TEST" ```
- Enable the table for CDC on the database
- SQL for db2
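Referring back to the whitelist item above, a minimal sketch of a connector registration payload for an existing Debezium environment; the connector class, host, and credential values are illustrative placeholders for DB2 and will differ per database:
```
{
  "name": "tpc-connector",
  "config": {
    "connector.class": "io.debezium.connector.db2.Db2Connector",
    "database.hostname": "db2server",
    "database.port": "50000",
    "database.user": "db2inst1",
    "database.password": "db2inst1-pwd",
    "database.dbname": "TESTDB",
    "database.server.name": "db2server",
    "table.whitelist": "TPC.TEST",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "tpc-test"
  }
}
```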
@@ -104,37 +104,28 @@ yum -y install git
### Run a self-contained TPC setup for DB2 with Docker Compose
```
cd /home/<user name>
export DEBEZIUM_VERSION=1.2
export DEBEZIUM_DB2_VOLUME=$HOME/dockerdata
export DEBEZIUM_TPC_VOLUME=$HOME/tpcdata
mkdir $DEBEZIUM_DB2_VOLUME
mkdir $DEBEZIUM_TPC_VOLUME
chmod 777 $DEBEZIUM_TPC_VOLUME
cd $HOME
git clone https://github.com/debezium/debezium-examples
mkdir /home/<user name>/dockerdata
export DEBEZIUM_VERSION=1.1
export DEBEZIUM_DB2_VOLUME=/home/<user name>/dockerdata
git clone https://github.com/debezium/debezium-examples
mkdir tpcdata
chmod 777 /home/<user name>/tpcdata
export DEBEZIUM_TPC_VOLUME=/home/<user name>/tpcdata
git clone https://github.com/debezium/debezium
cd debezium/debezium-e2e-benchmark
# if you want to run it with another database, adapt the docker-compose file to your preferred database and update the tpc-config.json file with the appropriate SQL for that database (see the sketch after this block)
docker-compose -f docker-compose-db2-tpc.yaml up --build
```
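The same flow works for the other supported databases; for example, for PostgreSQL (the compose file name below is an assumption, check the repository for the exact one):
```
cd debezium/debezium-e2e-benchmark
export DEBEZIUM_VERSION=1.2
docker-compose -f docker-compose-postgres-tpc.yaml up --build
```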
### Kafka change to enable topic deletion
```
docker exec -it debeziumtpc_kafka_1 /bin/bash
cd /kafka
echo 'delete.topic.enable=true' >> /kafka/config/server.properties
exit
docker stop debeziumtpc_kafka_1
docker start debeziumtpc_kafka_1
```
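With the docker-compose files in this repository the same setting can be declared on the kafka service instead of editing server.properties by hand (see the compose diffs below); a sketch of the relevant service entry, with the image line assumed:
```
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_DELETE_TOPIC_ENABLE=true
```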
# Run the test and plots
Once everything is set up, to actually run the tests:
```
docker exec -it debeziumtpc_tpc_1 /bin/bash
docker-compose -f docker-compose-db2-tpc.yaml exec tpc bash
python3 tpc-run-test.py
python3 runplots.py
```


@@ -14,6 +14,7 @@ services:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_DELETE_TOPIC_ENABLE=true
db2server:
image: debezium/db2-cdc:${DEBEZIUM_VERSION}
build:


@@ -14,6 +14,7 @@ services:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_DELETE_TOPIC_ENABLE=true
postgres:
image: debezium/example-postgres:${DEBEZIUM_VERSION}
ports:


@@ -20,7 +20,7 @@ import shlex
DEBEZIUM_TPC_VOLUME = '/home/tpc/tpcdata'
with open('config.json') as f:
with open('tpc-config.json') as f:
tpcconfig = json.load(f)
for x in range(len(tpcconfig['tpc']['commit.intervals'])):
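The loop above takes the benchmark parameters from the tpc section of tpc-config.json; a minimal sketch of that section with illustrative values (only the count and commit.intervals keys are read here):
```
"tpc": {
  "count": 10000,
  "commit.intervals": [1, 10, 100, 1000]
}
```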


@@ -54,12 +54,14 @@
"postgresql": {
"jdbcdriver": "org.postgresql.Driver",
"jar": "postgresql.jar",
"tpctable": "",
"tpctable": "CREATE TABLE INVENTORY.TEST ( USERNAME VARCHAR(32) NOT NULL, NAME VARCHAR(64), BLOOD_GROUP CHAR(3), RESIDENCE VARCHAR(200), COMPANY VARCHAR(128), ADDRESS VARCHAR(200), BIRTHDATE DATE, SEX CHAR(1), JOB VARCHAR(128), SSN CHAR(11), MAIL VARCHAR(128), ID INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, T0 TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP )",
"initsql": [],
"enablecdctablesql": []
"enablecdctablesql": [],
"table": "inventory.test",
"lowercase": "true"
}
},
"sql": {
"insert": "INSERT INTO TPC.TEST (USERNAME, NAME, BLOOD_GROUP ,RESIDENCE, COMPANY, ADDRESS, SEX) VALUES ( 'USERNAMEDAT', 'NAME', 'AB+', 'WELCOME TO EARTH', 'WORK TO WORK GMBH', 'HELP WORKING', 'M' )"
"insert": "INSERT INTO INVENTORY.TEST (USERNAME, NAME, BLOOD_GROUP ,RESIDENCE, COMPANY, ADDRESS, SEX) VALUES ( 'USERNAMEDAT', 'NAME', 'AB+', 'WELCOME TO EARTH', 'WORK TO WORK GMBH', 'HELP WORKING', 'M' )"
}
}
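The new table and lowercase keys let the run script handle databases such as PostgreSQL, which folds unquoted identifiers to lower case, so the change-event payload carries id and t0 rather than ID and T0. A compact, self-contained sketch equivalent to the field selection added to the consumer below (helper name is illustrative):
```
# Assumed helper for illustration: map the connector's identifier casing
# to the field names as they appear in the Debezium change-event payload.
def payload_field_names(lowercase):
    return ('id', 't0') if lowercase else ('ID', 'T0')

idf, t0f = payload_field_names(True)  # PostgreSQL-style lower-case identifiers
```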


@@ -16,7 +16,9 @@ import threading
import jpype
tpchomedir = "/home/tpc"
tpchomedir = '/home/tpc'
table = ''
lowercase = 'false'
def initsql(conn, config, tpcconfig):
@@ -66,9 +68,9 @@ def enablecdctablesql(conn, config, tpcconfig):
return 0
def topicexport(bootstrapserver, count, commitinterval):
def topicexport(bootstrapserver, topicname, count, commitinterval):
global tpchomedir
consumer = KafkaConsumer('db2server.TPC.TEST',
consumer = KafkaConsumer(topicname,
bootstrap_servers=bootstrapserver,
auto_offset_reset='smallest',
enable_auto_commit=True,
@@ -85,8 +87,13 @@ def topicexport(bootstrapserver, count, commitinterval):
message.value))
d = json.loads(message.value)
file.write(str(message.timestamp) + "000;" + str(d['payload']['after']['T0']) +
";" + d['payload']['op'] + ";" + str(i) + ";" + str(d['payload']['after']['ID']) + "\n")
idf = 'ID'
t0f = 'T0'
if lowercase:
idf = 'id'
t0f = 't0'
file.write(str(message.timestamp) + "000;" + str(d['payload']['after'][t0f]) +
";" + d['payload']['op'] + ";" + str(i) + ";" + str(d['payload']['after'][idf]) + "\n")
i = i + 1
print(i)
print(count)
@@ -95,9 +102,8 @@ def topicexport(bootstrapserver, count, commitinterval):
file.close()
def getjdbcconnection(config, tpcconfig):
def getjdbcconnection(config, tpcconfig, connectiontype):
global tpchomedir
connectiontype = config['config']['connector.class'].split('.')[3]
jdbctype = 'jdbc:' + connectiontype + '://'
if connectiontype == 'oracle':
jdbctype = 'jdbc:oracle:thin:@'
@@ -112,23 +118,35 @@ def getjdbcconnection(config, tpcconfig):
def main(argv):
with open('../register.json') as f:
with open('register.json') as f:
config = json.load(f)
with open('tpc-config.json') as f:
tpcconfig = json.load(f)
if (len(argv) > 0):
bootstrapserver = argv[0]
print(config['config']['connector.class'])
print(config['name'])
config['name'] = 'tpc-connector'
print(config['name'])
config['config']['database.whitelist'] = 'TPC.TEST'
config['config']['database.history.kafka.topic'] = 'tpc-test'
databasetype = config['config']['connector.class']
print(databasetype.split('.')[3])
connectiontype = config['config']['connector.class'].split('.')[3]
print(databasetype)
conn = getjdbcconnection(config, tpcconfig)
print(connectiontype)
table = tpcconfig['jdbc'][connectiontype].get('table')
if table == None:
table = 'TPC.TEST'
config['config']['table.whitelist'] = table
lowercase = tpcconfig['jdbc'][connectiontype].get('lowercase')
if lowercase == None:
lowercase = False
conn = getjdbcconnection(config, tpcconfig, connectiontype)
initsql(conn, config, tpcconfig)
createTPCTable(conn, config, tpcconfig)
@@ -150,41 +168,41 @@ def main(argv):
'http://' + tpcconfig['debezium.connect.server'] + '/connectors/tpc-connector', verify=False)
print(resp.content)
print(resp.status_code)
# retvalue = json.loads(resp.content)
# print(retvalue)
if (resp.status_code == 404):
print('tpc-connector not exists')
else:
print('tpc-connector deleted')
pass
dockerbootstrapserver = config['config']['database.history.kafka.bootstrap.servers']
bootstrapserver = config['config']['database.history.kafka.bootstrap.servers'].split(
",")
databaseservername = config['config']['database.server.name']
topicname = databaseservername + '.' + table
historybootstrapserver = config['config'].get('database.history.kafka.bootstrap.servers')
if historybootstrapserver != None:
bootstrapserver = historybootstrapserver.split(",")
# check integrated test ( all in one docker)
if dockerbootstrapserver == 'kafka:9092':
if bootstrapserver == 'kafka:9092':
print(bootstrapserver)
kafkaadmin = KafkaAdminClient(bootstrap_servers=bootstrapserver)
try:
kafkaadmin.delete_topics(
[config['config']['database.server.name'] + '.TPC.TEST'], 30)
[topicname], 30)
except:
print(config['config']['database.server.name'] +
'.TPC.TEST TOPIC not exists')
print(topicname + ' TOPIC not exists')
else:
print(config['config']['database.server.name'] +
'.TPC.TEST TOPIC deleted')
try:
kafkaadmin.delete_topics(
[config['config']['database.history.kafka.topic']], 30)
except:
print(config['config']['database.history.kafka.topic'] +
' TOPIC not exists')
else:
print(config['config']
['database.history.kafka.topic'] + ' TOPIC deleted')
print(topicname + ' TOPIC deleted')
if historybootstrapserver != None:
try:
kafkaadmin.delete_topics(
[config['config']['database.history.kafka.topic']], 30)
except:
print(config['config']['database.history.kafka.topic'] +
' TOPIC not exists')
else:
print(config['config']
['database.history.kafka.topic'] + ' TOPIC deleted')
# start tpc connector
print('start tpc connector')
@@ -214,11 +232,10 @@ def main(argv):
if ((y % (tpcconfig['tpc']['commit.intervals'][x])) == (tpcconfig['tpc']['commit.intervals'][x] - 1)):
conn.commit()
conn.commit()
topicexport(bootstrapserver, int(
topicexport(bootstrapserver, topicname, int(
tpcconfig['tpc']['count']), tpcconfig['tpc']['commit.intervals'][x])
kafkaadmin.delete_topics(
[config['config']['database.server.name'] + '.TPC.TEST'], 30)
kafkaadmin.delete_topics([topicname], 30)
print('Wait 30 second for TOPIC clean up')
time.sleep(30)


@@ -42,14 +42,14 @@ ymax=max(y)
plt.subplot(3, 1, 1)
plt.plot(x,y)
plt.xlabel('millisecond')
plt.ylabel('delta insert db2 to topic')
plt.ylabel('delta insert database to topic')
plt.title('TPC Graph\nInsert data')
plt.legend()
axes = plt.gca()
axes.set_xlim([xmin,xmax])
axes.set_ylim([ymin,ymax])
print('delta insert db2 to topic')
print('delta insert database to topic')
print('min :',min(y))
print('max :',max(y))
print('average :',sum(y)/len(y))
@@ -78,7 +78,7 @@ axes = plt.gca()
axes.set_xlim([xmin,xmax])
axes.set_ylim([ymin,40])
print('db2 in ')
print('database in ')
print('min :',min(y))
print('max :',max(y))
print('average :',sum(y)/len(y))