I've been reading through some of the elasticsearch-py source code and have now reached the bulk operations. There aren't many articles out there about Elasticsearch bulk inserts, so here I'll briefly describe each of the es bulk interfaces.
Crawlers have been scraping my posts rather aggressively lately and the articles keep getting reposted, so let me note the original link here: http://xiaorui.cc/?p=2245
I hope this article helps you understand Elasticsearch better. The Python Elasticsearch bulk functions live in the helpers module.
Below is how helpers is used. I don't really get Elasticsearch's naming here; calling it helpers is an odd choice that leaves you scratching your head!
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()
actions = []

for j in range(0, 500):
    action = {
        "_index": "xiaorui_index",
        "_type": "fengyun",
        "_id": j,
        "_source": {
            "any": "data" + str(j),
            "timestamp": datetime.now()
        }
    }
    actions.append(action)

if len(actions) > 0:
    helpers.bulk(es, actions)
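Since helpers.bulk accepts any iterable of actions, you don't have to build the whole list in memory first. Here's a minimal sketch of the same insert done with a generator (index and type names are just the ones from the example above):

from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

def gen_actions():
    # yield actions lazily instead of materializing a 500-element list
    for j in range(0, 500):
        yield {
            "_index": "xiaorui_index",
            "_type": "fengyun",
            "_id": j,
            "_source": {
                "any": "data" + str(j),
                "timestamp": datetime.now()
            }
        }

helpers.bulk(es, gen_actions())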
Now that the usage is covered in detail, let's look at how the elasticsearch-py source implements it. elasticsearch-py gives us three interfaces.
elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=103833600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, **kwargs)
The streaming interface: it assembles the result set into an iterator and yields the results out one by one. bulk() itself also calls the streaming_bulk function under the hood. A usage sketch follows the parameter list below.
client – instance of Elasticsearch to use
actions – iterable containing the actions to be executed
chunk_size – number of docs in one chunk sent to es (default: 500)
max_chunk_bytes – the maximum size of the request in bytes (default: 100MB)
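A minimal sketch of driving streaming_bulk directly, reusing the gen_actions() generator from the earlier sketch; nothing is sent until you iterate, and each iteration yields an (ok, result) tuple for one document:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

# each yielded item is an (ok, result) tuple for a single document
for ok, result in helpers.streaming_bulk(es, gen_actions(), chunk_size=500):
    if not ok:
        print("failed:", result)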
elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=103833600, expand_action_callback=<function expand_action>, **kwargs)
This function performs the bulk operations across multiple threads, so in theory it should be considerably faster than bulk and streaming_bulk. Compared with the parameter lists of the other two bulk interfaces, it adds a knob for the thread count (see the usage sketch below).
thread_count – size of the threadpool to use for the bulk requests
I originally assumed it used the threading module, but after reading the source I found out it actually uses multiprocessing.dummy.
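A minimal usage sketch (again reusing gen_actions() from above): parallel_bulk is itself a lazy generator, so you have to consume it, for example with collections.deque, or nothing gets sent at all:

from collections import deque
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

# exhaust the generator; deque(..., maxlen=0) just discards the results cheaply
deque(helpers.parallel_bulk(es, gen_actions(), thread_count=4, chunk_size=500), maxlen=0)

# or inspect failures as they come back
for ok, result in helpers.parallel_bulk(es, gen_actions(), thread_count=4):
    if not ok:
        print("failed:", result)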
elasticsearch.helpers.bulk(client, actions, stats_only=False, **kwargs)
client – instance of Elasticsearch to use
actions – iterator containing the actions
stats_only – if True only report number of successful/failed operations instead of just number of successful and a list of error responses
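A small sketch of the two return shapes controlled by stats_only (gen_actions() is the generator from the earlier sketch):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

# default: the second value is a list of per-document error responses
success, errors = helpers.bulk(es, gen_actions())

# stats_only=True: the second value is just the number of failed documents
success, failed = helpers.bulk(es, gen_actions(), stats_only=True)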
def bulk(client, actions, stats_only=False, **kwargs):
    success, failed = 0, 0

    # when stats_only is False, the individual error responses are collected here
    errors = []

    # bulk simply drives streaming_bulk and returns the success count plus either
    # the failure count or the list of failed items
    for ok, item in streaming_bulk(client, actions, **kwargs):
        if not ok:
            if not stats_only:
                errors.append(item)
            failed += 1
        else:
            success += 1

    return success, failed if stats_only else errors
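Note that the errors list only actually makes it back to the caller when you also pass raise_on_error=False; with the default raise_on_error=True the underlying streaming_bulk raises BulkIndexError on the first chunk that contains failures. A hedged sketch:

from elasticsearch import Elasticsearch, helpers
from elasticsearch.helpers import BulkIndexError

es = Elasticsearch()

try:
    # raise_on_error defaults to True, so document failures raise instead of
    # coming back in the errors list
    success, errors = helpers.bulk(es, gen_actions())
except BulkIndexError as exc:
    print(exc)

# with raise_on_error=False, the failures are returned in the second value instead
success, errors = helpers.bulk(es, gen_actions(), raise_on_error=False)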
def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
                  max_chunk_bytes=100 * 1014 * 1024,
                  expand_action_callback=expand_action, **kwargs):
    # this is the multithreaded version; note the multiprocessing.dummy thread pool below
    from multiprocessing.dummy import Pool
    actions = map(expand_action_callback, actions)

    pool = Pool(thread_count)

    for result in pool.imap(
        lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
        _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
    ):
        for item in result:
            yield item

    pool.close()
    pool.join()
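multiprocessing.dummy exposes the multiprocessing API but backs it with threads, so parallel_bulk really is a thread pool rather than separate processes. A quick sketch (not part of elasticsearch-py, just for illustration) that shows the workers are plain threads:

from multiprocessing.dummy import Pool
import threading

pool = Pool(4)
# every worker reports a thread name from the current process, not a child pid
print(set(pool.map(lambda _: threading.current_thread().name, range(100))))
pool.close()
pool.join()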
def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1014 * 1024,
                   raise_on_error=True, expand_action_callback=expand_action,
                   raise_on_exception=True, **kwargs):
    # max_chunk_bytes: maximum byte size of one buffered chunk, 100MB by default
    # chunk_size: maximum number of actions per chunk
    # raise_on_error: whether to raise on document failures; False means ignore them
    actions = map(expand_action_callback, actions)

    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
        for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
            yield result
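A sketch of the two raise_* knobs: raise_on_error covers per-document failures, raise_on_exception covers transport-level errors for a whole chunk; with both set to False everything is reported back through the (ok, result) tuples instead of being raised (gen_actions() is the generator from earlier):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

for ok, result in helpers.streaming_bulk(
        es, gen_actions(),
        raise_on_error=False,        # don't raise BulkIndexError on failed documents
        raise_on_exception=False):   # report transport errors instead of re-raising them
    if not ok:
        print("failed:", result)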
def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
    """
    Split actions into chunks by number or size, serialize them into strings in
    the process.
    """
    bulk_actions = []
    size, action_count = 0, 0

    for action, data in actions:
        action = serializer.dumps(action)
        cur_size = len(action) + 1

        if data is not None:
            data = serializer.dumps(data)
            cur_size += len(data) + 1

        # flush the current chunk when it already holds chunk_size actions or when
        # adding this one would exceed max_chunk_bytes (100MB by default)
        if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
            yield bulk_actions
            bulk_actions = []
            size, action_count = 0, 0

        bulk_actions.append(action)
        if data is not None:
            bulk_actions.append(data)
        size += cur_size
        action_count += 1

    if bulk_actions:
        yield bulk_actions
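To see the chunking behavior in isolation, here is a toy run of _chunk_actions with a minimal stand-in for client.transport.serializer (the _JsonSerializer class below is hypothetical; it only provides the dumps() method this helper needs); each yielded chunk interleaves the serialized action line with its source line:

import json

class _JsonSerializer(object):
    # hypothetical stand-in for client.transport.serializer
    def dumps(self, data):
        return json.dumps(data)

# (action, source) pairs, i.e. what expand_action produces
actions = [({"index": {"_index": "xiaorui_index", "_id": i}}, {"any": "data%d" % i})
           for i in range(5)]

# chunk_size=2 -> two chunks of 2 actions (4 serialized lines each), then a final chunk of 1
for chunk in _chunk_actions(iter(actions), 2, 100 * 1024 * 1024, _JsonSerializer()):
    print(len(chunk), chunk)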
As for Python Elasticsearch bulk insert performance, I think it's tied to your whole production environment: the client's bandwidth, some nginx tuning (especially the proxy buffers), the load on Elasticsearch itself, and so on.
The result I've arrived at here is: keep each bulk request under 20MB, and set the Elasticsearch timeout to at least 60 seconds. Your tests may finish very quickly most of the time, but reality often isn't like that.
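Based on that, here is a hedged sketch of how I'd wire those limits into helpers.bulk: max_chunk_bytes caps the request body size, and request_timeout is forwarded through **kwargs to the underlying bulk call (adjust the numbers to your own environment):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

helpers.bulk(
    es,
    gen_actions(),                      # the generator from the earlier sketch
    chunk_size=500,
    max_chunk_bytes=20 * 1024 * 1024,   # keep each bulk request under ~20MB
    request_timeout=60,                 # per-request timeout, passed through **kwargs
)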
