虽然Influxdb写入速度还是很给力,但是谁不想更给力呀. 在github看到了一个利用codap和gevent实现的针对influxdb异步的模块,测试了下,挺不错。
博客的原文地址是 , http://xiaorui.cc
最主要就是调用codap这个东西。 codap是一个数据结构+callback回调+gevent的模块,简单说,我想把一组数据推送给mongodb,那么用codap就很容易做成异步回调的应用。
def back(db, name): return db.find(name) results = KV() results['foo'] = bar # bar is a function results.put('cats', get_photos, id, limit=4) # get_photos is a function results.put('monkey', back, db, name) render_template('my_temp.html', **results)
对于codap感兴趣的朋友,可以看看 https://github.com/lateefj/codap
那么咱们再回来看看asyncInflux是怎么利用codap,实现influxdb的异步操作的。 设立一个queue队列,然后gevent一个协程来操作。
from codap import spawn, Queue class AsyncWriter: def __init__(self, client): """Wraps the client connection with an async call""" self.q = Queue() self.client = client def _run(self): """The actual code to write the points""" try: for data in self.q: self.client.write_points(data) except KeyboardInterrupt: raise def start(self): """Backgrounds the writing of data points""" self.background = spawn(self._run) def stop(self): """Stop the writting of data points""" self.q.put_nowait(StopIteration) self.background.join() def write_points(self, body): """ Basically wraps the influxdb call """ self.q.put_nowait(body)
上面asyncinflux模块里申明的spawn和Queue,是从codap.py里面引入的。
import sys try: # Prefer gevent as it would be fastest from gevent import spawn from gevent.queue import Queue from gevent import monkey monkey.patch_socket() # Required if we want IO to be concurrent except: try: # Eventlet we are also fans of so that would be great from eventlet import spawn from eventlet.queue import Queue import eventlet eventlet.monkey_patch(all=True) # To support concurrent IO except: # Thread worst case but this will not scale well at all if sys.version_info[0] == 3: # Python 3 from queue import Queue as ThreadQueue else: from Queue import Queue as ThreadQueue from threading import Thread def thread_spawn(*args, **kwargs): """ Wrapper that acts like the coroutine libraries. Nothing really to see here. """ t = None if len(args) == 1 and not kwargs: t = Thread(target=args[0], args=()) else: t = Thread(target=args[0], args=args[1:], kwargs=kwargs) t.start() # Fall back to using threads Queue = ThreadQueue spawn = thread_spawn
codap是个有趣的东西,大家可以自己斟酌研究下 ! 这东西本身也没啥难度,只是在某个环境下,有些灵巧罢了。