源码分析elasticsearch python的scan及scroll实现

上篇文章已经讲过elasticsearch scan scroll的用途, 他们是用来解决获取大数据返回时的性能问题.  有兴趣的朋友可以回顾下 scan,scroll的作用一篇. http://xiaorui.cc/?p=3072

这次聊下elasticsearch python客户端的实现。

该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。

http://xiaorui.cc/?p=3080


这里再废话说下scan scroll的作用:

scroll用来避免深度分页查找数据时的性能消耗,他会像curosr那样做数据的快照。

scan能避免scroll的排序性能消耗,from size分页查询模式会对数据集进行整体排序, 性能损耗是很大的. 如果我们关闭排序,那么可以消耗极少资源返回所有的文档。scan就是不去,而是仅仅从每个有结果的分片中返回数据.

下面是python elasticsearch helpers.scan的源码。对照elasticsearch scroll scan基本用法,很容易就能理解下面的代码。  话说elasticsearch-py把高性能的功能都继承在了helpers模块里,比如helpers.scan helpers.reindex streaming_bulk helpers.bulk  parallel_bulk .  

elasticsearch.helpers.scan(client, query=None, scroll=u'5m', raise_on_error=True, preserve_order=False, **kwargs)

参数介绍: 
client – elasticsearch的连接对象
query – elasticsearch dsl查询语句
scroll – 你想让scroll的结果集在server端标记多久
raise_on_error – raise的error class
preserve_order – 这里其实对应的是search_type,是否要求排序

file: helpers/__init__.py

#xiaorui.cc

#默认是5m分钟, 默认是search_type是scan扫描模式
def scan(client, query=None, scroll='5m', preserve_order=False, **kwargs):

    if not preserve_order:   #是否需要scan模式
        kwargs['search_type'] = 'scan'
    resp = client.search(body=query, scroll=scroll, **kwargs)

    scroll_id = resp.get('_scroll_id')  #第一次查询拿到_scroll_id token
    if scroll_id is None:
        return

    first_run = True
    while True:
        #如果你server_type不是scan,那么第一次的结果里是包含数据的。
        if preserve_order and first_run:
            first_run = False
        else:
            resp = client.scroll(scroll_id, scroll=scroll)
        if not resp['hits']['hits']:
            break
        for hit in resp['hits']['hits']:
            yield hit    #通过yield生成器来返回数据
        scroll_id = resp.get('_scroll_id')
        if scroll_id is None:
            break

file: client/__init__.py

#xiaorui.cc
@query_params('scroll')
def scroll(self, scroll_id, params=None):
    # 第二次scroll的数据请求是直接 /_search/scroll,方法用的是GET
    _, data = self.transport.perform_request('GET', '/_search/scroll',params=params, body=scroll_id)
    return data

对于elasticsearch scanscroll的使用方法, 大家注意一下异常情况. 

#xiaorui.cc
data = scan(es,
    query={"query": {"match": {"domain": "xiaorui.cc"}}},
    index="xiaorui_index",
    doc_type="blog"
)
for one in data:
    print one

Elasticsearch博大精深… …  经过我的线下线上测试,使用scroll scan的性能还是不错的,返回的速度不错,明显比那种from size分页要快速,而且节省了elasticsearch的检索资源。 


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

1 Response

  1. 巭孬嫑夯昆 2016年9月7日 / 上午11:12

    django 中的api开发 使用helper.scan 获取个50M左右的数据 就报 timeout 是什么原因呢

巭孬嫑夯昆进行回复 取消回复

邮箱地址不会被公开。 必填项已用*标注