关于python使用AsyncInflux异步操作influxdb的方案

虽然Influxdb写入速度还是很给力,但是谁不想更给力呀.   在github看到了一个利用codap和gevent实现的针对influxdb异步的模块,测试了下,挺不错。 

博客的原文地址是  ,   http://xiaorui.cc

最主要就是调用codap这个东西。 codap是一个数据结构+callback回调+gevent的模块,简单说,我想把一组数据推送给mongodb,那么用codap就很容易做成异步回调的应用。 

def back(db, name):
  return db.find(name)

results = KV()
results['foo'] = bar # bar is a function
results.put('cats', get_photos, id, limit=4) # get_photos is a function
results.put('monkey', back, db, name)
render_template('my_temp.html', **results)

对于codap感兴趣的朋友,可以看看  https://github.com/lateefj/codap  

那么咱们再回来看看asyncInflux是怎么利用codap,实现influxdb的异步操作的。 设立一个queue队列,然后gevent一个协程来操作。 

from codap import spawn, Queue

class AsyncWriter:

    def __init__(self, client):
        """Wraps the client connection with an async call"""
        self.q = Queue()
        self.client = client

    def _run(self):
        """The actual code to write the points"""
        try:
            for data in self.q:
                self.client.write_points(data)
        except KeyboardInterrupt:
            raise

    def start(self):
        """Backgrounds the writing of data points"""
        self.background = spawn(self._run)

    def stop(self):
        """Stop the writting of data points"""
        self.q.put_nowait(StopIteration)
        self.background.join()

    def write_points(self, body):
        """ Basically wraps the influxdb call """
        self.q.put_nowait(body)

上面asyncinflux模块里申明的spawn和Queue,是从codap.py里面引入的。

import sys


try:
    # Prefer gevent as it would be fastest
    from gevent import spawn
    from gevent.queue import Queue
    from gevent import monkey
    monkey.patch_socket()  # Required if we want IO to be concurrent
except:
    try:
        # Eventlet we are also fans of so that would be great
        from eventlet import spawn
        from eventlet.queue import Queue
        import eventlet
        eventlet.monkey_patch(all=True)  # To support concurrent IO

    except:
        # Thread worst case but this will not scale well at all
        if sys.version_info[0] == 3:  # Python 3
            from queue import Queue as ThreadQueue
        else:
            from Queue import Queue as ThreadQueue
        from threading import Thread

        def thread_spawn(*args, **kwargs):
            """
            Wrapper that acts like the coroutine libraries. Nothing really to
            see here.
            """
            t = None
            if len(args) == 1 and not kwargs:
                t = Thread(target=args[0], args=())
            else:
                t = Thread(target=args[0], args=args[1:], kwargs=kwargs)
                t.start()
        # Fall back to using threads
        Queue = ThreadQueue
        spawn = thread_spawn

codap是个有趣的东西,大家可以自己斟酌研究下 ! 这东西本身也没啥难度,只是在某个环境下,有些灵巧罢了。  


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注