源码分析golang consul分布式锁lock delay问题

前言:

这几年下来搞了好几种分布式锁了,比如最开始的redis,后来升级到redis redlock红锁, 后来到zookeeper和etcd,再到现在的 consul cluster。 为什么使用consul做分布式锁,是他比其他的nosql有更好的优点么? 或者其他的组件有坑?

该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5849

都不是… 因为公司就只有consul集群,没有redis和etcd集群。

实现逻辑:

consul实现分布式锁的方法很简单,consul官方也给出了文档和案例,简单说你实例化一个session会话,然后使用该会话去加锁,然后放锁。 我们可以在session里加入 TTL 超时的控制。

官方给出的例子是使用curl http api来构建的分布式锁,我这边用golang consul的lib来实现的锁,代码已经放到github了,有兴趣的朋友可以看看。 https://github.com/rfyiamcool/go-consul-locker

我在go-consul-locker里实现了两种分布式锁的模式,一种是基于事件通知的,简单说,就是定了一组channel做锁的通知。另一种是直接调用的模式。推荐使用直接调用的模式,下面给出call mode的使用例子。

// xiaorui.cc

package main

import (
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/rfyiamcool/go-consul-locker"
	"github.com/rfyiamcool/go-consul-locker/example/common"
)

func main() {
	var (
		d       *consulocker.DisLocker
		err     error
		term    = make(chan os.Signal)
		running = true
		wg      = sync.WaitGroup{}
	)

	signal.Notify(term, os.Interrupt, syscall.SIGTERM)

	mcron := common.NewMCron()
	d, err = consulocker.New(
		&consulocker.Config{
			Address:      "127.0.0.1:8500",
			KeyName:      "lock/add_user",
			LockWaitTime: 5 * time.Second,
		},
	)
	if err != nil {
		log.Println("Error ", err)
		return
	}

	value := map[string]string{
		"server_id": common.MakeServerId(),
	}

	wg.Add(1)
	go func() {
		defer wg.Done()

		select {
		case <-term:
			running = false
			if !d.IsLocked {
				return
			}

			if err := d.ReleaseLock(); err != nil {
				log.Println(err)
			}
			log.Println("signal release lock ok")

			mcron.Stop()
			log.Println("Exiting gracefully...")
			return
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		var c = 0
		for running {
			isLocked, err := d.TryLockAcquire(value)
			if !running {
				return
			}

			if err != nil || isLocked == false {
				log.Printf("can't acquire lock, sleep 1s, err: %v, isLocked: %v\n", err, isLocked)
				time.Sleep(1 * time.Second)
				continue
			}

			log.Println("acquire lock ok")
			mcron.Start()

			for running {
				d.Renew()
				time.Sleep(1 * time.Second)

				c++
				if c < 10 {
					continue
				}

				// reset
				c = 0

				// stop cron task
				mcron.Stop()

				// release lock
				if err := d.ReleaseLock(); err != nil {
					log.Println(err)
				}
				log.Println("----")
				log.Println("active release lock ok; sleep 3s")
				log.Println("----")

				time.Sleep(3 * time.Second)
				break
			}
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		time.Sleep(2 * time.Second)
		for running {
			log.Printf("isLocked state is %v", d.IsLocked)
			time.Sleep(2 * time.Second)
		}
	}()

	wg.Wait()
	log.Println("exit")
}

遇到的问题:

实现的过程基本很通顺,consul的源码实现相对比etcd更干净简洁,api的设计也让人易懂。 但consul有个lock delay需要注意一下。 在使用etcd和redis redlock实现分布式锁的时候,一个节点释放锁,另一个节点可以立马拿到锁? 就算有延迟也只是网络上的调用开销。

但是consul的lock delay策略不是这样的,一个节点释放锁了,另一个节点不能立马拿到锁。需要等待lock delay时间后才可以拿到锁。

google的chubby组件也设计了lock delay的策略,具体可以看文章,http://research.google.com/archive/chubby.html  。lock delay策略设计的目的在于尽可能防止因网络,用户逻辑及高负载问题引起的旧节点未感知已经有新主的状态下,做出一些导致数据不一致的操作。

代码问题:

我们可以举例一个场景。有一个节点他拿到了锁,然后去执行一个函数,该函数里包含一些db及其他的操作。但是该方法很耗时,执行db就花了很久,直接导致consul session timeout,继而让其他人拿到了锁。这时候你还不知道,db逻辑返回后,继续往下走。

上面的代码逻辑问题,是可以解决的,可以异步new一个协程去不断的renew续约,或者可以在逻辑执行前去判断锁状态。

高负载:

高负载的场景下,不能及时的续约,导致session timeout, 其他节点拿到锁。

consul源码分析lock delay问题

当不主动传递lockDelay的时候,consul会使用默认15s作为lock delay延迟时间。当然你的可以传递自定义的lock delay值,最小可以是1ms。

// xiaorui.cc

func (s *Session) Create(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) {
    var obj interface{}
    if se != nil {
        body := make(map[string]interface{})
        obj = body
        ...
        // 不填lockDelay,就主动传递lockDelay值
        if se.LockDelay != 0 {
            body["LockDelay"] = durToMsec(se.LockDelay)
        }
        ...
        if se.TTL != "" {
            body["TTL"] = se.TTL
        }
    }
    return s.create(obj, q)
}

// 最小的delay粒度是 1ms
func durToMsec(dur time.Duration) string {
    ms := dur / time.Millisecond
    if dur > 0 && ms == 0 {
        ms = 1
    }
    return fmt.Sprintf("%dms", ms)
}

在通过看consul server的源码来分析该问题

// xiaorui.cc

// SessionCreate is used to create a new session
func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
    // Default the session to our node + serf check + release session
    // invalidate behavior.
    args := structs.SessionRequest{
        Op: structs.SessionCreate,
        Session: structs.Session{
            ...
            LockDelay: 15 * time.Second,
            ...
        },
    }
    …
    // Handle optional request body
    if req.ContentLength > 0 {
        fixup := func(raw interface{}) error {
            if err := FixupLockDelay(raw); err != nil {
                return err
            }
            if err := FixupChecks(raw, &args.Session); err != nil {
                return err
            }
            return nil
        }
...

consul最小是1ms的lock delay,通过api是无法更改延迟时间,只能是改consul server的代码了。1ms的延迟对于我们来说已经完全够用了,我想相信绝大数公司的场景也够用了。

google chubby分布式锁设计原理

https://medium.com/coinmonks/chubby-a-centralized-lock-service-for-distributed-applications-390571273052

https://www.youtube.com/watch?v=PqItueBaiRg

总结:

consul 是个好东西,不仅可以做kv,而且可以做服务发现。建议大家可以读读他的源代码,非常值得学习。

前段时间我因为要做交易引擎的分布式,所以把consul cluster的源码及hashicorp raft库的代码看了一遍,受益匪浅呀。找个时间给大家做个分享。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc