简单描述下场景,我这边开发了一个服务是用来把Mysql的数据取出来,并进行分词及正负面操作后再导出到elasticsearch里面。 其实是多线程开发的,原本以为这些操作基本是网络io的操作,用多线程是没有问题. 但是通过这几天的观察速度还真是有些慢,每小时才50w的数据 (另外产品狗催我.. mysql-es导出服务速度确实有些慢,后来又增加了线程,但对于导出速度没啥提升,最直观的是单进程已经超过100%了. )
通过对函数耗时日志统计, 从mysql取数据,导出到elasticsearch,以及正负面计算都还行,最消耗时间的是分词… 我这不单单是通过api来分词,本地也会调用结巴分词模块..,这加速了导出程序在单进程下cpu 100%. 个人觉得requests本身性能也一般.
文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新.
因为有好几十亿的数据需要从Myqsl导入到Elasticsearch里,现在每分钟3w的速度已经算是龟速了.
没办法只能换成multiprocessing多进程了. 但当我把多线程切切换成多进程,再次并发调用ik分词api的时候,会遇到下面的报错.
HTTPConnectionPool(host='xiaorui.cc', port=11011): Max retries exceeded with url: /ik/segment/ (Caused by <class 'socket.error'>: [Errno 110] Connection timed out) get segment faild, timeout over ...
这问题是tcp连接太多,又没有及时关闭引起的. 一般这问题不能简简单单的配置sysctl.conf就解决的. 当前的requests版本是urllib3的封装,urllib3默认是不开启keepalive长连接的。我们只是单纯引用requests.Request()的话, 他的headers connection keepalive是空的,但是他会根据返回的response headers connection来判断是否进行长连接.
一开始我是在我的的Mac主机下进行api调用,在压力够大的情况也没有出现 HTTPConnectionPool(host=’xiaorui.cc’, port=11011): Max retries exceeded 的报错,但是在线上总是出现这问题. 原因如下.
Centos 下 , 我们可以看到centos下requests连接headers头部是没有keepalive选项:
#xiaorui.cc In [2]: import requests In [3]: s = requests.Session() In [4]: s.headers Out[4]: CaseInsensitiveDict({'Accept-Encoding': 'gzip, deflate, compress', 'Accept': '*/*', 'User-Agent': 'python-requests/2.2.1 CPython/2.7.10 Linux/2.6.32-504.8.1.el6.x86_64'})
Mac ipython下:
# xiaorui.cc In [1]: import requests In [2]: s = requests.Session() n [3]: s.headers Out[3]: {'Connection': 'keep-alive', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'User-Agent': 'python-requests/2.5.1 CPython/2.7.6 Darwin/14.0.0'}
使用requests的session客户端模式和保持长连接的状态. 下面是我最终调试后的代码.
#blog: xiaorui.cc client = requests.Session() def get_segment(text): segment_list = [] text_list = [] text_list.append(text) try: headers = {'Content-Type': 'application/json','Connection':'keep-alive'} r = client.post(SIGMENT_ADDRESS, data=json.dumps(text_list), headers=headers) items = r.json()[0] for item in items: kv = {} kv["k"] = item kv["v"] = items[item] segment_list.append(kv) except Exception as e: print "get segment faild, timeout over" return [] return segment_list
通过netstat检查下已经使用建立的连接, 一共10个,因为我开了10个进程.
通过ps aux 可以查看我开启的进程.
但是需要特别说明的是,如果你是爬虫相关的业务?抓取的网站还各种各样,每个服务器的地址都不一样,那么你不适用于我上面的方法,而是需要把Connection给关闭. 当然还是看场景. 多方调试下.
r = requests.post(url=url, data=body, headers={‘Connection’:’close’})
如果你的压力源加大,比如把进程加到100多会出现下面的问题…
ETIMEDOUT (Connection timed out)
HTTPConnectionPool(host=’xiaorui.cc’, port=80): Max retries exceeded with url: /ik/segment/ (Caused by <class ‘socket.error’>: [Errno 110] Connection timed out)
get segment faild, timeout over
2015-12-22 18:20:15,828 – mylogger – INFO – q_task 2093 , q_res 316
2015-12-22 18:20:15,929 – mylogger – INFO – q_task 2091 , q_res 316
我的结论:
时常搞一些有量级的项目对自己有提高的… 另外发现不少人认为多线程是没有办法并发的. 其实python虽然不能并行,但是可以并发的. python的多线程在网络io操作是有优势,但是在cpu计算下是无用处的. 多线程不易开太多,太多的线程的上下文切换对于系统是个负担.