源码分析nats request reply的设计实现

前言:

     Golang nats mq是一个基于golang实现的高性能消息队列,nats也被cncf纳入到云原生计算基金会。据我所知,nats mq貌似在gopher里很有名气,但别的语言系的朋友不关注这个。

     我用nats也有段时间了,你问我这个nats好用否? 我只能说比我以前用的go nsq性能高且可用性强,加入stream后,可信赖数据可靠性。如果再详细的介绍,大家就看nats的官网文档吧。

     该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5642

我们的nats应用场景

      先说下我们的应用场景,我们除了用传统意义上的mq功能外,就会借助nats来实现消息总线。这里的消息总线模式更像是微服务的另一种模型。常见的微服务模式是服务提供者把自己的信息注册到 zk/ etcd/consul等,然后服务调用者通过服务发现机制找到访问对象的地址,然后直接访问,或者是通过mesh机制代理访问。

     而基于消息总线的微服务设计是怎样?服务提供者监听nats mq topic,服务调用者直接把请求扔到mq topic,然后接收返回的结果? 为什么是疑问? nats大体是可以分为三个模式,跟大多mq一样的pubsub模式,还有queue模式,跟其他mq不一样的request reply模式。  mq一般可以理解为异步消费,也就是说producer只需要把任务丢到mq就完事了,但是如果你要实现同步,那么producer和provider两端需要定义好 任务信息topic和结果topic。然后producer pub任务后,需要再sub订阅结果topic。

     麻烦不? 如果整条微服务体现的所有服务都这么搞,太蛋疼了。 所以nats mq为微服务专门搞了一个request & reply。简单说,就是为了结果上面说的同步问题,对于producer来说只是一个方法。你会发现github社区里有不少微服务套件都有消息总线模式的实现。

     看字看不明白,看图应该能看懂吧? 


 

nats request & reply 实现

     request & reply的实现基本是在nats client端实现的。nats server对request & reply没做啥特别的东西。我们这里拿golang的nats client来分析下源码。go nats client的协程控制的很好,没有随意的滥开协程。subscribe自身没有并发控制,subscribe绑定了一个事件和回调方法后,会new一个waitForMsgs协程来回调,也就是说单个subscribe是同步阻塞的。 这个需要注意下,在go里nsq和kafka的sarama是模式并发模式的。

 

     nats client连接的过程

     一个nats client的连接会开启2个协程,一个是用来读取数据,一个用来flush数据。 

// Process a connected connection and initialize properly.
func (nc *Conn) processConnectInit() error {
    nc.wg.Add(2)
    go nc.readLoop()
    go nc.flusher()
    return nil
}
    nats request发送的过程

针对request模式,一个连接只有一个waitForMsgs协程,nats通过sync.Once限制唯一。这里waitForMsgs的作用只是把msg发给request等待的chan而已。当我们使用bus的request方法时,他内部会生成一个接收结果的chan,接着在绑定Subscribe里绑定事件, 然后阻塞等待接收结果的chan。当远端的订阅者把结果返回时, waitForMsg会把msg传给上面的chan。 request接收chan,结束阻塞状态,返回。 
// xiaorui.cc

func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
    // Create literal Inbox and map to a chan msg.
    mch := make(chan *Msg, RequestChanLen)
    respInbox := nc.newRespInbox()
    token := respToken(respInbox)
    nc.respMap[token] = mch
    createSub := nc.respMux == nil
    ginbox := nc.respSub
    nc.mu.Unlock()
    if createSub {
        var err error
        //  通过sync.Once保证request模式下,waitForMsg只有一个协程。作用是订阅返回结果的topic及回调mch
        nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
        if err != nil {
            return nil, err
        }
    }
    if err := nc.PublishRequest(subj, respInbox, data); err != nil {
        return nil, err
    }
    t := globalTimerPool.Get(timeout)
    defer globalTimerPool.Put(t)
    var ok bool
    var msg *Msg
    select {
    // 回调的channel
    case msg, ok = <-mch:
        if !ok {
            return nil, ErrConnectionClosed
        }
    case <-t.C:
        nc.mu.Lock()
        delete(nc.respMap, token)
        nc.mu.Unlock()
        return nil, ErrTimeout
    }
    return msg, nil
}

    nats reply订阅接收的过程

订阅消费者返回结果的topic,把获取的结果扔到request等待的channel里。订阅者去直接调用subscribe事件的时候,每个subscribe的调用都会new一个waitForMsg协程来回调我们注册的方法。

简单说,同一个连接下,多个sub就多个并发。 

// xiaorui.cc
func (nc *Conn) createRespMux(respSub string) error {
    s, err := nc.Subscribe(respSub, nc.respHandler)
    if err != nil {
        return err
    }
    nc.mu.Lock()
    nc.respMux = s
    nc.mu.Unlock()
    return nil
}
func (nc *Conn) respHandler(m *Msg) {
    rt := respToken(m.Subject)
    nc.mu.Lock()
    // Just return if closed.
    if nc.isClosed() {
        nc.mu.Unlock()
        return
    }
    // Grab mch
    mch := nc.respMap[rt]
   ...
    select {
    case mch <- m:
    default:
        return
    }
}
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Subscription, error) {
   ...
    if cb != nil {
       ...
        go nc.waitForMsgs(sub)
   }
}
func (nc *Conn) waitForMsgs(s *Subscription) {
    var closed bool
    var delivered, max uint64
    // Used to account for adjustments to sub.pBytes when we wrap back around.
    msgLen := -1
    for {
        ...
        if s.pHead == nil && !s.closed {
            s.pCond.Wait()
        }
        // Pop the msg off the list
        m := s.pHead
        if m != nil {
            ...
        }
        mcb := s.mcb
        max = s.max
        ...
        // 投递消息, 其实就是调用nc.respHandler
        if m != nil && (max == 0 || delivered <= max) {
            mcb(m)
        }
        ...
    }
    // Check for barrier messages
    s.mu.Lock()
    for m := s.pHead; m != nil; m = s.pHead {
        if m.barrier != nil {
            s.mu.Unlock()
            if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
                m.barrier.f()
            }
            s.mu.Lock()
        }
        s.pHead = m.next
    }
    s.mu.Unlock()
}

总结:

       其实nats request& reply的原理很简单,另外golang nats的源码质量还可以,建议大家可以看下。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc