以下是测试demo,如有错误还请指正,运行错误还望自行解决,谢谢!

python依赖:

sudo apt-get install libmysqlclient-dev
sudo pip install arrow
sudo pip install pykafka
sudo pip install mysql
sudo pip install python-daemon
sudo pip install mysql_connector_python
sudo pip install influxdb
sudo pip install elasticsearch
sudo pip install pandas

KafkaProcess.py

#coding=utf-8

import json
import logging
import threading
import time

from pykafka import KafkaClient

# Connection settings for the Kafka demo consumer.
options = dict(
    kafka=dict(
        host='127.0.0.1:9092',            # broker address
        topic_name='test',
        consumer_group='consumer01',
        kafka_zookeeper='127.0.0.1:2181',
        timeout=30 * 1000,                # milliseconds
    ),
)

class KafkaProcess:
    """Demo Kafka consumer that decodes each message value as JSON and prints it.

    Settings are read from ``options['kafka']``: ``host``, ``topic_name``,
    ``consumer_group``, ``kafka_zookeeper`` and ``timeout`` (milliseconds).
    """

    def __init__(self, options):
        # Only the 'kafka' section of the options dict is used.
        self.kafka_conf = options.get('kafka')

    def kafka_consumer_connect(self):
        """Create and return a pykafka balanced consumer for the configured topic.

        Offsets are auto-committed; group coordination uses the configured
        ZooKeeper address. The same ``timeout`` value is used for every timeout.
        """
        conf = self.kafka_conf
        timeout = conf.get('timeout')
        client = KafkaClient(hosts=conf.get('host'),
                             socket_timeout_ms=timeout,
                             offsets_channel_socket_timeout_ms=timeout)
        topic = client.topics[conf.get('topic_name')]
        return topic.get_balanced_consumer(
            consumer_group=conf.get('consumer_group'),
            auto_commit_enable=True,
            zookeeper_connect=conf.get('kafka_zookeeper'),
            zookeeper_connection_timeout_ms=timeout)

    def start(self, poll_interval=0.5):
        """Consume messages forever, printing each JSON-decoded message value.

        Any ``Exception`` (consumer error, invalid JSON, ...) is logged and the
        loop continues; ``BaseException`` such as ``KeyboardInterrupt`` ends it.

        :param poll_interval: seconds to sleep after every message (default 0.5,
            the original hard-coded value).
        """
        consumer = self.kafka_consumer_connect()
        messages = iter(consumer)
        while True:
            try:
                # Builtin next() works on Python 2 and 3 (iterator.next() is Py2-only).
                msgx = next(messages)
                print(json.loads(msgx.value))
            except Exception as e:
                logging.warning('Exception Kafka Start: {0}'.format(e))
            time.sleep(poll_interval)
			
if __name__ == '__main__':
    # Run the demo consumer with the module-level settings (loops forever).
    consumer_app = KafkaProcess(options)
    consumer_app.start()

ESDBProcess.py

#coding=utf-8

import logging
import threading
from Queue import Queue
from elasticsearch import helpers
from elasticsearch import Elasticsearch

# Settings for the Elasticsearch demo writer ('debug' is not read by ESDBProcess).
options = {
    'debug': True,
    'es': dict(
        es_host='127.0.0.1:9200',
        index_name='test01',
    ),
}

class ESDBProcess(threading.Thread):
    """Background thread that bulk-indexes queued actions into Elasticsearch.

    Producers put dicts of the form ``{'test': [action, ...]}`` on ``es_queue``
    (payloads without a ``'test'`` key are ignored). Actions are buffered and
    sent with ``helpers.bulk`` once more than ``batch_size`` have accumulated,
    or whenever the queue stays empty for ``flush_interval`` seconds.
    """

    def __init__(self, options, es_queue, batch_size=100, flush_interval=1.0):
        threading.Thread.__init__(self)
        self.es_conf = options.get('es')
        self.data_list = []
        self.es_queue = es_queue
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.es_conn = Elasticsearch(self.es_conf.get('es_host'))

    def esInsert(self, data):
        """Send ``data`` (a list of bulk actions) to Elasticsearch; failures are logged."""
        try:
            helpers.bulk(self.es_conn, data)
        except Exception as e:
            logging.warning('Exception esInsert: {0}'.format(e))

    def _flush(self):
        # Send whatever is buffered (if anything) and start a new buffer.
        if self.data_list:
            self.esInsert(self.data_list)
            self.data_list = []

    def _consume(self, data):
        # Buffer the actions under 'test'; flush once the buffer exceeds batch_size.
        if 'test' in data:
            self.data_list.extend(data['test'])
            if len(self.data_list) > self.batch_size:
                self._flush()

    def run(self):
        # The Empty exception lives in `queue` on Python 3 and `Queue` on Python 2.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty
        while True:
            try:
                # A blocking get replaces polling empty() in a tight loop (100% CPU).
                data = self.es_queue.get(timeout=self.flush_interval)
            except Empty:
                # Idle: send a partial batch so it is not held back indefinitely.
                self._flush()
                continue
            self._consume(data)

if __name__ == '__main__':
    # Demo producer: queue one document per second for the ESDBProcess thread.
    # `time` and `arrow` are not imported at the top of ESDBProcess.py.
    import time
    import arrow

    es_queue = Queue()
    ES = ESDBProcess(options, es_queue)
    ES.daemon = True  # a daemon thread does not keep the process alive after Ctrl-C
    ES.start()
    index_name = options.get('es').get('index_name')
    # Fixed demo timestamp, rendered with a literal "+0800" suffix.
    doc_time = arrow.get('2019-08-15 10:32:30').format('YYYY-MM-DDTHH:mm:ss.000+0800')
    i = 0
    while True:
        i += 1
        _dict = {'1': 'Hello', '2': 'World'}
        sourceD = dict({"time": doc_time}, **_dict)
        action = {"_index": index_name, "_type": 'Test', "_id": i, "_source": sourceD}
        es_queue.put({'test': [action]})
        time.sleep(1)

InfluxDBProcess.py

#coding=utf-8

import logging
import time
import arrow
import threading
import pandas as pd
from influxdb import InfluxDBClient

# InfluxDB connection settings used by InfluxDBProcess.
options = {
    'influxdb': dict(
        server='127.0.0.1',
        port=8086,
        user='admin',
        password='admin',
        database='db_collector',
    ),
}

class InfluxDBProcess(threading.Thread):
    """Background thread that batches points into InfluxDB, plus a query helper.

    Point dicts put on ``influx_queue`` are written with
    ``InfluxDBClient.write_points`` once more than ``batch_size`` are buffered,
    or when the queue stays empty for ``flush_interval`` seconds.
    ``influxSelect`` puts ``{'Test': DataFrame, 'time': newest_point_time}``
    on ``recv_queue``.
    """

    def __init__(self, options, influx_queue, recv_queue, batch_size=10, flush_interval=1.0):
        threading.Thread.__init__(self)
        self.dbconf = options.get('influxdb')
        self.influx_queue = influx_queue
        self.recv_queue = recv_queue
        self.body_list = []
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        # Newest point time seen by the previous influxSelect call. It must exist
        # before the first call, which logs it; otherwise every call failed.
        self.last_time = None
        self.client = InfluxDBClient(self.dbconf.get('server'), self.dbconf.get('port'),
                                     self.dbconf.get('user'), self.dbconf.get('password'),
                                     self.dbconf.get('database'))

    def influxInsert(self, client):
        """Write all buffered points with ``client``, then clear the buffer.

        The buffer is cleared even when the write fails (the error is logged),
        so a failing batch is dropped rather than retried.
        """
        if client and self.body_list:
            try:
                logging.info('influxInsert num: %d' % len(self.body_list))
                client.write_points(self.body_list)
            except Exception as e:
                logging.error('Exception influxInsert: {0}'.format(e))
            finally:
                self.body_list = []

    def strTimeSubSecond(self, time_str, step):
        """Return the UTC time string ``time_str`` minus ``step`` seconds.

        ``time_str`` looks like '2019-08-15T02:32:29Z'. A fractional-seconds part
        (e.g. '2019-08-15T02:32:29.123Z') is dropped before parsing. The result
        has the form '%Y-%m-%dT%H:%M:%SZ'. Returns None (and logs a warning)
        if ``time_str`` cannot be parsed.
        """
        import datetime
        try:
            seconds_part = time_str.split('.')[0].rstrip('Z')
            parsed = datetime.datetime.strptime(seconds_part, "%Y-%m-%dT%H:%M:%S")
        except (AttributeError, TypeError, ValueError) as e:
            logging.warning('strTimeSubSecond cannot parse {0!r}: {1}'.format(time_str, e))
            return None
        # Pure datetime arithmetic: no local-timezone/DST round trip as with mktime/localtime.
        return (parsed - datetime.timedelta(seconds=step)).strftime("%Y-%m-%dT%H:%M:%SZ")

    def influxSelect(self, reques_interval):
        """Query the last ``reques_interval`` seconds of Test rows and queue them.

        The window ends at the time of the newest point (``last(id)``). Rows where
        both ``name`` and ``info`` are NaN are dropped and remaining NaNs become 0.
        Nothing is queued when the measurement is empty or the time is unparsable.
        """
        if not self.client:
            return
        try:
            current_time = None
            res = self.client.query('select last(id) as id from Test')
            for item in res.get_points():
                current_time = item['time']
            if current_time is None:
                logging.info('InfluxDB Select: no points in Test')
                return
            start_time = self.strTimeSubSecond(current_time, reques_interval)
            if start_time is None:
                return  # already logged by strTimeSubSecond

            sql = ("select id, name, info, time from Test where time >= '%s' and time <= '%s'"
                   % (start_time, current_time))
            res = self.client.query(sql)
            logging.info('InfluxDB Select last_time=(%s) current_time=(%s)' % (self.last_time, current_time))
            self.last_time = current_time

            pd_temp = pd.DataFrame(list(res.get_points()))
            # An empty frame has no 'name'/'info' columns, so dropna(subset=...) would raise.
            if not pd_temp.empty:
                pd_temp = pd_temp.dropna(subset=['name', 'info'], how='all')  # drop rows where both fields are NaN
                pd_temp = pd_temp.fillna(0)  # replace all remaining NaNs with 0
            self.recv_queue.put({'Test': pd_temp, 'time': current_time})
        except Exception as e:
            logging.error('Exception influxSelect: {0}'.format(e))

    def run(self):
        # The Empty exception lives in `queue` on Python 3 and `Queue` on Python 2.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty
        while True:
            try:
                # A blocking get replaces polling empty() in a tight loop (100% CPU).
                data = self.influx_queue.get(timeout=self.flush_interval)
            except Empty:
                # Idle: write a partial batch so it is not held back indefinitely.
                self.influxInsert(self.client)
                continue
            self.body_list.append(data)
            if len(self.body_list) > self.batch_size:
                self.influxInsert(self.client)

def form_point(self, time_str, amend=int(-28800)):
    """Convert a 'YYYY-MM-DD HH:mm:ss' string to an integer nanosecond timestamp.

    The string is read as a UTC wall-clock time and ``amend`` seconds are added.
    The default of -28800 (-8 h) therefore treats ``time_str`` as UTC+8 time.
    ``self`` is unused because this is a module-level function; pass None for it.
    Returns None (and logs a warning) if ``time_str`` cannot be parsed.
    """
    import calendar
    import datetime
    try:
        parsed = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError) as e:
        logging.warning('form_point cannot parse {0!r}: {1}'.format(time_str, e))
        return None
    # timegm treats the naive time as UTC, as arrow.get(time_str, fmt) did. The
    # old code read arrow's `timestamp`, which is a method (not a number) in
    # arrow>=1.0, so it silently returned None. Scaling by 10**9 is exact.
    return (calendar.timegm(parsed.timetuple()) + amend) * 10 ** 9
			
if __name__ == '__main__':
    # Demo driver: every `reques_interval` seconds query the newest window of
    # points and print it; every second queue one new point for the writer thread.
    try:
        from queue import Queue, Empty
    except ImportError:  # Python 2
        from Queue import Queue, Empty

    influx_queue = Queue()
    recv_queue = Queue()
    Influx = InfluxDBProcess(options, influx_queue, recv_queue)
    Influx.daemon = True  # a daemon thread does not keep the process alive after Ctrl-C
    Influx.start()
    i = 0
    request_time = 0
    reques_interval = 30
    while True:
        now_time = time.time()
        if now_time - request_time > reques_interval:
            request_time = now_time
            Influx.influxSelect(reques_interval)
        try:
            print(recv_queue.get_nowait())
        except Empty:
            pass

        i += 1
        _time = '2019-08-15 10:32:29'
        _dict = {'id': 0, 'name': 'hanbo', 'info': 'txt', 'time': '2019-08-15 10:32:29'}
        _raw = {
            'measurement': 'Test',
            # form_point's first parameter (`self`) is unused; the time string is the second.
            'time': form_point(None, _time),
            'tags': {'id': i},
            'fields': _dict,
        }
        influx_queue.put(_raw)
        time.sleep(1)

MySQLProcess.py

#coding=utf-8

import time
import logging
import threading
import mysql.connector

# MySQL connection settings used by MySQLProcess.
options = {
    'mysqldb': dict(
        server='127.0.0.1',
        user='root',
        password='root',
        database='db_test',
    ),
}

class DatabaseCursor:
    """Context manager that hands out a cursor of ``conn`` and closes it on exit.

    The cursor is created with ``dictionary=True``. After the ``with`` block it
    is closed and the reference is dropped.
    """

    def __init__(self, conn):
        make_cursor = conn.cursor
        self.cursor = make_cursor(dictionary=True)

    def __enter__(self):
        cur = self.cursor
        return cur

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Runs even when the with-body raised; returning None lets that exception propagate.
        self.cursor.close()
        self.cursor = None

class MySQLProcess(threading.Thread):
    """Background thread that upserts queued rows into tb_test and polls names.

    Rows put on ``mysql_queue`` are dicts with keys ``id``, ``name``, ``info``
    and ``time``. Every ``poll_interval`` seconds the thread writes all queued
    rows, then puts ``{'name': [...]}`` (every ``name`` in tb_test) on
    ``recv_queue``. The connection is used only from this thread.
    """

    # Insert the row, or update it if the id already exists. Values are bound by
    # the driver (%s placeholders) instead of being formatted into the SQL text,
    # which avoids quoting bugs and SQL injection.
    UPSERT_SQL = ('insert into tb_test set id=%s, name=%s, info=%s, time=%s '
                  'on duplicate key update name=%s, info=%s, time=%s')

    def __init__(self, options, mysql_queue, recv_queue, poll_interval=10):
        threading.Thread.__init__(self)
        self.dbconf = options.get('mysqldb')
        self.recv_queue = recv_queue
        self.mysql_queue = mysql_queue
        self.poll_interval = poll_interval
        self.conn = mysql.connector.connect(host=self.dbconf.get('server'),
                                            user=self.dbconf.get('user'),
                                            password=self.dbconf.get('password'),
                                            database=self.dbconf.get('database'))

    @staticmethod
    def _upsert_params(data):
        # Parameter tuple for UPSERT_SQL: the insert values, then the update values.
        return (data['id'], data['name'], data['info'], data['time'],
                data['name'], data['info'], data['time'])

    def updateInfoData(self, conn):
        """Upsert every row currently in ``mysql_queue``, then commit once.

        A failing row is logged and skipped. Errors outside the per-row loop
        (opening the cursor, committing) are logged as well.
        """
        try:
            with DatabaseCursor(conn) as cursor:
                while not self.mysql_queue.empty():
                    data = self.mysql_queue.get()
                    try:
                        cursor.execute(self.UPSERT_SQL, self._upsert_params(data))
                    except Exception as e:
                        logging.warning('Exception Update|Insert SQL: {0}'.format(e))
                conn.commit()
        except Exception as e:
            logging.error('Exception updateInfoData: {0}'.format(e))

    def selectData(self, conn):
        """Put ``{'name': [all names in tb_test]}`` on ``recv_queue``; errors are logged."""
        try:
            with DatabaseCursor(conn) as cursor:
                cursor.execute('select name from tb_test')
                self.recv_queue.put({'name': [row['name'] for row in cursor]})
        except Exception as e:
            logging.error('Exception selectData: {0}'.format(e))

    def run(self):
        while True:
            # Drain queued rows first; without this call mysql_queue is never written.
            self.updateInfoData(self.conn)
            self.selectData(self.conn)
            time.sleep(self.poll_interval)

if __name__ == '__main__':
    # Demo driver: queue one row per second and print name lists as they arrive.
    try:
        from queue import Queue, Empty
    except ImportError:  # Python 2
        from Queue import Queue, Empty

    mysql_queue = Queue()
    recv_queue = Queue()
    MySQL = MySQLProcess(options, mysql_queue, recv_queue)
    MySQL.daemon = True  # a daemon thread does not keep the process alive after Ctrl-C
    MySQL.start()
    i = 0
    while True:
        try:
            print(recv_queue.get_nowait())
        except Empty:
            pass

        i += 1
        _dict = {'id': i, 'name': 'hanbo', 'info': 'txt', 'time': '2019-08-15 10:32:29'}
        mysql_queue.put(_dict)
        time.sleep(1)
Logo

更多推荐