关于elasticsearch的操作,我一般是使用python elasticsearch库的。 维护他人的代码所以间接的再研究下pyes。 我们的所有业务都是依赖于ElasticSearch数据库的,不说java,python关于es的模块有elasticsearch和pyes这两个库。 而我跟伟哥用的都是python elasticsearch模块。一个部门居然用不同的模块…
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。http://xiaorui.cc/?p=3043
对于他们的区别我这里说两句, elasticsearch-py的文档要比pyes要完整,要完善,给出的实例也很丰富。 pyes的文档实在太伤,不说别的这明明是python的模块,冷不丁出来个java api的使用方法,闹呢… 因文档太粗糙,我花了点时间读了下pyes代码,可以说pyes代码复杂度要比elasticsearch-py低不少…
elasticsearch-py是官方elastic来维护,pyes是个人组织来维护。
elasticsearch-py的更新和修复很是及时,最近也是有更新。 上次的commit是去年的10月份. commit的最近频繁说明这项目是有活力. 我还记得去年给他们提价了一个bug,不到一个星期就给修复了. 总是一句话,推荐使用elasticsearch-py .
正题开始,说下pyes bulk的实现方法. 为什么要说pyes bulk呢 ? 首先遇到了pyes bulk的一个坑. 以前都是使用elasticsearch-py Helpers来批量bulk操作,实现是属于典型的批量方法,我自己设立一个list,然后把doc塞到这list里面,然后把这list扔给bulk方法就可以了。
pyes不是这样的, pyes内部里面有个bulk list,而且这个list是线程安全的。如果你是多进程尽量不要共用一个es连接对象,会出现类似线程安全的问题 .
class BaseBulker(object): def __init__(self, conn, bulk_size=400, raise_on_bulk_item_failure=False): self.conn = conn self._bulk_size = bulk_size self.bulk_lock = threading.RLock() #锁 with self.bulk_lock: self.bulk_data = []
下面是pyes官方给出的bulk方法, 但这跑不通… …
#xiaorui.cc #单次插入 from pyes import ES es = ES() es.index(data, 'xiaorui.cc', 'my-type', 1) es.index(data, 'xiaorui.cc', 'my-type', 2) es.index(data, 'xiaorui.cc', 'my-type', 3) es.index(data, 'xiaorui.cc', 'my-type', 4) #批量插入 from pyes import ES es = ES() es.index(data, 'xiaorui.cc', 'my-type', 1, bulk=True) es.index(data, 'xiaorui.cc', 'my-type', 2, bulk=True) es.index(data, 'xiaorui.cc', 'my-type', 3, bulk=True) es.index(data, 'xiaorui.cc', 'my-type', 4, bulk=True) es.refresh()
当我按照pyes给出的方法,调用refresh结果出问题….
ERROR: pyes object <pyes.es.ES object at 0x1418a90> is being destroyed, but bulk operations have not been flushed. Call force_bulk()!
我们应该这么使用pyes模块:
#xiaorui.cc from pyes import ES conn = ES('127.0.0.1:9200', timeout=20.0, bulk_size=100) es.index(data, 'xiaorui.cc', 'my-type', 4, bulk=True) es.force_bulk()
pyes的bulk_size默认是400,es.index后面的bulk参数一定要是True. 这样你不是真正的插入数据,而是放在客户端的队列里,当你手动force_bulk()或者bulk_size满了才能真正的推送doc文档到es.
下面是pyes批量的实现源码,这源码很多细节表明作者还是很有功底的.
#xiaorui.cc #代码我删除了不少,留最主要的部分 #建立连接es的对象 #~/github/pyes/pyes/es.py class ES(object): def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400, encoder=None, decoder=None, max_retries=3, retry_time=60, default_indices=None, raise_on_bulk_item_failure=False, bulker_class=ListBulker, #这里就是我们es.index的地方. def index(self, doc, index, doc_type, id=None, parent=None, force_insert=False, op_type=None, bulk=False, version=None, querystring_args=None, ttl=None): if querystring_args is None: querystring_args = {} if bulk: ... if isinstance(doc, dict): doc = json.dumps(doc, cls=self.encoder) command = "%s\n%s" % (json.dumps(cmd, cls=self.encoder), doc) self.bulker.add(command) return self.flush_bulk() #当你bulk为True的时候,会把这条doc add到bulker队列里。 def flush_bulk(self, forced=False): return self.bulker.flush_bulk(forced) # ListBulker的flush_bulk方法,判断是否队列大小是否超过了bulk_size,如果超过了... def force_bulk(self): return self.flush_bulk(True) #这个是真正的执行,并返回结果 # github/pyes/pyes/models.py class ListBulker(BaseBulker): """ A bulker that store data in a list """ def __init__(self, conn, bulk_size=400, raise_on_bulk_item_failure=False): super(ListBulker, self).__init__(conn=conn, bulk_size=bulk_size, raise_on_bulk_item_failure=raise_on_bulk_item_failure) with self.bulk_lock: self.bulk_data = [] #内置bulk的缓冲队列. def add(self, content): with self.bulk_lock: self.bulk_data.append(content) #把doc放到list里面。 def flush_bulk(self, forced=False): with self.bulk_lock: if forced or len(self.bulk_data) >= self.bulk_size: #大于bulk_size的时候 batch = self.bulk_data self.bulk_data = [] else: return None if len(batch) > 0: #调用http post方法进行批量请求. 注明了流式接口 bulk_result = self.conn._send_request("POST", "/_bulk", "\n".join(batch) + "\n") if self.raise_on_bulk_item_failure: _raise_exception_if_bulk_item_failed(bulk_result) return bulk_result
下面批量插入后的性能测试结果,大家会觉得慢么? 我们的ElasticSearch的节点有70+的规模了,其实我也觉得慢… 通过kopf看到elasticsearch的状况不错…. 慢的原因是我们每个doc文档都很大,出口或调度总归有影响的. 跟Hbase Thrift Server一样,没改成批量之前,负责hadoop的同事就说,插入查询慢的原因是由于没有使用批量接口,后来我用了批量接口…. 消耗时间是线性增长的,没效果…
2016-03-24 18:47:10,220 - 47251 - ERROR: cost time :9.40016007423 counter : 100 2016-03-24 18:47:20,978 - 47251 - ERROR: cost time :10.7454559803 counter : 100 2016-03-24 18:47:31,030 - 47251 - ERROR: cost time :10.0432369709 counter : 100 2016-03-24 18:47:40,933 - 47251 - ERROR: cost time :9.89056110382 counter : 100 2016-03-24 18:47:51,404 - 47251 - ERROR: cost time :9.78279089928 counter : 100 2016-03-24 18:48:01,813 - 47251 - ERROR: cost time :10.3971750736 counter : 100 2016-03-24 18:48:12,523 - 47251 - ERROR: cost time :10.6971299648 counter : 100 2016-03-24 18:48:22,161 - 47251 - ERROR: cost time :9.62901997566 counter : 100 2016-03-24 18:48:32,640 - 47251 - ERROR: cost time :10.4641959667 counter : 100 2016-03-24 18:48:42,745 - 47251 - ERROR: cost time :10.0933129787 counter : 100 2016-03-24 18:48:52,502 - 47251 - ERROR: cost time :9.74789094925 counter : 100 2016-03-24 18:49:02,034 - 47251 - ERROR: cost time :9.52420806885 counter : 99 2016-03-24 18:49:13,228 - 47251 - ERROR: cost time :11.1852941513 counter : 100 2016-03-24 18:49:23,503 - 47251 - ERROR: cost time :10.2656178474 counter : 99 2016-03-24 18:49:33,313 - 47251 - ERROR: cost time :9.80110192299 counter : 100 2016-03-24 18:49:41,183 - 47251 - ERROR: cost time :7.8603579998 counter : 100 2016-03-24 18:49:57,170 - 47251 - ERROR: cost time :10.6526181698 counter : 100 2016-03-24 18:50:07,978 - 47251 - ERROR: cost time :10.7967550755 counter : 100 2016-03-24 18:50:18,814 - 47251 - ERROR: cost time :10.8282749653 counter : 100 2016-03-24 18:50:28,316 - 47251 - ERROR: cost time :9.49260401726 counter : 100 2016-03-24 18:50:39,219 - 47251 - ERROR: cost time :10.8951020241 counter : 100 2016-03-24 18:50:50,327 - 47251 - ERROR: cost time :11.0981459618 counter : 99 2016-03-24 18:51:00,368 - 47251 - ERROR: cost time :10.031306982 counter : 100 2016-03-24 18:51:10,659 - 47251 - ERROR: cost time :10.2820689678 counter : 100 2016-03-24 18:51:21,614 - 47251 - ERROR: cost time :10.945966959 counter : 100 2016-03-24 18:51:33,614 - 47251 - ERROR: cost time :11.9910209179 counter : 100 2016-03-24 18:51:44,613 - 47251 - ERROR: cost time :10.9841971397 counter : 100
ElasticSearch的bulk批量没啥好讲的,这次碰巧碰到这问题… 我还是推荐大家使用ElasticSearch-py的库….
END.