虽然Influxdb写入速度还是很给力,但是谁不想更给力呀. 在github看到了一个利用codap和gevent实现的针对influxdb异步的模块,测试了下,挺不错。
博客的原文地址是 , http://xiaorui.cc
最主要就是调用codap这个东西。 codap是一个数据结构+callback回调+gevent的模块,简单说,我想把一组数据推送给mongodb,那么用codap就很容易做成异步回调的应用。
def back(db, name):
return db.find(name)
results = KV()
results['foo'] = bar # bar is a function
results.put('cats', get_photos, id, limit=4) # get_photos is a function
results.put('monkey', back, db, name)
render_template('my_temp.html', **results)
对于codap感兴趣的朋友,可以看看 https://github.com/lateefj/codap
那么咱们再回来看看asyncInflux是怎么利用codap,实现influxdb的异步操作的。 设立一个queue队列,然后gevent一个协程来操作。
from codap import spawn, Queue
class AsyncWriter:
def __init__(self, client):
"""Wraps the client connection with an async call"""
self.q = Queue()
self.client = client
def _run(self):
"""The actual code to write the points"""
try:
for data in self.q:
self.client.write_points(data)
except KeyboardInterrupt:
raise
def start(self):
"""Backgrounds the writing of data points"""
self.background = spawn(self._run)
def stop(self):
"""Stop the writting of data points"""
self.q.put_nowait(StopIteration)
self.background.join()
def write_points(self, body):
""" Basically wraps the influxdb call """
self.q.put_nowait(body)
上面asyncinflux模块里申明的spawn和Queue,是从codap.py里面引入的。
import sys
try:
# Prefer gevent as it would be fastest
from gevent import spawn
from gevent.queue import Queue
from gevent import monkey
monkey.patch_socket() # Required if we want IO to be concurrent
except:
try:
# Eventlet we are also fans of so that would be great
from eventlet import spawn
from eventlet.queue import Queue
import eventlet
eventlet.monkey_patch(all=True) # To support concurrent IO
except:
# Thread worst case but this will not scale well at all
if sys.version_info[0] == 3: # Python 3
from queue import Queue as ThreadQueue
else:
from Queue import Queue as ThreadQueue
from threading import Thread
def thread_spawn(*args, **kwargs):
"""
Wrapper that acts like the coroutine libraries. Nothing really to
see here.
"""
t = None
if len(args) == 1 and not kwargs:
t = Thread(target=args[0], args=())
else:
t = Thread(target=args[0], args=args[1:], kwargs=kwargs)
t.start()
# Fall back to using threads
Queue = ThreadQueue
spawn = thread_spawn
codap是个有趣的东西,大家可以自己斟酌研究下 ! 这东西本身也没啥难度,只是在某个环境下,有些灵巧罢了。
