浅谈twisted应用中异步回调的方式及线程的应用

前言:

     简单学习了golang的goruntine后,再回过头来看twisted网络编程库中的异步应用。  公司以前的分布式爬虫调度是用Twisted写的,真是没招,只能先硬着头皮上。对于twisted以前看过一些,测试性质的写了一个小应用,是一个负载分发的模块,可以想成是iptables nat那样的端口映射。

原文地址,blog.xiaorui.cc

说正题:先介绍下twisted的几个的概念。


twisted有个defer的概念说来golang也有个defer,只是golang的是和panic recover配合做异常捕获的。 twisted的defer是异步的一种变现方式,可以这么理解,他和thread的区别是,他是基于时间event的。

有了deferred,即可对任务的执行进行管理控制。防止程序的运行,由于等待某项任务的完成而陷入阻塞停滞,提高整体运行的效率。


Deferred能帮助你编写异步代码,但并不是为自动生成异步或无阻塞的代码!要想将一个同步函数编程异步函数,必须在函数中返回Deferred并正确注册回调。


事件的好处,有堵塞io的操作的时候,我把这个任务扔到后面执行,当io执行好了后,我再继续计算刚才那个事 .

callback链表有三个形式,正常结束,例外结束,任意状态,他们分别调用会调用 addCallback\addErrback\addBoth注册到链表中。 特意说下,addBoth是任何的状态也都会执行


这里再说下,twsited的线程。reactor.callFromThread 是由reactor.run 搞出来的,所以你做好状态用reactor.stop可以控制他的关闭,就是因为他是由reactor.run派出来的进程,所以会堵塞主任务线程的,然而reactor.callInThread是一个个的独立的线程,他不堵塞了,但是他也无法stop了。。。。

#coding=utf-8
from twisted.internet import reactor
#xiaorui.cc
import time
reactor.suggestThreadPoolSize(30)
def tt(i,j):
    if i =="10":
        reactor.stop()
    while 1:
        print i,'---------------',j
        time.sleep(2)
def gg(i,j):
    time.sleep(2)
    if i ==10:
        reactor.stop()
    print i,'---------------',j
    time.sleep(2)
for i in range(50):
#    reactor.callFromThread(gg,i,i)
    reactor.callInThread(tt,i,i)
print "I want to start"
reactor.run()


twisted自带了一个顺序执行的组件叫做 callLater, 我们可以规定第一秒执行这个函数,第二秒执行那个函数,第三秒的时候关闭realor ! 干脆点,你可以想成是计划任务。

from twisted.internet import reactor
import time
def printTime():
    print 'Current time is',time.strftime("%H:%M:%S")
def stopReactor():
    print "Stopping reactor"
    reactor.stop()
reactor.callLater(1,printTime)
reactor.callLater(2,printTime)
reactor.callLater(3,printTime)
reactor.callLater(4,printTime)
reactor.callLater(5,stopReactor)
print 'Running the reactor ...'
reactor.run()
print 'Reactor stopped.'

下面的例子,你们自己跑跑,我上面说的都是一些个零散的例子,大家对照下面完整的,走一遍。 twisted理解其实却是有点麻烦,大家只要知道他是基于事件的后,慢慢理解就行了。

#coding:utf-8
#xiaorui.cc
from twisted.internet import reactor, defer
from twisted.internet.threads import deferToThread
import os,sys
from twisted.python import threadable; threadable.init(1)
deferred =deferToThread.__get__
import time
def todoprint_(result):
    print result
def running():
    "Prints a few dots on stdout while the reactor is running."
#     sys.stdout.write("."); sys.stdout.flush()
    print '.'
    reactor.callLater(.1, running)
@deferred
def sleep(sec):
    "A blocking function magically converted in a non-blocking one."
    print 'start sleep %s'%sec
    time.sleep(sec)
    print '\nend sleep %s'%sec
    return "ok"
def test(n,m):
    print "fun test()  is  start"
    m=m
    vals = []
    keys = []
    for i in xrange(m):
        vals.append(i)
        keys.append('a%s'%i)
    d = None
    for i in xrange(n):
        d = dict(zip(keys, vals))
    print "fun test() is end"
    return d
if __name__== "__main__":
#one
    sleep(10).addBoth(todoprint_)
    reactor.callLater(.1, running)
    reactor.callLater(3, reactor.stop)
    print "go go !!!"
    reactor.run()
#two
    aa=time.time()
    de = defer.Deferred()
    de.addCallback(test)
    reactor.callInThread(de.callback,10000000,100 )
    print time.time()-aa
    print "我这里先做别的事情"
    print de
    print "go go end"



大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

8 Responses

  1. Darth 2015年12月25日 / 下午12:59

    哈哈哈 不是我来打广告, 我自己搞了一个项目 灰度发布分流引擎 https://github.com/boylegu/regal 博主如有兴趣可以参考支持一下 互相进行交流

  2. 问个问题 2015年10月29日 / 上午7:23

    现在twisted越来越少了吧

  3. ggarlic 2015年1月27日 / 上午9:55

    加上inlinecallback你也能用同步的方式写异步代码

    • 峰云就她了 2015年1月27日 / 上午10:12

      tw有段时间没搞了。。。。 用inlineCallbacks也不错,用一个装饰器把一个生成器变成一系列的异步的callbacks. 当时我搞得时候,有时会出现yield异常,有时间再折腾下

  4. 小英雄 2014年12月4日 / 上午7:59

    学习了

发表评论

邮箱地址不会被公开。 必填项已用*标注