源码分析 kubernetes kubelet prober 探针的设计实现

概述

kubelet 使用 livenessProbe 存活探针来确定什么时候要重启容器, 使用 readinessProbe 就绪探针可以确认是否要把流量接入到 service 里, startupProbe 启动探针是为了避免在启动时间不可控时, 使用 liveness 探针探测失败, 造成重启的死循环的场景.

探针参数介绍

探针的参数介绍:

  • initialDelaySeconds : 存活性探测延迟时长,即容器启动多久之后再开始第一次探测操作,显示为 delay 属性;默认为 0 秒,即容器启动后立刻便开始进行探测。
  • timeoutSeconds : 存活性探测的超时时长,显示为 timeout 属性,默认为 1s,最小值也为 1s。
  • periodSeconds : 存活性探测的频度,显示为 period 属性,默认为 10s, 最小值为 1s;过高的频率会对 Pod 对象带来较大的额外开销,而过低的频率又会使得对错误的反映不及时。
  • successThreshold : 处于失败状态时,探测操作至少连续多少次的成功才被认为是通过检测,显示为 #success 属性,默认值为1,最小值也为1。
  • failureThreshold : 处于成功状态时,探测操作至少连续多少次的失败才被视为是检测不通过,显示为 #failure 属性,默认值为 3,最小值为1。

源码分析

探针启动入口

实例化 kubelet 时, 顺便也会实例化 prober 探针管理器

if kubeDeps.ProbeManager != nil {
    klet.probeManager = kubeDeps.ProbeManager
} else {
    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.readinessManager,
        klet.startupManager,
        klet.runner,
        kubeDeps.Recorder)
}

探针管理器

下面是 prober 管理器的数据结构组成:

pkg/kubelet/prober/prober_manager.go

type probeKey struct {
    podUID        types.UID
    containerName string
    probeType     probeType
}

type manager struct {
    // 存放 worker, key 为 probeKey
    workers map[probeKey]*worker
    workerLock sync.RWMutex

    // 状态管理器
    statusManager status.Manager

    // 探针结果的封装
    readinessManager results.Manager
    livenessManager results.Manager
    startupManager results.Manager

    // 具体的prober的实现
    prober *prober

    // 启动时间, 为了后面做探针的 delay
    start time.Time
}

下面是 prober Manager 实现的接口. 后面分析下 AddPod, StopLivenessAndStartup, RemovePod 的实现.

type Manager interface {
    // 为新 pod 增加探针
    AddPod(pod *v1.Pod)
    // 关闭 pod 时, 先停止 liveness 和 startup 探针
    StopLivenessAndStartup(pod *v1.Pod)
    // 关闭 pod 时, 删除其所有类型的探针
    RemovePod(pod *v1.Pod)
    // 清理 pods
    CleanupPods(desiredPods map[types.UID]sets.Empty)
    // 更新 pod 状态
    UpdatePodStatus(types.UID, *v1.PodStatus)
}

为 Pod 添加探活

kubelet 在创建完容器后, 调用探针管理器的 AddPod 方法为 pod 开启探针. 每个探针都是一个 worker 协程. k8s 里 node 默认最大容器数在 110, 开满三个探针也才几百个协程.

func (m *manager) AddPod(pod *v1.Pod) {
    m.workerLock.Lock()
    defer m.workerLock.Unlock()

    // 组装 key
    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name

        // 如果有配置启动探针, 则创建
        if c.StartupProbe != nil {
            // 指定类型为 startup
            key.probeType = startup
            // 如果已经添加 key, 则跳过
            if _, ok := m.workers[key]; ok {
                return
            }
            // 为该 pod 的 startup 模式创建一个 worker 实例
            w := newWorker(m, startup, pod, c)
            m.workers[key] = w
            // 启动该 worker 实例
            go w.run()
        }
        // 如有配置 readiness 探针, 则创建
        if c.ReadinessProbe != nil {
            key.probeType = readiness
            if _, ok := m.workers[key]; ok {
                return
            }
            w := newWorker(m, readiness, pod, c)
            m.workers[key] = w
            go w.run()
        }

        // 如有配置 liveness 探针, 则创建
        if c.LivenessProbe != nil {
            ...
        }
    }
}

关闭 pod 探针

kubelet 在关闭 pod 时, 先调用 StopLivenessAndStartup 停止 liveness 和 startup 探针, 再进行 killPod, 之后再调用 RemovePod 来移除所有的探针.

kubelet 在关闭 pod 时对探针的操作:

pkg/kubelet/kubelet.go

func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {

    // 只关闭 pod 的 liveness 和 startup 探针
    kl.probeManager.StopLivenessAndStartup(pod)

    // 给 pod 发送 SIGTERM 信号,超时未退出则强制 SIGKILL 杀掉容器.
    if err := kl.killPod(ctx, pod, p, gracePeriod); err != nil {
        return err
    }

    // 在探针管理器里删除该 pod 所有类型的探针
    kl.probeManager.RemovePod(pod)

    return nil
}

StopLivenessAndStartup

func (m *manager) StopLivenessAndStartup(pod *v1.Pod) {
    m.workerLock.RLock()
    defer m.workerLock.RUnlock()

    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name
        for _, probeType := range [...]probeType{liveness, startup} {
            key.probeType = probeType
            if worker, ok := m.workers[key]; ok {
                worker.stop()
            }
        }
    }
}

RemovePod

func (m *manager) RemovePod(pod *v1.Pod) {
    m.workerLock.RLock()
    defer m.workerLock.RUnlock()

    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name
        for _, probeType := range [...]probeType{readiness, liveness, startup} {
            key.probeType = probeType
            if worker, ok := m.workers[key]; ok {
                worker.stop()
            }
        }
    }
}

清理探测器

kubelet 周期性执行 HandlePodCleanups 方法, 该方法会清理一波状态异常的容器, 同时也会清理探针.

func (m *manager) CleanupPods(desiredPods map[types.UID]sets.Empty) {
    m.workerLock.RLock()
    defer m.workerLock.RUnlock()

    for key, worker := range m.workers {
        if _, ok := desiredPods[key.podUID]; !ok {
            worker.stop()
        }
    }
}

prober worker 实现

开一个 ticker 定时器, 然后到期触发 doProbe()

func (w *worker) run() {
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
    ...
    probeTicker := time.NewTicker(probeTickerPeriod)

probeLoop:
    for w.doProbe(ctx) {
        select {
        case <-w.stopCh:
            break probeLoop
        case <-probeTicker.C:
        }
    }
}

下面是具体探针检测的逻辑. 尝试去探测, 完成后进行次数累加, 如果探测的次数超过阈值, 则向 kubelet 传递状态更新.

func (w *worker) doProbe(ctx context.Context) (keepGoing bool) {
    startTime := time.Now()

    // 如果 pod 还未被创建,或者 已经被删除会出现, ok 为 false 的情况.
    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        return true
    }

    // 如果 pod 失败或者退出, 则跳出
    if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
        klog.V(3).InfoS("Pod is terminated, exiting probe worker",
            "pod", klog.KObj(w.pod), "phase", status.Phase)
        return false
    }

    c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) == 0 {
        return true // Wait for more information.
    }

    if w.containerID.String() != c.ContainerID {
        ...
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        // 新容器起来时, 恢复探针检测
        w.onHold = false
    }

    // 旧容器因为不符合 liveness 规则被重启, 新容器还没起来时, 一直跳过探针检测.
    if w.onHold {
        return true
    }

    // 如果在 graceful shutdown 阶段
    if w.pod.ObjectMeta.DeletionTimestamp != nil && (w.probeType == liveness || w.probeType == startup) {
        w.resultsManager.Set(w.containerID, results.Success, w.pod)
        return false
    }

    // 探测要在 initialDelay 延迟参数后才启用
    if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }

    // 真正的执行探活逻辑
    result, err := w.probeManager.prober.probe(ctx, w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        return true
    }

    // 状态相同, 则递增加一
    // 状态不同, 则直接重置, 重新累加.
    if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 1
    }

    // 探测失败了, 但次数在阈值以内, 不用传递 pod 状态
    // 或者探测成功了, 且次数在阈值以内, 不用传递 pod 状态
    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        return true
    }

    // 像 kubelet 传递当前 pod 变更状态, kubelet 根据状态选择策略.
    w.resultsManager.Set(w.containerID, result, w.pod)

    // 重置计数 
    if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
        w.onHold = true
        w.resultRun = 0
    }

    return true
}

真正的探测逻辑

重试三遍执行探测逻辑, 根据探针方法不同启动不同的探针方法.

  • exec 探针的 exit code 返回 0 时, 认为探测成功.
  • http 探针的 resp code 返回 2xx 和 3xx 时, 认为探测成功.
  • tcp 探测可以完成建连, 也就是三次握手, 则认为探测成功.
  • grpc 探测需要业务服务集成 grpchealth 包, 正常返回 ‘SERVING’ 状态, 则认为探测成功.
const maxProbeRetries = 3

func (pb *prober) runProbeWithRetries(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
    var err error
    var result probe.Result
    var output string
    for i := 0; i < retries; i++ {
        result, output, err = pb.runProbe(ctx, probeType, p, pod, status, container, containerID)
        if err == nil {
            return result, output, nil
        }
    }
    return result, output, err
}

func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
    // 超时时间
    timeout := time.Duration(p.TimeoutSeconds) * time.Second

    // 执行 exec 类型探针
    if p.Exec != nil {
        command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
        return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))
    }
    // 执行 httpGet 类型探针
    if p.HTTPGet != nil {
        req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
        if err != nil {
            return probe.Unknown, "", err
        }
        ...
        return pb.http.Probe(req, timeout)
    }

    // 执行 tcp 类型探针
    if p.TCPSocket != nil {
        ...
        return pb.tcp.Probe(host, port, timeout)
    }

    // 执行 grpc 类型探针
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.GRPCContainerProbe) && p.GRPC != nil {
        ...
        return pb.grpc.Probe(host, service, int(p.GRPC.Port), timeout)
    }

    return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

传递状态到 kubelet syncLoop 调度循环

prober 探测到 pod 异常且符合阈值后, 调用 resultsManager.Set 传递状态, 其实就是往 updates 管道发送事件.

func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *v1.Pod) {
    if m.setInternal(id, result) {
        m.updates <- Update{id, result, pod.UID}
    }
}

kubelet 主循环调度 syncLoop 里的 syncLoopIteration 会监听该 updates 管道. 当有事件通知时, 先更改 statusManager 状态并通过 handleProbeSync 执行 podWorkers.UpdatePod 容器更新逻辑.

func (kl *Kubelet) syncLoopIteration(...) bool {
    select {
    case u, open := <-configCh:
        ...
    case e := <-plegCh:
        ...
    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            handleProbeSync(kl, update, handler, "liveness", "unhealthy")
        }
    case update := <-kl.readinessManager.Updates():
        ready := update.Result == proberesults.Success
        kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

        status := ""
        if ready {
            status = "ready"
        }
        handleProbeSync(kl, update, handler, "readiness", status)
    case update := <-kl.startupManager.Updates():
        started := update.Result == proberesults.Success
        kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

        status := "unhealthy"
        if started {
            status = "started"
        }
        handleProbeSync(kl, update, handler, "startup", status)
    }
    return true
}

大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc