Analyzing Python Bulk Inserts into Elasticsearch from the Source Code

I have been reading through some of the elasticsearch-py source code and have now reached the bulk operations. There are surprisingly few articles online about bulk inserting into Elasticsearch, so here I will briefly walk through the various interfaces of the es bulk helpers.


Crawlers have been pretty ruthless lately and my articles keep getting lifted, so let me note the original link here: http://xiaorui.cc/?p=2245

http://xiaorui.cc/2015/10/30/%E4%BB%8E%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90python%E6%89%B9%E9%87%8F%E6%8F%92%E5%85%A5elasticsearch%E7%9A%84%E5%AE%9E%E7%8E%B0/



I hope this article gives everyone a better understanding of Elasticsearch. The Python Elasticsearch bulk functions live in the helpers module.

Below is how helpers is used. I don't quite get Elasticsearch's naming here; calling it helpers is odd enough to make you pause!

from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = []

for j in range(0, 500):
    action = {
        "_index": "xiaorui_index",
        "_type": "fengyun",
        "_id": j,
        "_source": {
            "any":"data" + str(j),
            "timestamp": datetime.now()
            }
        }

    actions.append(action)

if len(actions) > 0:
    helpers.bulk(es, actions)

Now that the usage has been covered in detail, let's look at how the elasticsearch-py source implements it. elasticsearch-py provides three interfaces.

elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=103833600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, **kwargs)

The streaming interface: it wraps the result set in an iterator and yields the results one by one. bulk() itself also calls streaming_bulk under the hood. A short usage sketch follows the parameter list below.

client – instance of Elasticsearch to use
actions – iterable containing the actions to be executed
chunk_size – number of docs in one chunk sent to es (default: 500)
max_chunk_bytes – the maximum size of the request in bytes (default: 100MB)
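
As a quick illustration, here is a minimal sketch of calling streaming_bulk directly; the gen_actions() generator and the reuse of the es client from the first example are my own additions, not part of elasticsearch-py.

def gen_actions():
    # a generator works just as well as a list for the actions argument
    for j in range(500):
        yield {
            "_index": "xiaorui_index",
            "_type": "fengyun",
            "_id": j,
            "_source": {"any": "data" + str(j)},
        }

# streaming_bulk is lazy: nothing is sent until you iterate over it,
# and it yields one (ok, item) tuple per action
for ok, item in helpers.streaming_bulk(es, gen_actions(), chunk_size=500):
    if not ok:
        print("failed:", item)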

elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=103833600, expand_action_callback=<function expand_action>, **kwargs)
This function runs the bulk operations across multiple threads, so in theory it should be quite a bit faster than bulk and streaming_bulk. Compared with the parameter lists of the other two bulk interfaces, it adds a thread count.

thread_count – size of the threadpool to use for the bulk requests

I originally assumed it was built on the threading module, but reading the source shows it actually uses multiprocessing.dummy.
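
Here is a minimal sketch of driving parallel_bulk the same way; it reuses my gen_actions() generator from the streaming_bulk sketch above, and like streaming_bulk it is a generator, so nothing is sent unless you consume it.

# parallel_bulk also yields (ok, item) tuples; the thread pool only does
# work while this loop consumes the generator
for ok, item in helpers.parallel_bulk(es, gen_actions(),
                                      thread_count=4, chunk_size=500):
    if not ok:
        print("failed:", item)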

elasticsearch.helpers.bulk(client, actions, stats_only=False, **kwargs)
client – instance of Elasticsearch to use
actions – iterator containing the actions
stats_only – if True, report only the number of successful/failed operations instead of the number of successful operations plus a list of error responses

def bulk(client, actions, stats_only=False, **kwargs):
    success, failed = 0, 0

    # the error list is only collected when stats_only is False
    errors = []
    # you can see that bulk drives streaming_bulk, and returns the success
    # count plus either the failure count or the error list
    for ok, item in streaming_bulk(client, actions, **kwargs):

        if not ok:
            if not stats_only:
                errors.append(item)
            failed += 1
        else:
            success += 1

    return success, failed if stats_only else errors
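
To make that return value concrete, here is a small sketch reusing the es client and actions list from the first example: with the default stats_only=False you get back the success count and a list of error items, while stats_only=True gives you two counters.

# default: (number of successes, list of error responses)
success, errors = helpers.bulk(es, actions)

# stats_only=True: (number of successes, number of failures)
success, failed = helpers.bulk(es, actions, stats_only=True)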



def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
        max_chunk_bytes=100 * 1014 * 1024,
        expand_action_callback=expand_action, **kwargs):
    # this is the multithreaded version; you can see below that it pulls in
    # the multiprocessing.dummy thread-pool library

    from multiprocessing.dummy import Pool
    actions = map(expand_action_callback, actions)

    pool = Pool(thread_count)

    for result in pool.imap(
        lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
        _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
        ):
        for item in result:
            yield item

    pool.close()
    pool.join()



def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1014 * 1024,
        raise_on_error=True, expand_action_callback=expand_action,
        raise_on_exception=True, **kwargs):

    # max_chunk_bytes: maximum byte size of one buffered chunk, ~100MB by default
    # chunk_size: number of actions per buffered chunk
    # raise_on_error: whether to raise on errors; False means they are ignored
    actions = map(expand_action_callback, actions)

    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
        for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
            yield result



def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
    """
    Split actions into chunks by number or size, serialize them into strings in
    the process.
    """
    bulk_actions = []
    size, action_count = 0, 0
    for action, data in actions:
        action = serializer.dumps(action)
        cur_size = len(action) + 1

        if data is not None:
            data = serializer.dumps(data)
            cur_size += len(data) + 1

        # flush when the chunk is full or its byte size would exceed
        # max_chunk_bytes (~100MB by default)
        if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
            yield bulk_actions
            bulk_actions = []
            size, action_count = 0, 0

        bulk_actions.append(action)
        if data is not None:
            bulk_actions.append(data)
        size += cur_size
        action_count += 1
    if bulk_actions:
        yield bulk_actions
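
To see the chunking behaviour in isolation, here is a hypothetical standalone sketch that calls the _chunk_actions quoted above with a minimal JSON serializer stand-in instead of client.transport.serializer; the _FakeSerializer class and the sample (action, data) pairs are my own illustration.

import json

class _FakeSerializer(object):
    # stands in for client.transport.serializer; only dumps() is needed here
    def dumps(self, obj):
        return json.dumps(obj)

# (action, data) pairs are what expand_action produces: the metadata line
# plus the optional document body
pairs = [({"index": {"_index": "xiaorui_index", "_type": "fengyun", "_id": i}},
          {"any": "data" + str(i)}) for i in range(7)]

# chunk_size=3 forces a flush every 3 actions; 7 actions -> chunks of 3, 3, 1
for chunk in _chunk_actions(pairs, 3, 100 * 1024 * 1024, _FakeSerializer()):
    # each chunk is a flat list of serialized lines: action, body, action, body...
    print(len(chunk) // 2, "actions in this chunk")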

As for the performance of Python bulk inserts into Elasticsearch, I think it is tied to the whole production environment: the client's bandwidth, nginx tuning (especially the proxy buffers), the load on Elasticsearch itself, and so on.

The conclusion I reached is to keep each bulk request under 20MB and to set the Elasticsearch timeout to at least 60 seconds. Your tests may finish quickly every time, but production rarely behaves that way.
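
As a hedged sketch of that tuning (the exact numbers are just what worked for me, and request_timeout relies on the kwargs being forwarded down to client.bulk, as the streaming_bulk source above shows):

helpers.bulk(
    es,
    actions,
    chunk_size=500,
    max_chunk_bytes=20 * 1024 * 1024,  # keep each bulk request under ~20MB
    request_timeout=60,                # per-request timeout passed through to client.bulk
)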





