前言:
Golang nats mq是一个基于golang实现的高性能消息队列,nats也被cncf纳入到云原生计算基金会。据我所知,nats mq貌似在gopher里很有名气,但别的语言系的朋友不关注这个。
我用nats也有段时间了,你问我这个nats好用否? 我只能说比我以前用的go nsq性能高且可用性强,加入stream后,可信赖数据可靠性。如果再详细的介绍,大家就看nats的官网文档吧。
该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5642
我们的nats应用场景
先说下我们的应用场景,我们除了用传统意义上的mq功能外,就会借助nats来实现消息总线。这里的消息总线模式更像是微服务的另一种模型。常见的微服务模式是服务提供者把自己的信息注册到 zk/ etcd/consul等,然后服务调用者通过服务发现机制找到访问对象的地址,然后直接访问,或者是通过mesh机制代理访问。
而基于消息总线的微服务设计是怎样?服务提供者监听nats mq topic,服务调用者直接把请求扔到mq topic,然后接收返回的结果? 为什么是疑问? nats大体是可以分为三个模式,跟大多mq一样的pubsub模式,还有queue模式,跟其他mq不一样的request reply模式。 mq一般可以理解为异步消费,也就是说producer只需要把任务丢到mq就完事了,但是如果你要实现同步,那么producer和provider两端需要定义好 任务信息topic和结果topic。然后producer pub任务后,需要再sub订阅结果topic。
麻烦不? 如果整条微服务体现的所有服务都这么搞,太蛋疼了。 所以nats mq为微服务专门搞了一个request & reply。简单说,就是为了结果上面说的同步问题,对于producer来说只是一个方法。你会发现github社区里有不少微服务套件都有消息总线模式的实现。
看字看不明白,看图应该能看懂吧?
nats request & reply 实现
request & reply的实现基本是在nats client端实现的。nats server对request & reply没做啥特别的东西。我们这里拿golang的nats client来分析下源码。go nats client的协程控制的很好,没有随意的滥开协程。subscribe自身没有并发控制,subscribe绑定了一个事件和回调方法后,会new一个waitForMsgs协程来回调,也就是说单个subscribe是同步阻塞的。 这个需要注意下,在go里nsq和kafka的sarama是模式并发模式的。
nats client连接的过程
一个nats client的连接会开启2个协程,一个是用来读取数据,一个用来flush数据。
// Process a connected connection and initialize properly. func (nc *Conn) processConnectInit() error { nc.wg.Add(2) go nc.readLoop() go nc.flusher() return nil }
// xiaorui.cc func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) { // Create literal Inbox and map to a chan msg. mch := make(chan *Msg, RequestChanLen) respInbox := nc.newRespInbox() token := respToken(respInbox) nc.respMap[token] = mch createSub := nc.respMux == nil ginbox := nc.respSub nc.mu.Unlock() if createSub { var err error // 通过sync.Once保证request模式下,waitForMsg只有一个协程。作用是订阅返回结果的topic及回调mch nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) }) if err != nil { return nil, err } } if err := nc.PublishRequest(subj, respInbox, data); err != nil { return nil, err } t := globalTimerPool.Get(timeout) defer globalTimerPool.Put(t) var ok bool var msg *Msg select { // 回调的channel case msg, ok = <-mch: if !ok { return nil, ErrConnectionClosed } case <-t.C: nc.mu.Lock() delete(nc.respMap, token) nc.mu.Unlock() return nil, ErrTimeout } return msg, nil }
nats reply订阅接收的过程
订阅消费者返回结果的topic,把获取的结果扔到request等待的channel里。订阅者去直接调用subscribe事件的时候,每个subscribe的调用都会new一个waitForMsg协程来回调我们注册的方法。
简单说,同一个连接下,多个sub就多个并发。
// xiaorui.cc func (nc *Conn) createRespMux(respSub string) error { s, err := nc.Subscribe(respSub, nc.respHandler) if err != nil { return err } nc.mu.Lock() nc.respMux = s nc.mu.Unlock() return nil } func (nc *Conn) respHandler(m *Msg) { rt := respToken(m.Subject) nc.mu.Lock() // Just return if closed. if nc.isClosed() { nc.mu.Unlock() return } // Grab mch mch := nc.respMap[rt] ... select { case mch <- m: default: return } } func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Subscription, error) { ... if cb != nil { ... go nc.waitForMsgs(sub) } } func (nc *Conn) waitForMsgs(s *Subscription) { var closed bool var delivered, max uint64 // Used to account for adjustments to sub.pBytes when we wrap back around. msgLen := -1 for { ... if s.pHead == nil && !s.closed { s.pCond.Wait() } // Pop the msg off the list m := s.pHead if m != nil { ... } mcb := s.mcb max = s.max ... // 投递消息, 其实就是调用nc.respHandler if m != nil && (max == 0 || delivered <= max) { mcb(m) } ... } // Check for barrier messages s.mu.Lock() for m := s.pHead; m != nil; m = s.pHead { if m.barrier != nil { s.mu.Unlock() if atomic.AddInt64(&m.barrier.refs, -1) == 0 { m.barrier.f() } s.mu.Lock() } s.pHead = m.next } s.mu.Unlock() }
总结:
其实nats request& reply的原理很简单,另外golang nats的源码质量还可以,建议大家可以看下。