关于elasticsearch的操作,我一般是使用python elasticsearch库的。 维护他人的代码所以间接的再研究下pyes。 我们的所有业务都是依赖于ElasticSearch数据库的,不说java,python关于es的模块有elasticsearch和pyes这两个库。 而我跟伟哥用的都是python elasticsearch模块。一个部门居然用不同的模块…
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。http://xiaorui.cc/?p=3043
对于他们的区别我这里说两句, elasticsearch-py的文档要比pyes要完整,要完善,给出的实例也很丰富。 pyes的文档实在太伤,不说别的这明明是python的模块,冷不丁出来个java api的使用方法,闹呢… 因文档太粗糙,我花了点时间读了下pyes代码,可以说pyes代码复杂度要比elasticsearch-py低不少…
elasticsearch-py是官方elastic来维护,pyes是个人组织来维护。
elasticsearch-py的更新和修复很是及时,最近也是有更新。 上次的commit是去年的10月份. commit的最近频繁说明这项目是有活力. 我还记得去年给他们提价了一个bug,不到一个星期就给修复了. 总是一句话,推荐使用elasticsearch-py .
正题开始,说下pyes bulk的实现方法. 为什么要说pyes bulk呢 ? 首先遇到了pyes bulk的一个坑. 以前都是使用elasticsearch-py Helpers来批量bulk操作,实现是属于典型的批量方法,我自己设立一个list,然后把doc塞到这list里面,然后把这list扔给bulk方法就可以了。
pyes不是这样的, pyes内部里面有个bulk list,而且这个list是线程安全的。如果你是多进程尽量不要共用一个es连接对象,会出现类似线程安全的问题 .
class BaseBulker(object):
def __init__(self, conn, bulk_size=400, raise_on_bulk_item_failure=False):
self.conn = conn
self._bulk_size = bulk_size
self.bulk_lock = threading.RLock() #锁
with self.bulk_lock:
self.bulk_data = []
下面是pyes官方给出的bulk方法, 但这跑不通… …
#xiaorui.cc #单次插入 from pyes import ES es = ES() es.index(data, 'xiaorui.cc', 'my-type', 1) es.index(data, 'xiaorui.cc', 'my-type', 2) es.index(data, 'xiaorui.cc', 'my-type', 3) es.index(data, 'xiaorui.cc', 'my-type', 4) #批量插入 from pyes import ES es = ES() es.index(data, 'xiaorui.cc', 'my-type', 1, bulk=True) es.index(data, 'xiaorui.cc', 'my-type', 2, bulk=True) es.index(data, 'xiaorui.cc', 'my-type', 3, bulk=True) es.index(data, 'xiaorui.cc', 'my-type', 4, bulk=True) es.refresh()
当我按照pyes给出的方法,调用refresh结果出问题….
ERROR: pyes object <pyes.es.ES object at 0x1418a90> is being destroyed, but bulk operations have not been flushed. Call force_bulk()!
我们应该这么使用pyes模块:
#xiaorui.cc
from pyes import ES
conn = ES('127.0.0.1:9200', timeout=20.0, bulk_size=100)
es.index(data, 'xiaorui.cc', 'my-type', 4, bulk=True)
es.force_bulk()
pyes的bulk_size默认是400,es.index后面的bulk参数一定要是True. 这样你不是真正的插入数据,而是放在客户端的队列里,当你手动force_bulk()或者bulk_size满了才能真正的推送doc文档到es.
下面是pyes批量的实现源码,这源码很多细节表明作者还是很有功底的.
#xiaorui.cc
#代码我删除了不少,留最主要的部分
#建立连接es的对象
#~/github/pyes/pyes/es.py
class ES(object):
def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400,
encoder=None, decoder=None,
max_retries=3,
retry_time=60,
default_indices=None,
raise_on_bulk_item_failure=False,
bulker_class=ListBulker,
#这里就是我们es.index的地方.
def index(self, doc, index, doc_type, id=None, parent=None, force_insert=False,
op_type=None, bulk=False, version=None, querystring_args=None, ttl=None):
if querystring_args is None:
querystring_args = {}
if bulk:
...
if isinstance(doc, dict):
doc = json.dumps(doc, cls=self.encoder)
command = "%s\n%s" % (json.dumps(cmd, cls=self.encoder), doc)
self.bulker.add(command)
return self.flush_bulk() #当你bulk为True的时候,会把这条doc add到bulker队列里。
def flush_bulk(self, forced=False):
return self.bulker.flush_bulk(forced) # ListBulker的flush_bulk方法,判断是否队列大小是否超过了bulk_size,如果超过了...
def force_bulk(self):
return self.flush_bulk(True) #这个是真正的执行,并返回结果
# github/pyes/pyes/models.py
class ListBulker(BaseBulker):
"""
A bulker that store data in a list
"""
def __init__(self, conn, bulk_size=400, raise_on_bulk_item_failure=False):
super(ListBulker, self).__init__(conn=conn, bulk_size=bulk_size,
raise_on_bulk_item_failure=raise_on_bulk_item_failure)
with self.bulk_lock:
self.bulk_data = [] #内置bulk的缓冲队列.
def add(self, content):
with self.bulk_lock:
self.bulk_data.append(content) #把doc放到list里面。
def flush_bulk(self, forced=False):
with self.bulk_lock:
if forced or len(self.bulk_data) >= self.bulk_size: #大于bulk_size的时候
batch = self.bulk_data
self.bulk_data = []
else:
return None
if len(batch) > 0: #调用http post方法进行批量请求. 注明了流式接口
bulk_result = self.conn._send_request("POST",
"/_bulk",
"\n".join(batch) + "\n")
if self.raise_on_bulk_item_failure:
_raise_exception_if_bulk_item_failed(bulk_result)
return bulk_result
下面批量插入后的性能测试结果,大家会觉得慢么? 我们的ElasticSearch的节点有70+的规模了,其实我也觉得慢… 通过kopf看到elasticsearch的状况不错…. 慢的原因是我们每个doc文档都很大,出口或调度总归有影响的. 跟Hbase Thrift Server一样,没改成批量之前,负责hadoop的同事就说,插入查询慢的原因是由于没有使用批量接口,后来我用了批量接口…. 消耗时间是线性增长的,没效果…
2016-03-24 18:47:10,220 - 47251 - ERROR: cost time :9.40016007423 counter : 100 2016-03-24 18:47:20,978 - 47251 - ERROR: cost time :10.7454559803 counter : 100 2016-03-24 18:47:31,030 - 47251 - ERROR: cost time :10.0432369709 counter : 100 2016-03-24 18:47:40,933 - 47251 - ERROR: cost time :9.89056110382 counter : 100 2016-03-24 18:47:51,404 - 47251 - ERROR: cost time :9.78279089928 counter : 100 2016-03-24 18:48:01,813 - 47251 - ERROR: cost time :10.3971750736 counter : 100 2016-03-24 18:48:12,523 - 47251 - ERROR: cost time :10.6971299648 counter : 100 2016-03-24 18:48:22,161 - 47251 - ERROR: cost time :9.62901997566 counter : 100 2016-03-24 18:48:32,640 - 47251 - ERROR: cost time :10.4641959667 counter : 100 2016-03-24 18:48:42,745 - 47251 - ERROR: cost time :10.0933129787 counter : 100 2016-03-24 18:48:52,502 - 47251 - ERROR: cost time :9.74789094925 counter : 100 2016-03-24 18:49:02,034 - 47251 - ERROR: cost time :9.52420806885 counter : 99 2016-03-24 18:49:13,228 - 47251 - ERROR: cost time :11.1852941513 counter : 100 2016-03-24 18:49:23,503 - 47251 - ERROR: cost time :10.2656178474 counter : 99 2016-03-24 18:49:33,313 - 47251 - ERROR: cost time :9.80110192299 counter : 100 2016-03-24 18:49:41,183 - 47251 - ERROR: cost time :7.8603579998 counter : 100 2016-03-24 18:49:57,170 - 47251 - ERROR: cost time :10.6526181698 counter : 100 2016-03-24 18:50:07,978 - 47251 - ERROR: cost time :10.7967550755 counter : 100 2016-03-24 18:50:18,814 - 47251 - ERROR: cost time :10.8282749653 counter : 100 2016-03-24 18:50:28,316 - 47251 - ERROR: cost time :9.49260401726 counter : 100 2016-03-24 18:50:39,219 - 47251 - ERROR: cost time :10.8951020241 counter : 100 2016-03-24 18:50:50,327 - 47251 - ERROR: cost time :11.0981459618 counter : 99 2016-03-24 18:51:00,368 - 47251 - ERROR: cost time :10.031306982 counter : 100 2016-03-24 18:51:10,659 - 47251 - ERROR: cost time :10.2820689678 counter : 100 2016-03-24 18:51:21,614 - 47251 - ERROR: cost time :10.945966959 counter : 100 2016-03-24 18:51:33,614 - 47251 - ERROR: cost time :11.9910209179 counter : 100 2016-03-24 18:51:44,613 - 47251 - ERROR: cost time :10.9841971397 counter : 100
ElasticSearch的bulk批量没啥好讲的,这次碰巧碰到这问题… 我还是推荐大家使用ElasticSearch-py的库….
END.
