多进程下用requests访问http api时遇到的坑

简单描述下场景,我这边开发了一个服务是用来把Mysql的数据取出来,并进行分词及正负面操作后再导出到elasticsearch里面。  其实是多线程开发的,原本以为这些操作基本是网络io的操作,用多线程是没有问题. 但是通过这几天的观察速度还真是有些慢,每小时才50w的数据 (另外产品狗催我..  mysql-es导出服务速度确实有些慢,后来又增加了线程,但对于导出速度没啥提升,最直观的是单进程已经超过100%了. )

通过对函数耗时日志统计, 从mysql取数据,导出到elasticsearch,以及正负面计算都还行,最消耗时间的是分词…  我这不单单是通过api来分词,本地也会调用结巴分词模块..,这加速了导出程序在单进程下cpu 100%.   个人觉得requests本身性能也一般.


文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新. 

http://xiaorui.cc/2015/12/22/%E5%A4%9A%E8%BF%9B%E7%A8%8B%E4%B8%8B%E7%94%A8requests%E8%AE%BF%E9%97%AEhttp-api%E6%97%B6%E9%81%87%E5%88%B0%E7%9A%84%E5%9D%91/

因为有好几十亿的数据需要从Myqsl导入到Elasticsearch里,现在每分钟3w的速度已经算是龟速了. 

没办法只能换成multiprocessing多进程了. 但当我把多线程切切换成多进程,再次并发调用ik分词api的时候,会遇到下面的报错. 

HTTPConnectionPool(host='xiaorui.cc', port=11011): Max retries exceeded with url: /ik/segment/ (Caused by <class 'socket.error'>: [Errno 110] Connection timed out)
get segment faild, timeout over ...

这问题是tcp连接太多,又没有及时关闭引起的. 一般这问题不能简简单单的配置sysctl.conf就解决的.  当前的requests版本是urllib3的封装,urllib3默认是不开启keepalive长连接的。我们只是单纯引用requests.Request()的话, 他的headers connection keepalive是空的,但是他会根据返回的response headers connection来判断是否进行长连接.

一开始我是在我的的Mac主机下进行api调用,在压力够大的情况也没有出现 HTTPConnectionPool(host=’xiaorui.cc’, port=11011): Max retries exceeded 的报错,但是在线上总是出现这问题. 原因如下.

Centos 下 , 我们可以看到centos下requests连接headers头部是没有keepalive选项:

#xiaorui.cc

In [2]: import requests

In [3]: s = requests.Session()

In [4]: s.headers
Out[4]: CaseInsensitiveDict({'Accept-Encoding': 'gzip, deflate, compress', 'Accept': '*/*', 'User-Agent': 'python-requests/2.2.1 CPython/2.7.10 Linux/2.6.32-504.8.1.el6.x86_64'})

Mac ipython下:

# xiaorui.cc
In [1]: import requests

In [2]: s = requests.Session()

n [3]: s.headers
Out[3]: {'Connection': 'keep-alive', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'User-Agent': 'python-requests/2.5.1 CPython/2.7.6 Darwin/14.0.0'}

使用requests的session客户端模式和保持长连接的状态.   下面是我最终调试后的代码. 

#blog: xiaorui.cc
client = requests.Session()
def get_segment(text):
    segment_list = []
    text_list = []
    text_list.append(text)
    try:
        headers = {'Content-Type': 'application/json','Connection':'keep-alive'}
        r = client.post(SIGMENT_ADDRESS, data=json.dumps(text_list), headers=headers)
        items = r.json()[0]
        for item in items:
            kv = {}
            kv["k"] = item
            kv["v"] = items[item]
            segment_list.append(kv)
    except Exception as e:
        print "get segment faild, timeout over"
        return []

    return segment_list

通过netstat检查下已经使用建立的连接, 一共10个,因为我开了10个进程.


通过ps aux 可以查看我开启的进程. 


但是需要特别说明的是,如果你是爬虫相关的业务?抓取的网站还各种各样,每个服务器的地址都不一样,那么你不适用于我上面的方法,而是需要把Connection给关闭. 当然还是看场景. 多方调试下. 

r = requests.post(url=url, data=body, headers={‘Connection’:’close’})

如果你的压力源加大,比如把进程加到100多会出现下面的问题…

ETIMEDOUT (Connection timed out) 

HTTPConnectionPool(host=’xiaorui.cc’, port=80): Max retries exceeded with url: /ik/segment/ (Caused by <class ‘socket.error’>: [Errno 110] Connection timed out)
get segment faild, timeout over
2015-12-22 18:20:15,828 – mylogger – INFO – q_task 2093  , q_res 316
2015-12-22 18:20:15,929 – mylogger – INFO – q_task 2091  , q_res 316

我的结论:

      时常搞一些有量级的项目对自己有提高的…   另外发现不少人认为多线程是没有办法并发的.  其实python虽然不能并行,但是可以并发的.  python的多线程在网络io操作是有优势,但是在cpu计算下是无用处的. 多线程不易开太多,太多的线程的上下文切换对于系统是个负担.  


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注