tet123/debezium-e2e-benchmark/py/tpc-run-test.py

223 lines
7.8 KiB
Python
Raw Normal View History

2020-03-25 09:21:57 +01:00
#!/usr/bin/env python3
from kafka import KafkaConsumer
from kafka import KafkaAdminClient
import json
from pprint import pprint
import jaydebeapi
import sys
import time
import random
import string
from pprint import pprint
import requests
import datetime
import threading
import jpype
def initsql(conn, config, tpcconfig):
    """Run the configured initialisation entries for the connector's database.

    The entries are read from ``tpcconfig['jdbc'][<dbtype>]['initsql']``,
    where ``<dbtype>`` is the fourth dot-separated component of
    ``config['config']['connector.class']``.

    An entry starting with ``python.time.sleep`` is not sent to the
    database: the text after that prefix is parsed with ``int()`` and the
    function sleeps for that many seconds.  Every other entry is executed
    and committed individually.  A failing statement is reported on stdout
    and the remaining entries are still processed (best effort).

    Args:
        conn: Connection object providing ``cursor()`` and ``commit()``.
        config: Connector registration dictionary.
        tpcconfig: Benchmark configuration dictionary.

    Returns:
        Always ``0``.
    """
    sleep_prefix = 'python.time.sleep'
    curs = conn.cursor()
    dbtype = config['config']['connector.class'].split('.')[3]
    for sql in tpcconfig['jdbc'][dbtype]['initsql']:
        if sql.startswith(sleep_prefix):
            seconds = sql[len(sleep_prefix):]
            print('Wait for ' + seconds + ' second')
            time.sleep(int(seconds))
            continue
        try:
            curs.execute(sql)
            conn.commit()
            print(sql)
        except Exception as exc:
            # Only ordinary errors are tolerated; KeyboardInterrupt and
            # SystemExit must still be able to stop the run.
            print(sql + ' done with exception !! (' + str(exc) + ')')
    return 0
def createTPCTable(conn, config, tpcconfig):
    """Create the benchmark table using the DDL configured for the database.

    The statement is read from ``tpcconfig['jdbc'][<dbtype>]['tpctable']``,
    where ``<dbtype>`` is the fourth dot-separated component of
    ``config['config']['connector.class']``.  The statement is printed,
    executed and committed.

    Args:
        conn: Connection object providing ``cursor()`` and ``commit()``.
        config: Connector registration dictionary.
        tpcconfig: Benchmark configuration dictionary.

    Returns:
        ``0`` when the statement was executed and committed, ``1`` when the
        configuration lookup, the execution or the commit raised an error.
    """
    curs = conn.cursor()
    try:
        # The lookup stays inside the try block so that a missing
        # configuration key is still reported through the return code.
        dbtype = config['config']['connector.class'].split('.')[3]
        ddl = tpcconfig['jdbc'][dbtype]['tpctable']
        print(ddl)
        curs.execute(ddl)
        conn.commit()
    except Exception as exc:
        # KeyboardInterrupt / SystemExit are deliberately not caught here.
        print('table create error: ' + str(exc))
        return 1
    print('table created')
    return 0
def enablecdctablesql(conn, config, tpcconfig):
    """Run the configured entries that enable CDC for the benchmark table.

    The entries are read from
    ``tpcconfig['jdbc'][<dbtype>]['enablecdctablesql']``, where ``<dbtype>``
    is the fourth dot-separated component of
    ``config['config']['connector.class']``.

    An entry starting with ``python.time.sleep`` is not sent to the
    database: the text after that prefix is parsed with ``int()`` and the
    function sleeps for that many seconds.  Every other entry is executed
    and committed individually.  A failing statement is reported on stdout
    and the remaining entries are still processed (best effort).

    Args:
        conn: Connection object providing ``cursor()`` and ``commit()``.
        config: Connector registration dictionary.
        tpcconfig: Benchmark configuration dictionary.

    Returns:
        Always ``0``.
    """
    sleep_prefix = 'python.time.sleep'
    curs = conn.cursor()
    dbtype = config['config']['connector.class'].split('.')[3]
    for sql in tpcconfig['jdbc'][dbtype]['enablecdctablesql']:
        if sql.startswith(sleep_prefix):
            seconds = sql[len(sleep_prefix):]
            print('Wait for ' + seconds + ' second')
            time.sleep(int(seconds))
            continue
        try:
            curs.execute(sql)
            conn.commit()
            print(sql)
        except Exception as exc:
            # Only ordinary errors are tolerated; KeyboardInterrupt and
            # SystemExit must still be able to stop the run.
            print(sql + ' done with exception !! (' + str(exc) + ')')
    return 0
def topicexport(bootstrapserver, count, commitinterval,
                topic='db2server.TPC.TEST'):
    """Copy change events from a Kafka topic into a semicolon-separated file.

    Messages are consumed until ``count`` of them have been written.  The
    output file is ``/home/tpc/tpcdata/tpc_<count>_<commitinterval>.csv``
    and is overwritten if it exists.  Each line holds, in order: the message
    timestamp with ``000`` appended, ``payload.after.T0``, ``payload.op``,
    the zero-based running number of the message and ``payload.after.ID``.

    The consumer has no timeout configured here, so the call blocks until
    ``count`` messages have arrived.

    Args:
        bootstrapserver: Value passed as ``bootstrap_servers`` to the
            consumer.
        count: Number of messages to export; also part of the file name.
        commitinterval: Only used to build the file name.
        topic: Topic to read from.  Defaults to the topic name that was
            previously hard-coded.

    Raises:
        KeyError / TypeError: If a message does not carry the expected
            ``payload`` structure (for example when ``after`` is null).
    """
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=bootstrapserver,
                             auto_offset_reset='smallest',
                             enable_auto_commit=True,
                             )
    path = ("/home/tpc/tpcdata/" + "tpc_" + str(count) +
            "_" + str(commitinterval) + ".csv")
    exported = 0
    try:
        # The context manager guarantees the file is flushed and closed even
        # when a message cannot be parsed.
        with open(path, "w") as out:
            print('read from TOPIC')
            for message in consumer:
                # message value and key are raw bytes -- decode if necessary!
                # e.g., for unicode: `message.value.decode('utf-8')`
                print("%s:%d:%d: key=%s value=%s" % (message.topic,
                                                     message.partition,
                                                     message.offset,
                                                     message.key,
                                                     message.value))
                payload = json.loads(message.value)['payload']
                # NOTE(review): appending "000" presumably converts the
                # millisecond timestamp to microseconds -- confirm.
                out.write(str(message.timestamp) + "000;" +
                          str(payload['after']['T0']) + ";" +
                          payload['op'] + ";" +
                          str(exported) + ";" +
                          str(payload['after']['ID']) + "\n")
                exported += 1
                print(exported)
                print(count)
                if exported >= count:
                    break
    finally:
        # Release the broker connections whether or not the export succeeded.
        consumer.close()
def getjdbcconnection(config, tpcconfig):
    """Open a JDBC connection to the database described by the connector config.

    The database type is the fourth dot-separated component of
    ``config['config']['connector.class']``.  The JDBC URL is
    ``jdbc:<type>://<hostname>:<port>/<dbname>``, except for ``oracle``
    where the prefix ``jdbc:oracle:thin:@`` is used instead.  The driver
    class name and jar file name come from ``tpcconfig['jdbc'][<type>]``;
    the jar is looked up under ``/home/tpc/jdbcdriver/``.

    Args:
        config: Connector registration dictionary.
        tpcconfig: Benchmark configuration dictionary.

    Returns:
        The connection object returned by ``jaydebeapi.connect``.
    """
    settings = config['config']
    connectiontype = settings['connector.class'].split('.')[3]

    if connectiontype == 'oracle':
        urlprefix = 'jdbc:oracle:thin:@'
    else:
        urlprefix = 'jdbc:' + connectiontype + '://'

    driverclass = tpcconfig['jdbc'][connectiontype]['jdbcdriver']
    url = ''.join((
        urlprefix,
        settings['database.hostname'],
        ':',
        settings['database.port'],
        '/',
        settings['database.dbname'],
    ))
    credentials = [settings['database.user'], settings['database.password']]
    jarpath = '/home/tpc/jdbcdriver/' + tpcconfig['jdbc'][connectiontype]['jar']

    return jaydebeapi.connect(driverclass, url, credentials, jarpath)
def _print_connector_status(status_url):
    """GET the connector status, print body, status code and parsed JSON."""
    resp = requests.get(status_url, verify=False)
    print(resp.content)
    print(resp.status_code)
    retvalue = json.loads(resp.content)
    print(retvalue)
    return resp


def _delete_topic_best_effort(kafkaadmin, topic):
    """Delete *topic*; report a failure on stdout instead of raising."""
    try:
        # NOTE(review): the second positional argument of
        # KafkaAdminClient.delete_topics is timeout_ms, so 30 means
        # 30 milliseconds -- confirm that is intended.
        kafkaadmin.delete_topics([topic], 30)
    except Exception as exc:
        # Any ordinary failure ends up here, not only a missing topic, so
        # the actual error is appended to the historical message.
        print(topic + ' TOPIC not exists (' + str(exc) + ')')
    else:
        print(topic + ' TOPIC deleted')


def main(argv):
    """Run the TPC benchmark: prepare the database, register the connector,
    insert rows and export the resulting change events.

    Steps, in order:

    1. Load ``../register.json`` (connector registration) and
       ``tpc-config.json`` (benchmark settings), then override the
       connector name, ``database.whitelist`` and
       ``database.history.kafka.topic``.
    2. Connect to the database and run ``initsql``, ``createTPCTable`` and
       ``enablecdctablesql``.
    3. Print the status of ``tpc-connector``, delete it, delete the data
       and history topics, and register the connector again.
    4. For every value in ``tpcconfig['tpc']['commit.intervals']``: insert
       ``tpcconfig['tpc']['count']`` rows committing every *interval* rows,
       export the events with ``topicexport``, delete the data topic and
       wait 30 seconds.

    Args:
        argv: Command-line arguments; currently unused.
    """
    with open('../register.json') as f:
        config = json.load(f)
    with open('tpc-config.json') as f:
        tpcconfig = json.load(f)

    settings = config['config']
    print(settings['connector.class'])
    print(config['name'])
    config['name'] = 'tpc-connector'
    print(config['name'])
    settings['database.whitelist'] = 'TPC.TEST'
    settings['database.history.kafka.topic'] = 'tpc-test'
    databasetype = settings['connector.class']
    print(databasetype.split('.')[3])
    print(databasetype)

    conn = getjdbcconnection(config, tpcconfig)
    initsql(conn, config, tpcconfig)
    createTPCTable(conn, config, tpcconfig)
    enablecdctablesql(conn, config, tpcconfig)

    connectors_url = ('http://' + tpcconfig['debezium.connect.server'] +
                      '/connectors')
    connector_url = connectors_url + '/tpc-connector'
    status_url = connector_url + '/status'

    print('============')
    print('get status tpc connector')
    _print_connector_status(status_url)
    print('============')

    # Remove a connector left over from a previous run; the response body is
    # not parsed as JSON here.
    resp = requests.delete(connector_url, verify=False)
    print(resp.content)
    print(resp.status_code)
    if resp.status_code == 404:
        print('tpc-connector not exists')
    else:
        print('tpc-connector deleted')

    dockerbootstrapserver = settings['database.history.kafka.bootstrap.servers']
    bootstrapserver = settings['database.history.kafka.bootstrap.servers'].split(
        ",")
    # check integrated test ( all in one docker)
    # NOTE(review): the indentation of this file was lost; the condition is
    # reconstructed as guarding only the print below, because kafkaadmin is
    # used unconditionally further down -- confirm against the original.
    if dockerbootstrapserver == 'kafka:9092':
        print(bootstrapserver)

    kafkaadmin = KafkaAdminClient(bootstrap_servers=bootstrapserver)
    data_topic = settings['database.server.name'] + '.TPC.TEST'
    _delete_topic_best_effort(kafkaadmin, data_topic)
    _delete_topic_best_effort(kafkaadmin,
                              settings['database.history.kafka.topic'])

    # start tpc connector
    print('start tpc connector')
    resp = requests.post(
        connectors_url,
        headers={'content-type': 'application/json'},
        data=json.dumps(config),
        verify=False)
    print(resp.content)
    print(resp.status_code)
    retvalue = json.loads(resp.content)
    print(retvalue)

    # Poll once per second until the status endpoint answers 200.
    # NOTE(review): there is no retry limit, so a registration that was
    # rejected keeps this loop running forever -- consider bounding it.
    while resp.status_code > 200:
        print('get status tpc connector')
        resp = _print_connector_status(status_url)
        time.sleep(1)

    conn.jconn.setAutoCommit(False)
    rowcount = int(tpcconfig['tpc']['count'])
    insertsql = tpcconfig['sql']['insert']
    for interval in tpcconfig['tpc']['commit.intervals']:
        print(interval)
        curs = conn.cursor()
        for y in range(rowcount):
            curs.execute(insertsql)
            # Commit after every `interval`-th row.
            if y % interval == interval - 1:
                conn.commit()
        # Flush the rows of a trailing, incomplete batch.
        conn.commit()
        curs.close()

        # NOTE(review): topicexport() is called without a topic, so it
        # consumes 'db2server.TPC.TEST' regardless of database.server.name,
        # while the topic deleted below is derived from the configuration.
        topicexport(bootstrapserver, rowcount, interval)
        kafkaadmin.delete_topics([data_topic], 30)
        print('Wait 30 second for TOPIC clean up')
        time.sleep(30)
# Script entry point: when the file is executed directly, hand the
# command-line arguments (without the program name) to main().
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    main(cli_args)