源码分析elasticsearch的pyes bulk批量实现


关于elasticsearch的操作,我一般是使用python elasticsearch库的。 维护他人的代码所以间接的再研究下pyes。 我们的所有业务都是依赖于ElasticSearch数据库的,不说java,python关于es的模块有elasticsearch和pyes这两个库。  而我跟伟哥用的都是python elasticsearch模块。一个部门居然用不同的模块… 


该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。http://xiaorui.cc/?p=3043

对于他们的区别我这里说两句, elasticsearch-py的文档要比pyes要完整,要完善,给出的实例也很丰富。 pyes的文档实在太伤,不说别的这明明是python的模块,冷不丁出来个java api的使用方法,闹呢…  因文档太粗糙,我花了点时间读了下pyes代码,可以说pyes代码复杂度要比elasticsearch-py低不少…
elasticsearch-py是官方elastic来维护,pyes是个人组织来维护。
elasticsearch-py的更新和修复很是及时,最近也是有更新。 上次的commit是去年的10月份.  commit的最近频繁说明这项目是有活力. 我还记得去年给他们提价了一个bug,不到一个星期就给修复了.   总是一句话,推荐使用elasticsearch-py . 

正题开始,说下pyes bulk的实现方法. 为什么要说pyes bulk呢 ? 首先遇到了pyes bulk的一个坑.  以前都是使用elasticsearch-py Helpers来批量bulk操作,实现是属于典型的批量方法,我自己设立一个list,然后把doc塞到这list里面,然后把这list扔给bulk方法就可以了。

pyes不是这样的, pyes内部里面有个bulk list,而且这个list是线程安全的。如果你是多进程尽量不要共用一个es连接对象,会出现类似线程安全的问题 .


class BaseBulker(object):
    def __init__(self, conn, bulk_size=400, raise_on_bulk_item_failure=False):
        self.conn = conn
        self._bulk_size = bulk_size
        self.bulk_lock = threading.RLock()   #锁
        with self.bulk_lock:
            self.bulk_data = []

下面是pyes官方给出的bulk方法, 但这跑不通… … 

#xiaorui.cc
#单次插入
from pyes import ES

es = ES()
es.index(data, 'xiaorui.cc', 'my-type', 1)
es.index(data, 'xiaorui.cc', 'my-type', 2)
es.index(data, 'xiaorui.cc', 'my-type', 3)
es.index(data, 'xiaorui.cc', 'my-type', 4)

#批量插入
from pyes import ES

es = ES()
es.index(data, 'xiaorui.cc', 'my-type', 1, bulk=True)
es.index(data, 'xiaorui.cc', 'my-type', 2, bulk=True)
es.index(data, 'xiaorui.cc', 'my-type', 3, bulk=True)
es.index(data, 'xiaorui.cc', 'my-type', 4, bulk=True)
es.refresh()

当我按照pyes给出的方法,调用refresh结果出问题…. 

ERROR:    pyes object <pyes.es.ES object at 0x1418a90> is being destroyed, but bulk operations have not been flushed. Call force_bulk()!

我们应该这么使用pyes模块:

#xiaorui.cc
from pyes import ES

conn = ES('127.0.0.1:9200', timeout=20.0, bulk_size=100)
es.index(data, 'xiaorui.cc', 'my-type', 4, bulk=True)
es.force_bulk()

pyes的bulk_size默认是400,es.index后面的bulk参数一定要是True. 这样你不是真正的插入数据,而是放在客户端的队列里,当你手动force_bulk()或者bulk_size满了才能真正的推送doc文档到es.

下面是pyes批量的实现源码,这源码很多细节表明作者还是很有功底的.

#xiaorui.cc
#代码我删除了不少,留最主要的部分

#建立连接es的对象
#~/github/pyes/pyes/es.py
class ES(object):
    def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400,
                 encoder=None, decoder=None,
                 max_retries=3,
                 retry_time=60,
                 default_indices=None,
                 raise_on_bulk_item_failure=False,
                 bulker_class=ListBulker,

#这里就是我们es.index的地方.
def index(self, doc, index, doc_type, id=None, parent=None, force_insert=False,
          op_type=None, bulk=False, version=None, querystring_args=None, ttl=None):
    if querystring_args is None:
        querystring_args = {}

    if bulk:
        ...
        if isinstance(doc, dict):
        doc = json.dumps(doc, cls=self.encoder)
        command = "%s\n%s" % (json.dumps(cmd, cls=self.encoder), doc)
        self.bulker.add(command)
        return self.flush_bulk()   #当你bulk为True的时候,会把这条doc add到bulker队列里。 


 def flush_bulk(self, forced=False):
     return self.bulker.flush_bulk(forced)   # ListBulker的flush_bulk方法,判断是否队列大小是否超过了bulk_size,如果超过了...

 def force_bulk(self):
     return self.flush_bulk(True)   #这个是真正的执行,并返回结果



# github/pyes/pyes/models.py

class ListBulker(BaseBulker):
    """
    A bulker that store data in a list
    """

    def __init__(self, conn, bulk_size=400, raise_on_bulk_item_failure=False):
        super(ListBulker, self).__init__(conn=conn, bulk_size=bulk_size,
                                         raise_on_bulk_item_failure=raise_on_bulk_item_failure)
        with self.bulk_lock:
            self.bulk_data = []     #内置bulk的缓冲队列.

    def add(self, content):
        with self.bulk_lock:
            self.bulk_data.append(content)     #把doc放到list里面。

    def flush_bulk(self, forced=False):
        with self.bulk_lock:
            if forced or len(self.bulk_data) >= self.bulk_size:   #大于bulk_size的时候
                batch = self.bulk_data
                self.bulk_data = []
            else:
                return None

        if len(batch) > 0:    #调用http post方法进行批量请求. 注明了流式接口
            bulk_result = self.conn._send_request("POST",
                                                  "/_bulk",
                                                  "\n".join(batch) + "\n")

            if self.raise_on_bulk_item_failure:
                _raise_exception_if_bulk_item_failed(bulk_result)
            return bulk_result


下面批量插入后的性能测试结果,大家会觉得慢么? 我们的ElasticSearch的节点有70+的规模了,其实我也觉得慢…  通过kopf看到elasticsearch的状况不错…. 慢的原因是我们每个doc文档都很大,出口或调度总归有影响的. 跟Hbase Thrift Server一样,没改成批量之前,负责hadoop的同事就说,插入查询慢的原因是由于没有使用批量接口,后来我用了批量接口…. 消耗时间是线性增长的,没效果… 

2016-03-24 18:47:10,220 - 47251 - ERROR:    cost time :9.40016007423  counter : 100
2016-03-24 18:47:20,978 - 47251 - ERROR:    cost time :10.7454559803  counter : 100
2016-03-24 18:47:31,030 - 47251 - ERROR:    cost time :10.0432369709  counter : 100
2016-03-24 18:47:40,933 - 47251 - ERROR:    cost time :9.89056110382  counter : 100
2016-03-24 18:47:51,404 - 47251 - ERROR:    cost time :9.78279089928  counter : 100
2016-03-24 18:48:01,813 - 47251 - ERROR:    cost time :10.3971750736  counter : 100
2016-03-24 18:48:12,523 - 47251 - ERROR:    cost time :10.6971299648  counter : 100
2016-03-24 18:48:22,161 - 47251 - ERROR:    cost time :9.62901997566  counter : 100
2016-03-24 18:48:32,640 - 47251 - ERROR:    cost time :10.4641959667  counter : 100
2016-03-24 18:48:42,745 - 47251 - ERROR:    cost time :10.0933129787  counter : 100
2016-03-24 18:48:52,502 - 47251 - ERROR:    cost time :9.74789094925  counter : 100
2016-03-24 18:49:02,034 - 47251 - ERROR:    cost time :9.52420806885  counter : 99
2016-03-24 18:49:13,228 - 47251 - ERROR:    cost time :11.1852941513  counter : 100
2016-03-24 18:49:23,503 - 47251 - ERROR:    cost time :10.2656178474  counter : 99
2016-03-24 18:49:33,313 - 47251 - ERROR:    cost time :9.80110192299  counter : 100
2016-03-24 18:49:41,183 - 47251 - ERROR:    cost time :7.8603579998  counter : 100
2016-03-24 18:49:57,170 - 47251 - ERROR:    cost time :10.6526181698  counter : 100
2016-03-24 18:50:07,978 - 47251 - ERROR:    cost time :10.7967550755  counter : 100
2016-03-24 18:50:18,814 - 47251 - ERROR:    cost time :10.8282749653  counter : 100
2016-03-24 18:50:28,316 - 47251 - ERROR:    cost time :9.49260401726  counter : 100
2016-03-24 18:50:39,219 - 47251 - ERROR:    cost time :10.8951020241  counter : 100
2016-03-24 18:50:50,327 - 47251 - ERROR:    cost time :11.0981459618  counter : 99
2016-03-24 18:51:00,368 - 47251 - ERROR:    cost time :10.031306982  counter : 100
2016-03-24 18:51:10,659 - 47251 - ERROR:    cost time :10.2820689678  counter : 100
2016-03-24 18:51:21,614 - 47251 - ERROR:    cost time :10.945966959  counter : 100
2016-03-24 18:51:33,614 - 47251 - ERROR:    cost time :11.9910209179  counter : 100
2016-03-24 18:51:44,613 - 47251 - ERROR:    cost time :10.9841971397  counter : 100

ElasticSearch的bulk批量没啥好讲的,这次碰巧碰到这问题…  我还是推荐大家使用ElasticSearch-py的库…. 


END.




大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注