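We had two incidents over the past couple of days. In the first, someone hammered the sentiment (positive/negative) API until it backed up, which crashed the whole dynamic page parsing pipeline and eventually took redis down. The second was a bug in the dynamic IP rotation module. Troubled times. I have spent the last few evenings reading the gevent source code, and it has been very rewarding. Parts of gevent's implementation are quite clever: it builds on the kernel's epoll to schedule I/O without blocking.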
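As you know, gevent is built on coroutines. Coroutines handle blocking I/O in user space: every operation that would block is registered with epoll as an event. gevent runs in a single thread and has no multi-process support of its own, but for IO-bound work we can get a multi-process setup by running one group of workers per process.

Today I read the gevent threadpool code. Its implementation looks a bit like the ThreadPool in multiprocessing. Python threads are notoriously awkward, and my current server-side architecture is mostly multiple processes plus many coroutines.

Back to the point: how does gevent deal with multiple processes? In an official issue, someone suggested using threadpool. A quick pass over the code shows that the pool starts native threads with start_new_thread. The os.fork handling in it (_on_fork) only re-creates the pool's locks and queue inside a child process after a fork, so the pool does not spawn processes by itself. I won't go into the scheduling details; the key piece is from gevent.hub import get_hub.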
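To make the epoll point concrete, here is a minimal sketch that uses only Python's standard selectors module, which picks epoll on Linux. It only illustrates the idea of registering a file descriptor and waiting for readiness; it is not gevent's own code (gevent drives its loop through libev or libuv), and the function name wait_until_readable is made up for this example.

```python
import selectors
import socket


def wait_until_readable(timeout=1.0):
    """Register a socket for read events and wait until it becomes readable."""
    sel = selectors.DefaultSelector()  # epoll on Linux, kqueue on BSD/macOS
    reader, writer = socket.socketpair()
    try:
        reader.setblocking(False)
        # Tell the selector which event we care about on this socket.
        sel.register(reader, selectors.EVENT_READ, data="reader")

        # Nothing has been written yet, so a non-blocking poll reports no events.
        before = sel.select(timeout=0)

        writer.send(b"ping")

        # Now the kernel reports the socket as readable.
        after = sel.select(timeout=timeout)
        labels = [key.data for key, _mask in after]
        payload = reader.recv(16)
        return len(before), labels, payload
    finally:
        sel.close()
        reader.close()
        writer.close()
```

An event loop repeats this select step forever and resumes whichever coroutine was waiting on the socket that became ready, instead of letting one blocked read stall the whole thread.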
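My earlier article on gevent: http://xiaorui.cc/?p=1530
Below is how it behaved in my test. The results were decent, and the compute tasks were spread fairly evenly. The test ran on a single cloud host, so it is not fully representative.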

The test code is as follows:
import time

import gevent
from gevent.threadpool import ThreadPool


def burn_cpu(n=100000000):
    # Alternative CPU-bound workload (not used in the run below).
    total = 0
    for i in range(n):
        total += i + 1 * 20 / 10 * 10 / 10
    return total


def test(n, m):
    # Build an m-entry dict n times: pure-Python CPU work.
    vals = []
    keys = []
    for i in range(m):
        vals.append(i)
        keys.append('a%s' % i)
    d = None
    for i in range(n):
        d = dict(zip(keys, vals))
    return d


pool = ThreadPool(20)  # at most 20 worker threads
start = time.time()
for _ in range(10):
    pool.spawn(test, 1000000, 100)
gevent.wait()  # block until all submitted tasks have finished
delay = time.time() - start
print('Ran test(1000000, 100) 10 times on a pool of up to 20 threads: %.3fs' % delay)
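A quick way to tell whether a pool's workers are threads or separate processes is to compare the process id seen inside a worker with the parent's. The sketch below does this with the standard library's ThreadPoolExecutor as a stand-in, so it runs without gevent; worker_identity and describe_pool_workers are hypothetical helper names for this example.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


def worker_identity():
    # Runs inside a pool worker: report which process and thread we are in.
    return os.getpid(), threading.get_ident()


def describe_pool_workers(n_workers=4):
    """Return (same_process, off_main_thread) for a thread pool's workers."""
    main_pid, main_tid = os.getpid(), threading.get_ident()
    with ThreadPoolExecutor(max_workers=n_workers) as ex:
        identities = list(ex.map(lambda _: worker_identity(), range(n_workers)))
    # Threads share the parent's pid; forked children would report another pid.
    same_process = all(pid == main_pid for pid, _tid in identities)
    off_main_thread = all(tid != main_tid for _pid, tid in identities)
    return same_process, off_main_thread
```

The same check should carry over to gevent by comparing pool.spawn(os.getpid).get() with os.getpid(), assuming spawn returns an AsyncResult as in the source shown in this post. If the two values match, the workers are threads in one process, and pure-Python CPU-bound tasks will take turns on the GIL.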
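Below is part of the ThreadPool source code. _set_size sets the number of worker threads, _on_fork re-initializes the pool inside a child process after os.fork, kill shuts the workers down by setting the size to 0, spawn submits a task, and _add_thread starts one more worker thread.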
def _set_size(self, size):
    if size < 0:
        raise ValueError('Size of the pool cannot be negative: %r' % (size, ))
    if size > self._maxsize:
        raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
    if self.manager:
        self.manager.kill()
    while self._size < size:
        self._add_thread()
    delay = 0.0001
    while self._size > size:
        while self._size - size > self.task_queue.unfinished_tasks:
            self.task_queue.put(None)
        if getcurrent() is self.hub:
            break
        sleep(delay)
        delay = min(delay * 2, .05)
    if self._size:
        self.fork_watcher.start(self._on_fork)
    else:
        self.fork_watcher.stop()

size = property(_get_size, _set_size)

def _init(self, maxsize):
    self._size = 0
    self._semaphore = Semaphore(1)
    self._lock = Lock()
    self.task_queue = Queue()
    self._set_maxsize(maxsize)

def _on_fork(self):
    # fork() only leaves one thread; also screws up locks;
    # let's re-create locks and threads
    pid = os.getpid()
    if pid != self.pid:
        self.pid = pid
        # Do not mix fork() and threads; since fork() only copies one thread
        # all objects referenced by other threads has refcount that will never
        # go down to 0.
        self._init(self._maxsize)

def join(self):
    delay = 0.0005
    while self.task_queue.unfinished_tasks > 0:
        sleep(delay)
        delay = min(delay * 2, .05)

def kill(self):
    self.size = 0

def _adjust_step(self):
    # if there is a possibility & necessity for adding a thread, do it
    while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
        self._add_thread()
    # while the number of threads is more than maxsize, kill one
    # we do not check what's already in task_queue - it could be all Nones
    while self._size - self._maxsize > self.task_queue.unfinished_tasks:
        self.task_queue.put(None)
    if self._size:
        self.fork_watcher.start(self._on_fork)
    else:
        self.fork_watcher.stop()

def _adjust_wait(self):
    delay = 0.0001
    while True:
        self._adjust_step()
        if self._size <= self._maxsize:
            return
        sleep(delay)
        delay = min(delay * 2, .05)

def adjust(self):
    self._adjust_step()
    if not self.manager and self._size > self._maxsize:
        # might need to feed more Nones into the pool
        self.manager = Greenlet.spawn(self._adjust_wait)

def _add_thread(self):
    with self._lock:
        self._size += 1
    try:
        start_new_thread(self._worker, ())
    except:
        with self._lock:
            self._size -= 1
        raise

def spawn(self, func, *args, **kwargs):
    while True:
        semaphore = self._semaphore
        semaphore.acquire()
        if semaphore is self._semaphore:
            break
    try:
        task_queue = self.task_queue
        result = AsyncResult()
        thread_result = ThreadResult(result, hub=self.hub)
        task_queue.put((func, args, kwargs, thread_result))
        self.adjust()
        # rawlink() must be the last call
        result.rawlink(lambda *args: self._semaphore.release())
        # XXX this _semaphore.release() is competing for order with get()
        # XXX this is not good, just make ThreadResult release the semaphore before doing anything else
    except:
        semaphore.release()
        raise
    return result
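The source above shrinks the pool by putting None into task_queue (see _set_size and _adjust_step), and its comments suggest that a worker treats None as a signal to exit. The following is a simplified sketch of that sentinel pattern using only the standard threading and queue modules. TinyPool and its methods are hypothetical names for this example and are not gevent's implementation; there is no hub, semaphore, or AsyncResult here.

```python
import queue
import threading


class TinyPool:
    """Simplified stand-in for a sentinel-driven thread pool."""

    def __init__(self):
        self.tasks = queue.Queue()
        self._lock = threading.Lock()
        self._threads = []
        self.size = 0

    def add_thread(self):
        # Start one more worker and count it, guarded by a lock.
        t = threading.Thread(target=self._worker, daemon=True)
        with self._lock:
            self.size += 1
            self._threads.append(t)
        t.start()

    def _worker(self):
        while True:
            task = self.tasks.get()
            try:
                if task is None:
                    # Sentinel: this worker leaves the pool.
                    with self._lock:
                        self.size -= 1
                    return
                func, args, results = task
                results.append(func(*args))
            finally:
                self.tasks.task_done()

    def spawn(self, func, args, results):
        # Queue a task; any idle worker will pick it up.
        self.tasks.put((func, args, results))

    def kill(self):
        # One sentinel per worker, then wait for all of them to exit.
        with self._lock:
            threads = list(self._threads)
            self._threads = []
        for _ in threads:
            self.tasks.put(None)
        for t in threads:
            t.join()
```

Each worker consumes exactly one None before exiting, so the number of sentinels queued decides how many threads leave the pool. That is why the gevent code compares the surplus of threads with unfinished_tasks before feeding in more Nones.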

Is gevent threadpool of any use for CPU-bound work? Can it use multiple cores?
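Its workers are native threads started with start_new_thread; the os.fork code in it only rebuilds the pool inside a forked child. Pure-Python CPU-bound code therefore still contends for the GIL, so using several cores means running multiple processes, for example one pool per process. Coroutines are mostly suited to IO-bound scenarios.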
I have used this; it just feels like neither fish nor fowl.