前言:
Golang nats mq是一个基于golang实现的高性能消息队列,nats也被cncf纳入到云原生计算基金会。据我所知,nats mq貌似在gopher里很有名气,但别的语言系的朋友不关注这个。
我用nats也有段时间了,你问我这个nats好用否? 我只能说比我以前用的go nsq性能高且可用性强,加入stream后,可信赖数据可靠性。如果再详细的介绍,大家就看nats的官网文档吧。
该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5642
我们的nats应用场景
先说下我们的应用场景,我们除了用传统意义上的mq功能外,就会借助nats来实现消息总线。这里的消息总线模式更像是微服务的另一种模型。常见的微服务模式是服务提供者把自己的信息注册到 zk/ etcd/consul等,然后服务调用者通过服务发现机制找到访问对象的地址,然后直接访问,或者是通过mesh机制代理访问。
而基于消息总线的微服务设计是怎样?服务提供者监听nats mq topic,服务调用者直接把请求扔到mq topic,然后接收返回的结果? 为什么是疑问? nats大体是可以分为三个模式,跟大多mq一样的pubsub模式,还有queue模式,跟其他mq不一样的request reply模式。 mq一般可以理解为异步消费,也就是说producer只需要把任务丢到mq就完事了,但是如果你要实现同步,那么producer和provider两端需要定义好 任务信息topic和结果topic。然后producer pub任务后,需要再sub订阅结果topic。
麻烦不? 如果整条微服务体现的所有服务都这么搞,太蛋疼了。 所以nats mq为微服务专门搞了一个request & reply。简单说,就是为了结果上面说的同步问题,对于producer来说只是一个方法。你会发现github社区里有不少微服务套件都有消息总线模式的实现。
看字看不明白,看图应该能看懂吧?

nats request & reply 实现
request & reply的实现基本是在nats client端实现的。nats server对request & reply没做啥特别的东西。我们这里拿golang的nats client来分析下源码。go nats client的协程控制的很好,没有随意的滥开协程。subscribe自身没有并发控制,subscribe绑定了一个事件和回调方法后,会new一个waitForMsgs协程来回调,也就是说单个subscribe是同步阻塞的。 这个需要注意下,在go里nsq和kafka的sarama是模式并发模式的。
nats client连接的过程
一个nats client的连接会开启2个协程,一个是用来读取数据,一个用来flush数据。
// Process a connected connection and initialize properly.
func (nc *Conn) processConnectInit() error {
nc.wg.Add(2)
go nc.readLoop()
go nc.flusher()
return nil
}
// xiaorui.cc
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
// Create literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respToken(respInbox)
nc.respMap[token] = mch
createSub := nc.respMux == nil
ginbox := nc.respSub
nc.mu.Unlock()
if createSub {
var err error
// 通过sync.Once保证request模式下,waitForMsg只有一个协程。作用是订阅返回结果的topic及回调mch
nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
if err != nil {
return nil, err
}
}
if err := nc.PublishRequest(subj, respInbox, data); err != nil {
return nil, err
}
t := globalTimerPool.Get(timeout)
defer globalTimerPool.Put(t)
var ok bool
var msg *Msg
select {
// 回调的channel
case msg, ok = <-mch:
if !ok {
return nil, ErrConnectionClosed
}
case <-t.C:
nc.mu.Lock()
delete(nc.respMap, token)
nc.mu.Unlock()
return nil, ErrTimeout
}
return msg, nil
}
nats reply订阅接收的过程
订阅消费者返回结果的topic,把获取的结果扔到request等待的channel里。订阅者去直接调用subscribe事件的时候,每个subscribe的调用都会new一个waitForMsg协程来回调我们注册的方法。
简单说,同一个连接下,多个sub就多个并发。
// xiaorui.cc
func (nc *Conn) createRespMux(respSub string) error {
s, err := nc.Subscribe(respSub, nc.respHandler)
if err != nil {
return err
}
nc.mu.Lock()
nc.respMux = s
nc.mu.Unlock()
return nil
}
func (nc *Conn) respHandler(m *Msg) {
rt := respToken(m.Subject)
nc.mu.Lock()
// Just return if closed.
if nc.isClosed() {
nc.mu.Unlock()
return
}
// Grab mch
mch := nc.respMap[rt]
...
select {
case mch <- m:
default:
return
}
}
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Subscription, error) {
...
if cb != nil {
...
go nc.waitForMsgs(sub)
}
}
func (nc *Conn) waitForMsgs(s *Subscription) {
var closed bool
var delivered, max uint64
// Used to account for adjustments to sub.pBytes when we wrap back around.
msgLen := -1
for {
...
if s.pHead == nil && !s.closed {
s.pCond.Wait()
}
// Pop the msg off the list
m := s.pHead
if m != nil {
...
}
mcb := s.mcb
max = s.max
...
// 投递消息, 其实就是调用nc.respHandler
if m != nil && (max == 0 || delivered <= max) {
mcb(m)
}
...
}
// Check for barrier messages
s.mu.Lock()
for m := s.pHead; m != nil; m = s.pHead {
if m.barrier != nil {
s.mu.Unlock()
if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
m.barrier.f()
}
s.mu.Lock()
}
s.pHead = m.next
}
s.mu.Unlock()
}
总结:
其实nats request& reply的原理很简单,另外golang nats的源码质量还可以,建议大家可以看下。
