前言:
简单学习了golang的goruntine后,再回过头来看twisted网络编程库中的异步应用。 公司以前的分布式爬虫调度是用Twisted写的,真是没招,只能先硬着头皮上。对于twisted以前看过一些,测试性质的写了一个小应用,是一个负载分发的模块,可以想成是iptables nat那样的端口映射。
原文地址,blog.xiaorui.cc
说正题:先介绍下twisted的几个的概念。
twisted有个defer的概念,说来golang也有个defer,只是golang的是和panic recover配合做异常捕获的。 twisted的defer是异步的一种变现方式,可以这么理解,他和thread的区别是,他是基于时间event的。
有了deferred,即可对任务的执行进行管理控制。防止程序的运行,由于等待某项任务的完成而陷入阻塞停滞,提高整体运行的效率。
Deferred能帮助你编写异步代码,但并不是为自动生成异步或无阻塞的代码!要想将一个同步函数编程异步函数,必须在函数中返回Deferred并正确注册回调。
事件的好处,有堵塞io的操作的时候,我把这个任务扔到后面执行,当io执行好了后,我再继续计算刚才那个事 .
callback链表有三个形式,正常结束,例外结束,任意状态,他们分别调用会调用 addCallback\addErrback\addBoth注册到链表中。 特意说下,addBoth是任何的状态也都会执行
这里再说下,twsited的线程。reactor.callFromThread 是由reactor.run 搞出来的,所以你做好状态用reactor.stop可以控制他的关闭,就是因为他是由reactor.run派出来的进程,所以会堵塞主任务线程的,然而reactor.callInThread是一个个的独立的线程,他不堵塞了,但是他也无法stop了。。。。
#coding=utf-8 from twisted.internet import reactor #xiaorui.cc import time reactor.suggestThreadPoolSize(30) def tt(i,j): if i =="10": reactor.stop() while 1: print i,'---------------',j time.sleep(2) def gg(i,j): time.sleep(2) if i ==10: reactor.stop() print i,'---------------',j time.sleep(2) for i in range(50): # reactor.callFromThread(gg,i,i) reactor.callInThread(tt,i,i) print "I want to start" reactor.run()
twisted自带了一个顺序执行的组件叫做 callLater, 我们可以规定第一秒执行这个函数,第二秒执行那个函数,第三秒的时候关闭realor ! 干脆点,你可以想成是计划任务。
from twisted.internet import reactor import time def printTime(): print 'Current time is',time.strftime("%H:%M:%S") def stopReactor(): print "Stopping reactor" reactor.stop() reactor.callLater(1,printTime) reactor.callLater(2,printTime) reactor.callLater(3,printTime) reactor.callLater(4,printTime) reactor.callLater(5,stopReactor) print 'Running the reactor ...' reactor.run() print 'Reactor stopped.'
下面的例子,你们自己跑跑,我上面说的都是一些个零散的例子,大家对照下面完整的,走一遍。 twisted理解其实却是有点麻烦,大家只要知道他是基于事件的后,慢慢理解就行了。
#coding:utf-8 #xiaorui.cc from twisted.internet import reactor, defer from twisted.internet.threads import deferToThread import os,sys from twisted.python import threadable; threadable.init(1) deferred =deferToThread.__get__ import time def todoprint_(result): print result def running(): "Prints a few dots on stdout while the reactor is running." # sys.stdout.write("."); sys.stdout.flush() print '.' reactor.callLater(.1, running) @deferred def sleep(sec): "A blocking function magically converted in a non-blocking one." print 'start sleep %s'%sec time.sleep(sec) print '\nend sleep %s'%sec return "ok" def test(n,m): print "fun test() is start" m=m vals = [] keys = [] for i in xrange(m): vals.append(i) keys.append('a%s'%i) d = None for i in xrange(n): d = dict(zip(keys, vals)) print "fun test() is end" return d if __name__== "__main__": #one sleep(10).addBoth(todoprint_) reactor.callLater(.1, running) reactor.callLater(3, reactor.stop) print "go go !!!" reactor.run() #two aa=time.time() de = defer.Deferred() de.addCallback(test) reactor.callInThread(de.callback,10000000,100 ) print time.time()-aa print "我这里先做别的事情" print de print "go go end"
有没有Twisted-influxDB的异步驱动?
哈哈哈 不是我来打广告, 我自己搞了一个项目 灰度发布分流引擎 https://github.com/boylegu/regal 博主如有兴趣可以参考支持一下 互相进行交流
现在twisted越来越少了吧
加上inlinecallback你也能用同步的方式写异步代码
tw有段时间没搞了。。。。 用inlineCallbacks也不错,用一个装饰器把一个生成器变成一系列的异步的callbacks. 当时我搞得时候,有时会出现yield异常,有时间再折腾下
学习了