小探python thrift通信框架的设计

上下文环境

先废话连篇,老生常谈说下Thrift 是什么?  他是个功能强大的通信协议组件,附带了各级socket服务,自成RPC服务.   我们知道大多数所谓的RPC远程调用服务都是基于http来开发的,为毛?  因为够简单呀。  对于server端 和 客户端来说,我只需要把函数名及参数序列化扔到对端就可以了。 


那么http和tcp封装rpc最大的区别在于什么?

开发难度, 使用直接tcp server需要考虑粘包的问题,还要自定义通信协议。 HTTP明显开发简单的多,高性能的框架选择余地也贼多,只是http协议在RPC场景下显得有些冗余了。 当然你可以自己裁剪下http协议.  

工作到现在以来,我自己开发的高性能rpc就有两个,也写过rpc的文章。 我这么喜欢造轮子的人,为什么会选用通用的thrift做协议层.   这次要重构一个调度模块,因为不仅仅是python来访问你,还有其他语言来调用,所以这个时候就不好再用自己造的轮子了,  难不成我还要画心思告诉别人RPC的客户端怎么写?协议各方面需要注意什么? 算了吧, 我还是用 thrift协议来兼容各个语言的通信吧。 

我这边其实有用过thrift的服务,比如 Hbase的thrift server,监控系统。 说实话这性能,我始终不是很满意,我们每天也就几千万的量,虽然每条的数据在200KB左右。 其实归根到底不是Thrift不行,而是他默认的几个IO服务模型不给力。  像 Hbase Thrfit Server使用的就是BIO阻塞模式,只能开更多的线程来接客 ….   

我曾经测试过thrift的nio模型,nio是java的New io,   最后的结果还是可以的,单机可以抗住 3W 的请求量 .     不管怎么说,正好也没有写过thrift的文章,  趁着有点时间,跟大家聊聊。 


该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.

http://xiaorui.cc/?p=3606

Thrift有简洁的四层接口抽象,每一层都可以独立的扩展增强或替换.  如果你有定制的需求,可以直接在现存的thrift框架上修改.  比如我很想把thrift的server改成multiprocessing gevent模式. 

下图是thrift的结构图:



我们先说下thrift的通信协议,有下面这么几种.

#xiaorui.cc

TBinaryProtocol                 二进制传输协议
TCompactProtocol                使用VLQ编码进行压缩的数据传输协议
TJSONProtocol                   JSON格式的数据传输协议
TSimpleJSONProtocol             简单的JSON格式数据传输协议
TDebugProtocol                  调试时使用的文本传输协议

 
再来看看thrift的通信方式. 

TFramedTransport                   按块的大小进行传输
TFileTransport                     按照文件的方式进行传输
TMemoryTransport                   使用内存IO方式进行传输
TZlibTransport                     执行zlib压缩方式传输

我们这次不讲thrift的使用方法,有需要的朋友直接看官方的实例,这次主要通过看Thrift的源码学习thrift的设计的一些事.   主要是python的thrift源码.

看看python thrift server 是怎么一回事?  

thrift server默认的模式是单线程堵塞的模型, TThreadPoolServer 是线程池的模型。 在python环境下大家还是选用TProcessPoolServer多进程版本。毕竟gil限制了并行,同时只能跑一个任务, 这里就不多扯在python下线程 vs 进程的选择了.

TServer.TSimpleServer
单进程单线程...

TServer.TThreadedServer
每个请求独立的线程

TServer.TThreadPoolServer
线程池模型

TServer.TForkingServer
每个请求独立的进程

TServer.TProcessPoolServer
进程池方案


下面是thrift的进程池的模式代码,实现的原理倒是很简单,就是fork了几个子进程,子进程都针对 listen fd进行accept .

#xiaorui.cc

   def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""

        #this is a shared state that can tell the workers to exit when set as false
        self.isRunning.value = True

        #first bind and listen to the port
        self.serverTransport.listen()

        #fork the children
        for i in range(self.numWorkers):
            try:
                w = Process(target=self.workerProcess)
                w.daemon = True
                w.start()
                self.workers.append(w)
            except Exception, x:
                logging.exception(x)

        #wait until the condition is set by stop()

        while True:

            self.stopCondition.acquire()
            try:
                self.stopCondition.wait()
                break
            except (SystemExit, KeyboardInterrupt):
        break
            except Exception, x:
                logging.exception(x)

        self.isRunning.value = False


    def workerProcess(self):
        """Loop around getting clients from the shared queue and process them."""

        if self.postForkCallback:
            self.postForkCallback()

        while self.isRunning.value == True:
            try:
                client = self.serverTransport.accept()
                self.serveClient(client)
            except (KeyboardInterrupt, SystemExit):
                return 0
            except Exception, x:

再来看看thrift怎么实现函数映射的.   这个是个黑魔法,比如你有个类实例对象 a , 然后你想 a.ping() ,但是他没有 ping这个函数,直接调用会出现异常。 不想异常的调用有两种方法,第一个,你写个函数,第二个是你可以使用python的魔法函数来接受ping这个字符串,然后协议封装后,扔给thrfit服务端.  

#xiaorui.cc

class TProtocolDecorator():
  def __init__(self, protocol):
    TProtocolBase(protocol)
    self.protocol = protocol

  def __getattr__(self, name):
    if hasattr(self.protocol, name):
      member = getattr(self.protocol, name)
      if type(member) in [MethodType, UnboundMethodType, FunctionType, LambdaType, BuiltinFunctionType, BuiltinMethodType]:
        return lambda *args, **kwargs: self._wrap(member, args, kwargs)
      else:
        return member
    raise AttributeError(name)

再就是TCP服务的粘包问题

粘包本身就是个伪命题,因为他就是个数据边界的问题,你只要把边界定义清晰,就不会出现tcp粘包问题.  

首先客户端把数据封装成帧的形式,即“数据长度+数据内容”,然后将处理之后的数据通过网络发送给Thrift服务器;此处也需要注意:要与Thrift服务器程序所采用的传输层的实现类一致,否则Thrift的传输层也无法将数据进行逆向的处理;

每Frame的前4字节会记录Frame的长度(少于16M)。读的时候按长度预先将整Frame数据读入Buffer,再从Buffer慢慢读取。写的时候,每次flush将Buffer中的所有数据写成一个Frame。
有长度信息的TFramedTransport是后面NonBlockingServer粘包拆包的基础。

Thrift vs Protocol Buffers  区别是什么? 

Protocol Buffers 更适合做序列化,反序列化,像是 Msgpack那样,但是Thrift可以说是一个合集组件,他的传输可以是tcp,http,他的协议可以是http,json,二进制,服务器类型可以有多线程,多进程等.  


话说回来, 任何rpc框架的实现都要做下面这些事.

1.   基本的网络通信,http,socket,异步框架tornado,netty等。

2.   通信数据的序列化与反序列化, json, msgpack 

3.   程序内部的反射自省模式.



END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注