I have been reading through the elasticsearch-py source and have now reached the bulk operations. There are surprisingly few articles online about Elasticsearch bulk inserts, so here is a quick walkthrough of the bulk interfaces in elasticsearch-py.
Crawlers have been scraping my posts rather aggressively lately and they keep getting reposted elsewhere, so here is a link to the original: http://xiaorui.cc/?p=2245
I hope this article gives you a better understanding of Elasticsearch. The Python Elasticsearch bulk helpers live in the helpers module.
Below is how helpers is used. I don't quite get Elasticsearch's naming here; calling it helpers is an odd choice and it threw me at first!
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = []
for j in range(0, 500):
    action = {
        "_index": "xiaorui_index",
        "_type": "fengyun",
        "_id": j,
        "_source": {
            "any": "data" + str(j),
            "timestamp": datetime.now()
        }
    }
    actions.append(action)

if len(actions) > 0:
    helpers.bulk(es, actions)
That covers the usage; now let's see how the elasticsearch-py source implements it. elasticsearch-py gives us three bulk interfaces.
elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=103833600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, **kwargs)
The streaming interface: it wraps the result set in an iterator and yields the results one by one. bulk() is itself implemented on top of streaming_bulk. The parameters are listed below, followed by a short usage sketch.
client – instance of Elasticsearch to use
actions – iterable containing the actions to be executed
chunk_size – number of docs in one chunk sent to es (default: 500)
max_chunk_bytes – the maximum size of the request in bytes (default: 100MB)
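To make that concrete, here is a minimal sketch of consuming streaming_bulk. It is not from the original post: the generator, index name and type are made-up placeholders, and the (ok, result) tuples correspond to what the source further down yields per document.

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

def gen_actions():
    # build actions lazily so the whole data set never sits in memory
    for j in range(10000):
        yield {
            "_index": "xiaorui_index",   # placeholder index/type, same style as the sample above
            "_type": "fengyun",
            "_id": j,
            "_source": {"any": "data" + str(j)},
        }

# streaming_bulk yields one (ok, result) tuple per document as each chunk is flushed
for ok, result in helpers.streaming_bulk(es, gen_actions(), chunk_size=500):
    if not ok:
        print("failed:", result)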
elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=103833600, expand_action_callback=<function expand_action>, **kwargs)
This function runs the bulk requests across multiple threads, so in theory it should be quite a bit faster than bulk and streaming_bulk. Compared with the other two bulk interfaces, its parameter list adds one extra knob for the number of threads; a small usage sketch follows the notes below.
thread_count – size of the threadpool to use for the bulk requests
I originally assumed it used the threading module, but reading the source shows it actually relies on multiprocessing.dummy.
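A hedged sketch of parallel_bulk usage, assuming the same placeholder index as above. One thing worth calling out: parallel_bulk is a generator, so nothing is sent until you iterate over (or drain) its results.

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

actions = ({
    "_index": "xiaorui_index",
    "_type": "fengyun",
    "_id": j,
    "_source": {"any": "data" + str(j)},
} for j in range(10000))

# like streaming_bulk, this yields (ok, result) per document; four worker threads
for ok, result in helpers.parallel_bulk(es, actions, thread_count=4, chunk_size=500):
    if not ok:
        print("failed:", result)

Since the pool comes from multiprocessing.dummy, these are real threads sharing one process, which works well here because the bulk calls are I/O-bound.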
elasticsearch.helpers.bulk(client, actions, stats_only=False, **kwargs)
client – instance of Elasticsearch to use
actions – iterator containing the actions
stats_only – if True, only report the number of successful/failed operations instead of the number of successful operations plus a list of error responses
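To make the return value concrete, here is a small sketch (my own example, not from the post) of the two stats_only modes; it mirrors the return success, failed if stats_only else errors line in the source further down.

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
actions = [{"_index": "xiaorui_index", "_type": "fengyun", "_id": j,
            "_source": {"any": "data" + str(j)}} for j in range(100)]

# default: (number of successes, list of error responses);
# note that with the default raise_on_error=True, failed docs raise BulkIndexError
# instead of ending up in the returned list
success, errors = helpers.bulk(es, actions, stats_only=False)

# stats_only=True: (number of successes, number of failures)
success, failed = helpers.bulk(es, actions, stats_only=True)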
def bulk(client, actions, stats_only=False, **kwargs):
    success, failed = 0, 0

    # the error list is only collected when stats_only is False
    errors = []

    # bulk is built on top of streaming_bulk; it tallies the success count,
    # the failure count and the list of failed items and returns them
    for ok, item in streaming_bulk(client, actions, **kwargs):
        if not ok:
            if not stats_only:
                errors.append(item)
            failed += 1
        else:
            success += 1

    return success, failed if stats_only else errors


def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
                  max_chunk_bytes=100 * 1014 * 1024,
                  expand_action_callback=expand_action, **kwargs):
    # the multi-threaded version: note the multiprocessing.dummy thread pool below
    from multiprocessing.dummy import Pool
    actions = map(expand_action_callback, actions)

    pool = Pool(thread_count)

    for result in pool.imap(
        lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
        _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
    ):
        for item in result:
            yield item

    pool.close()
    pool.join()


def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1014 * 1024,
                   raise_on_error=True, expand_action_callback=expand_action,
                   raise_on_exception=True, **kwargs):
    # max_chunk_bytes: the largest amount of space one buffered chunk may take, 100MB by default
    # chunk_size: the number of actions buffered per chunk
    # raise_on_error: whether to raise on errors; False means they are ignored
    actions = map(expand_action_callback, actions)

    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
        for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
            yield result


def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
    """
    Split actions into chunks by number or size, serialize them into strings in
    the process.
    """
    bulk_actions = []
    size, action_count = 0, 0
    for action, data in actions:
        action = serializer.dumps(action)
        cur_size = len(action) + 1

        if data is not None:
            data = serializer.dumps(data)
            cur_size += len(data) + 1

        # flush the chunk when it is full or when adding this action would push it
        # past max_chunk_bytes (100MB by default)
        if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
            yield bulk_actions
            bulk_actions = []
            size, action_count = 0, 0

        bulk_actions.append(action)
        if data is not None:
            bulk_actions.append(data)
        size += cur_size
        action_count += 1

    if bulk_actions:
        yield bulk_actions
As for the bulk-insert performance of Python + Elasticsearch, I think it is tied to the whole environment in front of it: the client's bandwidth, nginx tuning (especially the proxy buffers), the load on the Elasticsearch cluster itself, and so on.
My own numbers: keep each bulk request below roughly 20MB, and set the Elasticsearch timeout to at least 60 seconds. Tests often finish quickly, but production rarely behaves that nicely. A sketch of those two settings follows.
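A hedged sketch of how those two limits could be wired into the helpers above. The 20MB chunk cap and 60-second timeout are the numbers from this post; the index name and the exact client options are illustrative assumptions, not something the post prescribes.

from elasticsearch import Elasticsearch, helpers

# give the cluster at least 60 seconds before the client gives up (assumption:
# your elasticsearch-py version accepts a default timeout on the client)
es = Elasticsearch(timeout=60)

actions = ({
    "_index": "xiaorui_index",
    "_type": "fengyun",
    "_source": {"any": "data" + str(j)},
} for j in range(100000))

# cap every chunk at roughly 20MB instead of the 100MB default
success, errors = helpers.bulk(
    es,
    actions,
    chunk_size=500,
    max_chunk_bytes=20 * 1024 * 1024,
    request_timeout=60,   # per-request timeout forwarded to the underlying bulk call
)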