Python操作Kafka+Elasticsearch+InfluxDB+MySQL
以下是测试demo,如有错误还请指正,运行错误还望自行解决,谢谢!python依赖:sudo apt-get install libmysqlclient-dev;sudo pip install arrow;sudo pip install pykafka;sudo pip install mysql;sudo pip install python-daemon;sudo pip insta...
·
以下是测试demo,如有错误还请指正,运行错误还望自行解决,谢谢!
python依赖:
sudo apt-get install libmysqlclient-dev
sudo pip install arrow
sudo pip install pykafka
sudo pip install mysql
sudo pip install python-daemon
sudo pip install mysql_connector_python
sudo pip install influxdb
sudo pip install elasticsearch
sudo pip install pandas
sudo apt-get install libmysqlclient-dev
KafkaProcess.py
#coding=utf-8
import json
import logging
import threading
import time

from pykafka import KafkaClient
# Settings read by KafkaProcess; every value is looked up with .get() at
# connection time.
options = dict(
    kafka=dict(
        host='127.0.0.1:9092',
        topic_name='test',
        consumer_group='consumer01',
        kafka_zookeeper='127.0.0.1:2181',
        timeout=30 * 1000,  # milliseconds
    ),
)
class KafkaProcess:
    """Consume JSON messages from a Kafka topic and print them.

    The constructor only stores the ``'kafka'`` section of ``options``;
    no connection is made until ``start()`` is called.
    """

    def __init__(self, options):
        """Keep the ``'kafka'`` sub-dict of *options* (``None`` if absent)."""
        self.kafka_conf = options.get('kafka')

    def kafka_consumer_connect(self):
        """Connect to the broker and return a balanced consumer for the topic.

        The same ``timeout`` setting (milliseconds) is used for the socket,
        the offsets channel and the ZooKeeper connection.
        """
        conf = self.kafka_conf
        timeout_ms = conf.get('timeout')
        client = KafkaClient(
            hosts=conf.get('host'),
            socket_timeout_ms=timeout_ms,
            offsets_channel_socket_timeout_ms=timeout_ms,
        )
        topic = client.topics[conf.get('topic_name')]
        return topic.get_balanced_consumer(
            consumer_group=conf.get('consumer_group'),
            auto_commit_enable=True,
            zookeeper_connect=conf.get('kafka_zookeeper'),
            zookeeper_connection_timeout_ms=timeout_ms,
        )

    def start(self):
        """Loop forever: fetch a message, decode its value as JSON, print it.

        Any failure (fetch or decode) is logged as a warning and followed by
        a 0.5 s pause, so a broken broker connection does not produce a hot
        loop. This method never returns.
        """
        consumer = self.kafka_consumer_connect()
        iterc = iter(consumer)
        while True:
            try:
                # next() instead of the Python-2-only .next() method.
                msgx = next(iterc)
                a = json.loads(msgx.value)
                print(a)
            except Exception as e:
                logging.warning('Exception Kafka Start: {0}'.format(e))
                time.sleep(0.5)
                # If the consumer's iterator is a generator (presumably the
                # case for pykafka - TODO confirm), one that has raised or
                # run dry is finished for good; ask for a fresh one so the
                # loop can recover instead of failing on every pass.
                iterc = iter(consumer)
if __name__ == '__main__':
    # Script entry point: build the consumer wrapper from the module-level
    # settings and run its endless consume loop.
    KafkaProcess(options).start()
ESDBProcess.py
#coding=utf-8
import logging
import threading
from Queue import Queue
from elasticsearch import helpers
from elasticsearch import Elasticsearch
# Settings for the Elasticsearch demo; ESDBProcess reads the 'es' section.
options = dict(
    debug=True,
    es=dict(
        es_host='127.0.0.1:9200',
        index_name='test01',
    ),
)
class ESDBProcess(threading.Thread):
    """Worker thread that batches documents from a queue into Elasticsearch.

    Messages are dicts; only those carrying a ``'test'`` key are used, and
    the list stored under that key is appended to an in-memory batch. Once
    the batch holds more than 100 actions it is sent with ``helpers.bulk``.
    """

    def __init__(self, options, es_queue):
        """Store the queue and open a client for ``options['es']['es_host']``."""
        threading.Thread.__init__(self)
        self.es_conf = options.get('es')
        self.data_list = []
        self.es_queue = es_queue
        self.es_conn = Elasticsearch(self.es_conf.get('es_host'))

    def esInsert(self, data):
        """Bulk-index *data*; failures are logged as warnings, not raised."""
        try:
            helpers.bulk(self.es_conn, data)
        except Exception as e:
            logging.warning('Exception esInsert: {0}'.format(e))

    def _collect(self, data):
        """Add one queue message to the batch and flush it when large enough."""
        if 'test' not in data:
            return
        # Read the payload by key; values()[0] is Python-2-only and would
        # pick an arbitrary entry if the message had more than one key.
        self.data_list.extend(data['test'])
        if len(self.data_list) > 100:
            self.esInsert(self.data_list)
            # The batch is dropped even if esInsert logged a failure
            # (best-effort delivery, same as before).
            self.data_list = []

    def run(self):
        """Consume the queue forever."""
        while True:
            # A blocking get() parks the thread until a message arrives
            # instead of spinning on empty() at full CPU.
            self._collect(self.es_queue.get())
if __name__ == '__main__':
    # Demo driver: start the indexing thread, then feed it one document per
    # second forever.
    # This script's header imports neither module, so bring them in here.
    import time

    import arrow

    es_queue = Queue()
    ES = ESDBProcess(options, es_queue)
    ES.start()

    index_name = options.get('es').get('index_name')
    # The sample timestamp never changes, so parse it once outside the loop.
    _time = arrow.get('2019-08-15 10:32:30')
    i = 0
    while True:
        i += 1
        _dict = {'1': 'Hello', '2': 'World'}
        sourceD = dict({"time": _time.format('YYYY-MM-DDTHH:mm:ss.000+0800')}, **_dict)
        # One bulk action per document; the loop counter doubles as the id.
        action = {"_index": index_name, "_type": 'Test', "_id": i}
        action['_source'] = sourceD
        es_queue.put({'test': [action]})
        time.sleep(1)
InfluxDBProcess.py
#coding=utf-8
import logging
import time
import arrow
import threading
import pandas as pd
from influxdb import InfluxDBClient
# Connection settings handed to InfluxDBClient by InfluxDBProcess.
options = dict(
    influxdb=dict(
        server='127.0.0.1',
        port=8086,
        user='admin',
        password='admin',
        database='db_collector',
    ),
)
class InfluxDBProcess(threading.Thread):
    """Worker thread that batches points into InfluxDB and can query them back.

    ``run()`` moves points from ``influx_queue`` into an in-memory batch and
    writes the batch once it holds more than 10 points. ``influxSelect()``
    reads the most recent window of the ``Test`` measurement and publishes it
    on ``recv_queue``.
    """

    def __init__(self, options, influx_queue, recv_queue):
        """Store the queues and open a client from ``options['influxdb']``."""
        threading.Thread.__init__(self)
        self.dbconf = options.get('influxdb')
        self.influx_queue = influx_queue
        self.recv_queue = recv_queue
        self.body_list = []
        # influxSelect() logs this before assigning it, so it has to exist
        # before the first query.
        self.last_time = None
        self.client = InfluxDBClient(
            self.dbconf.get('server'),
            self.dbconf.get('port'),
            self.dbconf.get('user'),
            self.dbconf.get('password'),
            self.dbconf.get('database'),
        )

    def influxInsert(self, client):
        """Write the pending batch through *client* and clear it.

        Does nothing when *client* is falsy or the batch is empty. The batch
        is cleared whether or not the write succeeded (best-effort); a failed
        write is logged as an error.
        """
        if not (client and self.body_list):
            return
        try:
            logging.info('influxInsert num: %d' % len(self.body_list))
            client.write_points(self.body_list)
        except Exception as e:
            logging.error('Exception influxInsert: {0}'.format(e))
        finally:
            self.body_list = []

    def strTimeSubSecond(self, time_str, step):
        """Return *time_str* shifted back by *step* seconds.

        *time_str* must look like ``2019-08-15T10:32:30Z``; the result uses
        the same layout. Returns ``None`` (after logging a warning) when the
        input cannot be parsed.
        """
        fmt = "%Y-%m-%dT%H:%M:%SZ"
        try:
            # The value goes through local time in both directions, so the
            # local UTC offset cancels out.
            seconds = int(time.mktime(time.strptime(time_str, fmt)))
            return time.strftime(fmt, time.localtime(seconds - step))
        except Exception as e:
            logging.warning('Exception strTimeSubSecond: {0}'.format(e))
            return None

    def influxSelect(self, reques_interval):
        """Publish the last *reques_interval* seconds of ``Test`` rows.

        Finds the time of the newest point, selects everything from that time
        minus *reques_interval* up to it, drops rows where both ``name`` and
        ``info`` are NaN, replaces the remaining NaN values with 0 and puts
        ``{'Test': DataFrame, 'time': newest_time}`` on ``recv_queue``.
        Returns ``None``; errors are logged, not raised.
        """
        if not self.client:
            return
        try:
            current_time = None
            res = self.client.query('select last(id) as id from Test')
            for item in res.get_points():
                current_time = item['time']
            if current_time is None:
                # Empty measurement: there is no window to query.
                return
            start_time = self.strTimeSubSecond(current_time, reques_interval)
            if start_time is None:
                # Parsing failed (already logged); do not send a query
                # containing the text 'None'.
                return
            sql = ('select id, name, info, time from Test where time >= \'%s\' and time <= \'%s\'' % (start_time, current_time))
            res = self.client.query(sql)
            logging.info('InfluxDB Select last_time=(%s) current_time=(%s)' % (self.last_time, current_time))
            self.last_time = current_time
            pd_temp = pd.DataFrame(res.get_points())
            # Drop rows where both fields are NaN.
            pd_temp = pd_temp.dropna(subset=['name', 'info'], how='all')
            # Replace every remaining NaN with 0.
            pd_temp = pd_temp.fillna(0)
            self.recv_queue.put({'Test': pd_temp, 'time': current_time})
        except Exception as e:
            logging.error('Exception influxSelect: {0}'.format(e))

    def run(self):
        """Consume ``influx_queue`` forever, flushing in batches."""
        while True:
            # A blocking get() parks the thread until a point arrives
            # instead of spinning on empty() at full CPU.
            data = self.influx_queue.get()
            self.body_list.append(data)
            if len(self.body_list) > 10:
                self.influxInsert(self.client)

    def form_point(self, time_str, amend=int(-28800)):
        """Convert ``'YYYY-MM-DD HH:mm:ss'`` text to an integer in nanoseconds.

        *amend* is added to the epoch seconds before scaling; the default of
        -28800 is minus eight hours (presumably to turn UTC+8 wall-clock text
        into UTC - confirm against the data source). Returns ``None`` (after
        logging a warning) when the text cannot be converted.
        """
        try:
            _time = arrow.get(time_str, 'YYYY-MM-DD HH:mm:ss')
            seconds = _time.timestamp
            # arrow 1.0 turned ``timestamp`` from a property into a method.
            if callable(seconds):
                seconds = seconds()
            # Scale arithmetically instead of gluing nine '0' characters on
            # to the decimal text.
            return (int(seconds) + amend) * 10 ** 9
        except Exception as e:
            logging.warning('Exception form_point: {0}'.format(e))
            return None
if __name__ == '__main__':
    # Demo driver: start the writer thread, query the newest window every
    # `reques_interval` seconds, print what comes back and enqueue one sample
    # point per second.
    # Queue is not imported by this script's header; the module was renamed
    # in Python 3, so try both spellings.
    try:
        from Queue import Queue
    except ImportError:
        from queue import Queue

    influx_queue = Queue()
    recv_queue = Queue()
    Influx = InfluxDBProcess(options, influx_queue, recv_queue)
    Influx.start()

    i = 0
    request_time = 0
    reques_interval = 30
    while True:
        now_time = time.time()
        if now_time - request_time > reques_interval:
            request_time = now_time
            Influx.influxSelect(reques_interval)
        # There is no `self` at module level; use the local queue.
        if not recv_queue.empty():
            data = recv_queue.get()
            print(data)
        i += 1
        _time = '2019-08-15 10:32:29'
        _dict = {'id': 0, 'name': 'hanbo', 'info': 'txt', 'time': '2019-08-15 10:32:29'}
        # Build the point as a fresh dict each pass; form_point is a method,
        # so it is called through the instance.
        _raw = {
            'time': Influx.form_point(_time),
            'tags': {'id': i},
            'fields': _dict,
            'measurement': 'Test',
        }
        influx_queue.put(_raw)
        time.sleep(1)
MySQLProcess.py
#coding=utf-8
import time
import logging
import threading
import mysql.connector
# Connection settings handed to mysql.connector.connect by MySQLProcess.
options = dict(
    mysqldb=dict(
        server='127.0.0.1',
        user='root',
        password='root',
        database='db_test',
    ),
)
class DatabaseCursor:
    """Context manager that lends out a cursor and closes it on exit.

    The cursor is requested with ``dictionary=True`` as soon as the manager
    is constructed, not when the ``with`` block is entered.
    """

    def __init__(self, conn):
        """Open a cursor on *conn* right away."""
        self.cursor = conn.cursor(dictionary=True)

    def __enter__(self):
        """Hand the cursor opened by the constructor to the ``with`` body."""
        return self.cursor

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the cursor and forget it; exceptions are not suppressed."""
        self.cursor.close()
        self.cursor = None
class MySQLProcess(threading.Thread):
    """Worker thread that upserts queued rows into ``tb_test`` and reads names back.

    Every ten seconds ``run()`` writes whatever is waiting on ``mysql_queue``
    and then publishes the list of all ``name`` values on ``recv_queue``.
    """

    def __init__(self, options, mysql_queue, recv_queue):
        """Store the queues and connect using ``options['mysqldb']``."""
        threading.Thread.__init__(self)
        self.dbconf = options.get('mysqldb')
        self.recv_queue = recv_queue
        self.mysql_queue = mysql_queue
        self.conn = mysql.connector.connect(
            host=self.dbconf.get('server'),
            user=self.dbconf.get('user'),
            password=self.dbconf.get('password'),
            database=self.dbconf.get('database'),
        )

    def updateInfoData(self, conn):
        """Drain ``mysql_queue`` into ``tb_test`` and commit once at the end.

        Each queued item is a dict with ``id``, ``name``, ``info`` and
        ``time`` keys. A row that fails is logged as a warning and skipped;
        a failure of the cursor or the commit is logged as an error.
        """
        # Insert the row if the key is new, otherwise update it. Values are
        # bound as parameters, so quotes in the data cannot break or alter
        # the statement.
        sql = ('insert into tb_test set id=%s, name=%s, info=%s, time=%s '
               'on duplicate key '
               'update name=%s, info=%s, time=%s')
        try:
            with DatabaseCursor(conn) as cursor:
                while not self.mysql_queue.empty():
                    data = self.mysql_queue.get()
                    try:
                        params = (
                            data['id'], data['name'], data['info'], data['time'],
                            data['name'], data['info'], data['time'],
                        )
                        cursor.execute(sql, params)
                    except Exception as e:
                        logging.warning('Exception Update|Insert SQL: {0}'.format(e))
                conn.commit()
        except Exception as e:
            logging.error('Exception updateInfoData: {0}'.format(e))

    def selectData(self, conn):
        """Put ``{'name': [every name in tb_test]}`` on ``recv_queue``.

        Errors are logged, not raised.
        """
        try:
            with DatabaseCursor(conn) as cursor:
                cursor.execute('select name from tb_test')
                _list = [row['name'] for row in cursor]
                self.recv_queue.put({'name': _list})
        except Exception as e:
            logging.error('Exception selectData: {0}'.format(e))

    def run(self):
        """Every ten seconds: flush pending writes, then publish the names."""
        while True:
            # Without this call nothing ever consumes mysql_queue.
            self.updateInfoData(self.conn)
            self.selectData(self.conn)
            time.sleep(10)
if __name__ == '__main__':
    # Demo driver: start the database thread, print whatever it publishes
    # and enqueue one sample row per second.
    # Queue is not imported by this script's header; the module was renamed
    # in Python 3, so try both spellings.
    try:
        from Queue import Queue
    except ImportError:
        from queue import Queue

    mysql_queue = Queue()
    recv_queue = Queue()
    MySQL = MySQLProcess(options, mysql_queue, recv_queue)
    MySQL.start()

    i = 0
    while True:
        # There is no `self` at module level; use the local queue.
        if not recv_queue.empty():
            data = recv_queue.get()
            print(data)
        i += 1
        _dict = {'id': 0, 'name': 'hanbo', 'info': 'txt', 'time': '2019-08-15 10:32:29'}
        _dict['id'] = i
        mysql_queue.put(_dict)
        time.sleep(1)
更多推荐
所有评论(0)