245 lines
8.3 KiB
Python
Executable File
245 lines
8.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from kafka import KafkaConsumer
|
|
from kafka import KafkaAdminClient
|
|
import json
|
|
from pprint import pprint
|
|
import jaydebeapi
|
|
import sys
|
|
import time
|
|
import random
|
|
import string
|
|
from pprint import pprint
|
|
import requests
|
|
import datetime
|
|
import threading
|
|
import jpype
|
|
|
|
|
|
tpchomedir = '/home/tpc'
|
|
table = ''
|
|
lowercase = 'false'
|
|
|
|
|
|
def initsql(conn, config, tpcconfig):
|
|
curs = conn.cursor()
|
|
for sql in tpcconfig['jdbc'][config['config']['connector.class'].split('.')[3]]['initsql']:
|
|
if sql.startswith('python.time.sleep'):
|
|
print('Wait for ' + sql[17:] + ' second')
|
|
time.sleep(int(sql[17:]))
|
|
else:
|
|
try:
|
|
curs.execute(sql)
|
|
conn.commit()
|
|
print(sql)
|
|
except:
|
|
print(sql + ' done with exception !!')
|
|
return 0
|
|
|
|
|
|
def createTPCTable(conn, config, tpcconfig):
|
|
curs = conn.cursor()
|
|
try:
|
|
print(tpcconfig['jdbc'][config['config']
|
|
['connector.class'].split('.')[3]]['tpctable'])
|
|
curs.execute(tpcconfig['jdbc'][config['config']
|
|
['connector.class'].split('.')[3]]['tpctable'])
|
|
conn.commit()
|
|
print('table created')
|
|
return 0
|
|
except:
|
|
print('table create error')
|
|
return 1
|
|
|
|
|
|
def enablecdctablesql(conn, config, tpcconfig):
|
|
curs = conn.cursor()
|
|
for sql in tpcconfig['jdbc'][config['config']['connector.class'].split('.')[3]]['enablecdctablesql']:
|
|
if sql.startswith('python.time.sleep'):
|
|
print('Wait for ' + sql[17:] + ' second')
|
|
time.sleep(int(sql[17:]))
|
|
else:
|
|
try:
|
|
curs.execute(sql)
|
|
conn.commit()
|
|
print(sql)
|
|
except:
|
|
print(sql + ' done with exception !!')
|
|
return 0
|
|
|
|
|
|
def topicexport(bootstrapserver, topicname, count, commitinterval):
|
|
global tpchomedir
|
|
consumer = KafkaConsumer(topicname,
|
|
bootstrap_servers=bootstrapserver,
|
|
auto_offset_reset='smallest',
|
|
enable_auto_commit=True,
|
|
)
|
|
i = 0
|
|
file = open(tpchomedir + "/tpcdata/" + "tpc_" + str(count) +
|
|
"_" + str(commitinterval) + ".csv", "w")
|
|
print('read from TOPIC')
|
|
for message in consumer:
|
|
# message value and key are raw bytes -- decode if necessary!
|
|
# e.g., for unicode: `message.value.decode('utf-8')`
|
|
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
|
|
message.offset, message.key,
|
|
message.value))
|
|
|
|
d = json.loads(message.value)
|
|
idf = 'ID'
|
|
t0f = 'T0'
|
|
if lowercase:
|
|
idf = 'id'
|
|
t0f = 't0'
|
|
file.write(str(message.timestamp) + "000;" + str(d['payload']['after'][t0f]) +
|
|
";" + d['payload']['op'] + ";" + str(i) + ";" + str(d['payload']['after'][idf]) + "\n")
|
|
i = i + 1
|
|
print(i)
|
|
print(count)
|
|
if i >= count:
|
|
break
|
|
file.close()
|
|
|
|
|
|
def getjdbcconnection(config, tpcconfig, connectiontype):
|
|
global tpchomedir
|
|
jdbctype = 'jdbc:' + connectiontype + '://'
|
|
if connectiontype == 'oracle':
|
|
jdbctype = 'jdbc:oracle:thin:@'
|
|
conn = jaydebeapi.connect(tpcconfig['jdbc'][connectiontype]['jdbcdriver'], jdbctype +
|
|
config['config']['database.hostname'] + ':' +
|
|
config['config']['database.port'] + '/' +
|
|
config['config']['database.dbname'],
|
|
[config['config']['database.user'],
|
|
config['config']['database.password']],
|
|
tpchomedir + '/jdbcdriver/' + tpcconfig['jdbc'][connectiontype]['jar'])
|
|
return conn
|
|
|
|
|
|
def main(argv):
|
|
with open('register.json') as f:
|
|
config = json.load(f)
|
|
with open('tpc-config.json') as f:
|
|
tpcconfig = json.load(f)
|
|
|
|
if (len(argv) > 0):
|
|
bootstrapserver = argv[0]
|
|
print(config['config']['connector.class'])
|
|
print(config['name'])
|
|
config['name'] = 'tpc-connector'
|
|
print(config['name'])
|
|
|
|
config['config']['schema.history.internal.kafka.topic'] = 'tpc-test'
|
|
|
|
databasetype = config['config']['connector.class']
|
|
connectiontype = config['config']['connector.class'].split('.')[3]
|
|
print(databasetype)
|
|
print(connectiontype)
|
|
|
|
table = tpcconfig['jdbc'][connectiontype].get('table')
|
|
if table == None:
|
|
table = 'TPC.TEST'
|
|
config['config']['table.include.list'] = table
|
|
|
|
lowercase = tpcconfig['jdbc'][connectiontype].get('lowercase')
|
|
if lowercase == None:
|
|
lowercase = False
|
|
|
|
conn = getjdbcconnection(config, tpcconfig, connectiontype)
|
|
|
|
initsql(conn, config, tpcconfig)
|
|
createTPCTable(conn, config, tpcconfig)
|
|
enablecdctablesql(conn, config, tpcconfig)
|
|
|
|
print('============')
|
|
|
|
print('get status tpc connector')
|
|
resp = requests.get(
|
|
'http://' + tpcconfig['debezium.connect.server'] + '/connectors/tpc-connector/status', verify=False)
|
|
print(resp.content)
|
|
print(resp.status_code)
|
|
retvalue = json.loads(resp.content)
|
|
print(retvalue)
|
|
|
|
print('============')
|
|
|
|
resp = requests.delete(
|
|
'http://' + tpcconfig['debezium.connect.server'] + '/connectors/tpc-connector', verify=False)
|
|
print(resp.content)
|
|
print(resp.status_code)
|
|
if (resp.status_code == 404):
|
|
print('tpc-connector not exists')
|
|
else:
|
|
print('tpc-connector deleted')
|
|
pass
|
|
|
|
databaseservername = config['config']['topic.prefix']
|
|
topicname = databaseservername + '.' + table
|
|
historybootstrapserver = config['config'].get('schema.history.internal.kafka.bootstrap.servers')
|
|
if historybootstrapserver != None:
|
|
bootstrapserver = historybootstrapserver.split(",")
|
|
|
|
# check integrated test ( all in one docker)
|
|
if bootstrapserver == 'kafka:9092':
|
|
|
|
print(bootstrapserver)
|
|
kafkaadmin = KafkaAdminClient(bootstrap_servers=bootstrapserver)
|
|
|
|
try:
|
|
kafkaadmin.delete_topics(
|
|
[topicname], 30)
|
|
except:
|
|
print(topicname + ' TOPIC not exists')
|
|
else:
|
|
print(topicname + ' TOPIC deleted')
|
|
if historybootstrapserver != None:
|
|
try:
|
|
kafkaadmin.delete_topics(
|
|
[config['config']['schema.history.internal.kafka.topic']], 30)
|
|
except:
|
|
print(config['config']['schema.history.internal.kafka.topic'] +
|
|
' TOPIC not exists')
|
|
else:
|
|
print(config['config']
|
|
['schema.history.internal.kafka.topic'] + ' TOPIC deleted')
|
|
|
|
# start tpc connector
|
|
print('start tpc connector')
|
|
resp = requests.post(
|
|
'http://' + tpcconfig['debezium.connect.server'] + '/connectors', headers={'content-type': 'application/json'}, data=json.dumps(config), verify=False)
|
|
print(resp.content)
|
|
print(resp.status_code)
|
|
retvalue = json.loads(resp.content)
|
|
print(retvalue)
|
|
|
|
while resp.status_code > 200:
|
|
print('get status tpc connector')
|
|
resp = requests.get(
|
|
'http://' + tpcconfig['debezium.connect.server'] + '/connectors/tpc-connector/status', verify=False)
|
|
print(resp.content)
|
|
print(resp.status_code)
|
|
retvalue = json.loads(resp.content)
|
|
print(retvalue)
|
|
time.sleep(1)
|
|
|
|
conn.jconn.setAutoCommit(False)
|
|
for x in range(len(tpcconfig['tpc']['commit.intervals'])):
|
|
print(tpcconfig['tpc']['commit.intervals'][x])
|
|
curs = conn.cursor()
|
|
for y in range(int(tpcconfig['tpc']['count'])):
|
|
curs.execute(tpcconfig['sql']['insert'])
|
|
if ((y % (tpcconfig['tpc']['commit.intervals'][x])) == (tpcconfig['tpc']['commit.intervals'][x] - 1)):
|
|
conn.commit()
|
|
conn.commit()
|
|
topicexport(bootstrapserver, topicname, int(
|
|
tpcconfig['tpc']['count']), tpcconfig['tpc']['commit.intervals'][x])
|
|
|
|
kafkaadmin.delete_topics([topicname], 30)
|
|
print('Wait 30 second for TOPIC clean up')
|
|
time.sleep(30)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main(sys.argv[1:])
|