源码分析 kubernetes ingress nginx controller 控制器的实现原理
本文基于 kubernetes/ingress-nginx v1.5.1
源码分析, ingress-nginx 里的 controller 控制器是 golang 开发的. 而 ingress-nginx 容器内使用了官方的 nginx, 没有直接使用 openresty, 原生 nginx 编译时打入了 lua / luajit 模块, 但引用的三方的库包是属于 openresty 社区里的. 在这看 ingress-nginx 详细的 lua 库包引用信息.
https://github.com/kubernetes/ingress-nginx/blob/21aa7f55a3/images/nginx/rootfs/build.sh
ingress-nginx 为什么没有直接使用 openresty?
在 github issue 列表中没有找到答案. 按理来说社区方面 openresty 要比 nginx 更加开放, 但毕竟没有 nginx 的背景和后台金主.
像社区的 kong 和 apache apisix 是基于 openresty 开发的, 另外他们也在社区中开源了自己的 ingress-controller.
ingress-nginx 项目地址:
https://github.com/kubernetes/ingress-nginx
ingress-nginx 核心函数调用关系流程:
实现过程略显复杂, 故图中忽略细节.
实例化 nginx ingress controller 控制器
// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
// 读取 resulv.conf 文件,获取 dns server 地址集合
h, err := dns.GetSystemNameServers()
n := &NGINXController{
stopCh: make(chan struct{}),
// informer 注册 eventHandler 会往这个 chan 发送事件.
updateCh: channels.NewRingChannel(1024),
Proxy: &tcpproxy.TCPProxy{},
command: NewNginxCommand(),
}
// 实例化 store 对象, 可以把 store 想成一个有各种数据的缓存的存储, 内部也有 informer.
n.store = store.New(
...
n.updateCh, // 把上面实例化的 updateCh 传进去了
...
)
// 实例化 queue, 并且在 queue里注册了回调方法, syncIngress 是 nginx ingress controller 最核心的同步方法.
n.syncQueue = task.NewTaskQueue(n.syncIngress)
// 用在 inotify 文件监听的回调方法
onTemplateChange := func() {
// 从 `/etc/nginx/template/nginx.tmpl` 读取预设的模板, 然后进行解析生成 template 对象.
template, err := ngx_template.NewTemplate(nginx.TemplatePath)
if err != nil {
klog.ErrorS(err, "Error loading new template")
return
}
n.t = template
// 向 queue 传递事件, 平滑热加载 nginx 配置
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
}
// 从 `/etc/nginx/template/nginx.tmpl` 读取预设的模板, 然后进行解析生成 template 对象.
ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
if err != nil {
klog.Fatalf("Invalid NGINX configuration template: %v", err)
}
n.t = ngxTpl
// 使用 inotify 机制异步监听 nginx.tmpl 模板文件, 当模板文件发生变更时, 则回调 onTemplateChange 方法, 重新读取模板并构建模板对象, 然后同步配置
file.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
// 获取 geoip 目录下的相关文件, v4.4.2 里当前就只有三个文件, 分别是 geoip.dat, geoIPASNum.dat, geoLiteCity.dat 数据文件.
filesToWatch := []string{}
err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
filesToWatch = append(filesToWatch, path)
return nil
})
for _, f := range filesToWatch {
// 异步监听 geoip dat 数据文件, 当发生增删改时, 重新平滑热加载 nginx 配置.
_, err = file.NewFileWatcher(f, func() {
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
})
}
return n
}
服务启动入口
Start()
启动 nginx controller 控制器, 其原理如下:
- 启动
store.Run()
, 内部会启动 informer 监听并维护各资源的本地缓存 ; - 进行 leader election 选举, 只有主实例才可以执行状态同步的逻辑 ;
- 启动 nginx 进程, 指定配置为
/etc/nginx/nginx.conf
; - 启动 syncQueue 里 run 方法, 该方法内部会从队列中读取任务, 并调用 syncIngress 来同步 nginx 配置 ;
- 前面 nginx 启动使用时, 只是使用了默认的 nginx.conf, 里面几乎没什么东西, 这里通过主动通知 syncqueue, 然后 syncqueue worker 调度 syncIngress 来同步配置并完成热加载 ;
- 启动一个协程每隔 5秒进行临时配置文件清理 ;
- 从 informer 导入的 updateCh 获取任务, 并写到 syncQueue 队列里.
func (n *NGINXController) Start() {
klog.InfoS("Starting NGINX Ingress controller")
// 启动 store, 内部会启动 informer 监听并维护各资源的本地缓存.
n.store.Run(n.stopCh)
// 进行选举, 只有主实例才可以执行状态同步的更新的逻辑.
setupLeaderElection(&leaderElectionConfig{
Client: n.cfg.Client,
ElectionID: electionID,
OnStartedLeading: func(stopCh chan struct{}) {
if n.syncStatus != nil {
// 开启状态的同步更新
go n.syncStatus.Run(stopCh)
}
},
})
// 配置进程组, 使用 cmd 创建的程序都所属相同的 pgid 组id, 这样杀掉进程时可以按照进程组杀, 避免有遗漏的进程.
cmd := n.command.ExecCommand()
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
}
// 加载 ssl proxy
if n.cfg.EnableSSLPassthrough {
n.setupSSLProxy()
}
// 启动 nginx 进程, 指定配置为 `/etc/nginx/nginx.conf`.
n.start(cmd)
// 启动 syncQueue 里 run 方法, 该方法内部会从队列中读取任务, 并调用 syncIngress 来同步 nginx 配置.
go n.syncQueue.Run(time.Second, n.stopCh)
// 前面 nginx 启动使用时, 只是使用了默认的 nginx.conf, 里面几乎没什么东西, 这里通过主动传任务到 syncqueue, 然后 syncqueue 利用 syncIngress 来同步配置并完成热加载
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
go func() {
for {
time.Sleep(5 * time.Minute)
// 启动一个异步 gc 协程, 清理 nginx 临时文件, 临时文件的名字前缀有 `nginx-cfg` 字符串, 当满足临时文件特征, 且更改超过5分钟则删除该临时文件
cleanTempNginxCfg()
}
}()
for {
select {
case err := <-n.ngxErrCh:
...
case event := <-n.updateCh.Out():
// 从 informer 拿到更改事件
if evt, ok := event.(store.Event); ok {
// 如果事件的类型是配置相关的, 则通知给 syncqueue, 让 syncqueue 的内部去同步配置.
if evt.Type == store.ConfigurationEvent {
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}
// 同上, 只是任务可跳过.
n.syncQueue.EnqueueSkippableTask(evt.Obj)
}
case <-n.stopCh:
return
}
}
}
监听 informer 事件
store
实例化了 nginx ingress controller 所需资源的 informer 和 lister, 并注册 eventHandler 方法.
源码位置: https://github.com/kubernetes/ingress-nginx/blob/main/internal/ingress/controller/store/store.go
func New(
namespace string,
...
updateCh *channels.RingChannel) Storer {
store := &k8sStore{
informers: &Informer{}, // 各资源 informers 的集合
listers: &Lister{}, // informers listers 集合
updateCh: updateCh, // 通知给 syncqueue 的通道
backendConfig: ngx_config.NewDefault(), // 获取 nginx 模板中需要的默认变量, 比如缓冲大小呀, keepalive 配置, http2 相关配置等, 另外默认 WorkerProcesses 为当前的cpu核心数.
}
// 实例化 nginx ingress controller 所需资源的 informer 和 lister.
store.informers.Ingress = infFactory.Networking().V1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()
store.informers.EndpointSlice = infFactory.Discovery().V1().EndpointSlices().Informer()
store.listers.EndpointSlice.Store = store.informers.EndpointSlice.GetStore()
store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()
store.informers.ConfigMap = infFactoryConfigmaps.Core().V1().ConfigMaps().Informer()
store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
...
// 巴拉巴拉, 实现了各类 informer 的 eventHandler 方法.
...
// 监听 ingress 资源并注册方法
store.informers.Ingress.AddEventHandler(ingEventHandler)
// 监听 endpoints 资源并注册方法
store.informers.EndpointSlice.AddEventHandler(epsEventHandler)
// 监听 secret 资源并注册方法
store.informers.Secret.AddEventHandler(secrEventHandler)
// 监听 configmap 资源并注册方法
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
// 监听 service 资源并注册方法
store.informers.Service.AddEventHandler(serviceHandler)
return store
}
ingress informer eventhandler
下面是 store 里 ingress informer eventhandler 的实现, 收到事件后首先更新本地的缓存, 然后给 updateCh 传递通知.
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing, _ := toIngress(obj)
// 过滤不合法的 ns
if !watchedNamespace(ing.Namespace) {
return
}
// 更新 store 里 ingress 的缓存
store.syncIngress(ing)
// 更新 store 里 secret 的缓存
store.syncSecrets(ing)
// 向 updateCh 传递事件, 让 syncqueue 同步并加载配置
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: ingDeleteHandler,
UpdateFunc: func(old, cur interface{}) {
oldIng, _ := toIngress(old)
curIng, _ := toIngress(cur)
if !watchedNamespace(oldIng.Namespace) {
return
}
// 同步 store 缓存, 并传递更新事件
store.syncIngress(curIng)
store.updateSecretIngressMap(curIng)
store.syncSecrets(curIng)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
}
service informer eventHandler
下面是 store 里 service informer eventHandler 的实现原理. 这里关注 service type 为 externalName 的, externalName 是个特殊的类型.
serviceHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*corev1.Service)
// 只关注 service type 为 externalName 的
if svc.Spec.Type == corev1.ServiceTypeExternalName {
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
}
},
DeleteFunc: func(obj interface{}) {
svc := obj.(*corev1.Service)
// 只关注 service type 为 externalName 的
if svc.Spec.Type == corev1.ServiceTypeExternalName {
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
}
},
UpdateFunc: func(old, cur interface{}) {
oldSvc := old.(*corev1.Service)
curSvc := cur.(*corev1.Service)
// 一样则忽略
if reflect.DeepEqual(oldSvc, curSvc) {
return
}
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
}
syncQueue
syncQueue
内部维护了 workqueue
队列, 并启动了一个 worker 协程, 该协程从内部 workqueue 获取任务, 然后调用 syncIngress
来同步和加载nginx配置.
代码位置: https://github.com/kubernetes/ingress-nginx/blob/main/internal/task/queue.go
// 一直调用 worker 协程, 直到 stopCh 退出, 失败重新调度间隔为 1秒.
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
wait.Until(t.worker, period, stopCh)
}
func (t *Queue) worker() {
for {
key, quit := t.queue.Get()
...
ts := time.Now().UnixNano()
item := key.(Element)
// 如果上次同步时间超过了 item 时间大, 则跳过, 避免无效同步.
if item.Timestamp != 0 && t.lastSync > item.Timestamp {
t.queue.Forget(key)
t.queue.Done(key)
continue
}
// 使用 syncIngress 来处理 key, 该方法为 ingress-nginx 核心处理入口.
if err := t.sync(key); err != nil {
// 同步 nginx 失败, 则重新入队
t.queue.AddRateLimited(Element{
Key: item.Key,
Timestamp: 0,
})
} else {
// 成功可剔除
t.queue.Forget(key)
t.lastSync = ts
}
// 成功处理
t.queue.Done(key)
}
}
ingress 的核心同步方法 syncIngress
syncIngress 是控制器核心处理入口, 主要调用 OnUpdate
和 configureDynamically
.
- 判断是否满足动态更新, 不满足则调用 OnUpdate 完成 nginx 配置生成, 校验和 reload 热加载.
- 调用 configureDynamically 方法来实现动态更新.
func (n *NGINXController) syncIngress(interface{}) error {
// 从 store 的 informer 里获取 ingress 集合
ings := n.store.ListIngresses()
// 拿到 hosts 集合, servers 集合
hosts, servers, pcfg := n.getConfiguration(ings)
// 如果配置无变更, 直接跳出
if n.runningConfig.Equal(pcfg) {
return nil
}
// 不满足动态更新, 需要构建配置并 reload 加载.
if !utilingress.IsDynamicConfigurationEnough(pcfg, n.runningConfig) {
klog.InfoS("Configuration changes detected, backend reload required")
// 计算 pcfg 的 hash 值
hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
TagName: "json",
})
pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
// 真正操作 nginx, 生成 nginx 配置, 语法检测, 最后执行 reload 热加载.
err := n.OnUpdate(*pcfg)
if err != nil {
return err
}
klog.InfoS("Backend successfully reloaded")
}
// 如果是首次配置, 需稍等一下
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
if isFirstSync {
klog.InfoS("Initial sync, sleeping for 1 second")
// 首次配置, 等待个 1s, nginx 解析大配置文件时, 有可能会有延迟.
time.Sleep(1 * time.Second)
}
retry := wait.Backoff{}
retriesRemaining := retry.Steps
err := wait.ExponentialBackoff(retry, func() (bool, error) {
// 执行动态配置, 后面有详细讲解
err := n.configureDynamically(pcfg)
if err == nil {
return true, nil
}
// 重试减一
retriesRemaining--
if retriesRemaining > 0 {
return false, nil
}
// 当 err 不为 nil 时, wait 不会再进行重试调度.
return false, err
})
n.runningConfig = pcfg
return nil
}
OnUpdate 真正同步 nginx 配置
syncIngress
内部会调用 OnUpdate
方法操作 nginx, 也就是说 OnUpdate
会真正的操作的 nginx 配置.
源码分析原理如下:
- 通过模板生成 nginx 配置, 赋值给 content 对象里 ;
- 根据不同的 collector 类型从 zipkin, jaeger 选定模板, 然后创建
opentracing
配置, 把 opentracing 配置写到 /etc/nginx/opentracing.json 里 ; - 使用
nginx -t -c tmpfile
来检测临时生成的 nginx 配置文件 ; - 当日志级别允许 level 2 时, 允许则打印新旧配置差异的部分 ;
- 把 nginx 配置写到 /etc/nginx/nginx.conf 里 ;
- 使用
nginx -s reload
进行配置的热加载.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
// 通过模板生成 nginx 配置, 赋值给 content 对象里.
content, err := n.generateTemplate(cfg, ingressCfg)
if err != nil {
return err
}
// 根据不同的 collector 类型从 zipkin, jaeger 等选定模板, 然后创建 `opentracing` 配置, 把 opentracing 配置写到 /etc/nginx/opentracing.json 里.
err = createOpentracingCfg(cfg)
if err != nil {
return err
}
// 使用 `nginx -t -c tmpfile` 来检测临时生成的 nginx 配置文件.
err = n.testTemplate(content)
if err != nil {
return err
}
// 判断日志级别是否允许 2 level, 允许则打印新旧配置差异的部分.
if klog.V(2).Enabled() {
src, _ := os.ReadFile(cfgPath)
// 如果当前配置跟预期配置不同的话, 获取差异部分并打印输出.
if !bytes.Equal(src, content) {
// 把配置放到临时文件里
tmpfile, err := os.CreateTemp("", "new-nginx-cfg")
os.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
// 使用 diff 命令判断配置文件的差异
diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
// 打印配置中有差异的部分
klog.InfoS("NGINX configuration change", "diff", string(diffOutput))
// 删除临时文件
os.Remove(tmpfile.Name())
}
}
// 把 nginx 配置写到 /etc/nginx/nginx.conf 里.
err = os.WriteFile(cfgPath, content, file.ReadWriteByUser)
if err != nil {
return err
}
// 使用 nginx -s reload 进行热加载
o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
if err != nil {
return fmt.Errorf("%v\n%v", err, string(o))
}
return nil
}
通过模板生成 nginx 配置
在实例化控制器时 store 通过 ngx_config.NewDefault()
获取默认的 nginx 配置变量, 然后读取 nginx.tmpl 模板且生成模板解释器.
generateTemplate
的作用是填充或者覆盖默认的 nginx 变量, 最后通过模板解释器和模板变量生成最终的配置文件.
// generateTemplate returns the nginx configuration file content
func (n NGINXController) generateTemplate(cfg ngx_config.Configuration, ingressCfg ingress.Configuration) ([]byte, error) {
// 处理 ssl 配置
if n.cfg.EnableSSLPassthrough {
servers := []*tcpproxy.TCPServer{}
for _, pb := range ingressCfg.PassthroughBackends {
svc := pb.Service
if svc == nil {
continue
}
port, err := strconv.Atoi(pb.Port.String()) // #nosec
servers = append(servers, &tcpproxy.TCPServer{
Hostname: pb.Hostname,
IP: svc.Spec.ClusterIP,
Port: port,
ProxyProtocol: false,
})
}
n.Proxy.ServerList = servers
}
// 设置 nginx 的 server_names_hash_bucket_size, 通过预设 hash bucket 的个数, 来加快查询速度, 减少拉链查询的概率.
nameHashBucketSize := nginxHashBucketSize(longestName)
if cfg.ServerNameHashBucketSize < nameHashBucketSize {
cfg.ServerNameHashBucketSize = nameHashBucketSize
}
// 设置 nginx 的 map_hash_bucket_size, 搭配上面的配置使用, 提高 nginx hashmap 检索速度.
serverNameHashMaxSize := nextPowerOf2(serverNameBytes)
if cfg.ServerNameHashMaxSize < serverNameHashMaxSize {
cfg.ServerNameHashMaxSize = serverNameHashMaxSize
}
// 设置 nginx 的 worker_rlimit_nofile 每个 worker 可以打开的最大文件描述符数量, 这里的 fd 不仅仅指文件, 还是链接文件描述符 (socket fd).
if cfg.MaxWorkerOpenFiles == 0 {
maxOpenFiles := rlimitMaxNumFiles() - 1024
if maxOpenFiles < 1024 {
maxOpenFiles = 1024 // 最小为 1024
}
cfg.MaxWorkerOpenFiles = maxOpenFiles
}
// 配置 nginx worker_connections, 每个 worker 的连接数量, worker_connections 通常要小于 worker_rlimit_nofile, 一个是连接的限制, 一个是所有 fd 的限制.
if cfg.MaxWorkerConnections == 0 {
maxWorkerConnections := int(float64(cfg.MaxWorkerOpenFiles * 3.0 / 4))
cfg.MaxWorkerConnections = maxWorkerConnections
}
// 配置转发请求时的 proxy header, 这个从 configmap 里获取.
setHeaders := map[string]string{}
if cfg.ProxySetHeaders != "" {
cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
if err != nil {
} else {
setHeaders = cmap.Data
}
}
// 响应时填充的 header, 同样从 configmap 获取.
addHeaders := map[string]string{}
if cfg.AddHeaders != "" {
cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
if err != nil {
} else {
addHeaders = cmap.Data
}
}
// 设置 ssl 参数
cfg.SSLDHParam = sslDHParam
cfg.DefaultSSLCertificate = n.getDefaultSSLCertificate()
// 配置 access_log 和 error_log 的位置
if n.cfg.IsChroot {
if cfg.AccessLogPath == "/var/log/nginx/access.log" {
cfg.AccessLogPath = fmt.Sprintf("syslog:server=%s", n.cfg.InternalLoggerAddress)
}
if cfg.ErrorLogPath == "/var/log/nginx/error.log" {
cfg.ErrorLogPath = fmt.Sprintf("syslog:server=%s", n.cfg.InternalLoggerAddress)
}
}
tc := ngx_config.TemplateConfig{
// 配置请求时自定义的 header 填充, 模板中使用 proxy_set_header 指令
ProxySetHeaders: setHeaders,
// 定制响应报文的 header, 模板中使用 more_set_headers 指令
AddHeaders: addHeaders,
// 连接全队列的大小, 从 /net/core/somaxconn 获取, 读取失败或者过小则设置为 511.
BacklogSize: sysctlSomaxconn(),
Backends: ingressCfg.Backends,
// nginx 的 server 段配置
Servers: ingressCfg.Servers,
// nginx stream tcp server 的配置
TCPBackends: ingressCfg.TCPEndpoints,
// nginx stream udp server 的配置
UDPBackends: ingressCfg.UDPEndpoints,
// ngx_config 配置
Cfg: cfg,
// redirect 跳转
RedirectServers: utilingress.BuildRedirects(ingressCfg.Servers),
// 监听的端口
ListenPorts: n.cfg.ListenPorts,
// 开启 metrics 监控, 这个是在 http lua 逻辑里
EnableMetrics: n.cfg.EnableMetrics,
// 主要跟 geolite 有关系
MaxmindEditionFiles: n.cfg.MaxmindEditionFiles,
// 在每个 http server 里加入一个 location path 为 /healthz 的接口, 处理逻辑是直接 return 200
HealthzURI: nginx.HealthPath,
// 声明下 nginx pid 位置在 `/tmp/nginx/nginx.pid`
PID: nginx.PID,
// 开启 nginx status 接口
StatusPath: nginx.StatusPath,
StatusPort: nginx.StatusPort,
}
// 在 nginx.conf 文件头部位置的注释里加入配置的 checksum 校验码, 其实就是配置文件的 hash 值.
tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
// n.t 为 nginx.tmpl 的模板解释器, write 通过传递的 ngx 变量来生成 nginx 配置.
return n.t.Write(tc)
}
nginx.tmpl
模板语言为 golang 标准库 text/template
的语法. http 和 http2 的 server 段在 nginx.tmpl 实在太冗长了.
这里就简单举例说明 nginx stream tcp 和 udp 的模板构建过程.
stream {
# 定义 lua 包位置
lua_package_path "/etc/nginx/lua/?.lua;/etc/nginx/lua/vendor/?.lua;;";
# 设置一个共享存储
lua_shared_dict tcp_udp_configuration_data 5M;
# 判断 access_log 是否开启 access_log, 如果开启则指定文件路径及格式.
{{ if or cfg.DisableAccessLogcfg.DisableStreamAccessLog }}
access_log off;
{{ else }}
access_log {{ or cfg.StreamAccessLogPathcfg.AccessLogPath }} log_stream {{ cfg.AccessLogParams }};
{{ end }}
# 错误日志
error_log {{cfg.ErrorLogPath }} {{ cfg.ErrorLogLevel }};
upstream upstream_balancer {
# 这个只是占位符而已
server 0.0.0.1:1234; # placeholder
# tcp 和 udp 转发是依赖 openretry balancer_by_lua_block 控制的.
# 具体转发逻辑在 tcp_udp_balancer.lua 这里实现的, balance 是调度入口.
balancer_by_lua_block {
tcp_udp_balancer.balance()
}
}
# 遍历生成 TCP services
{{ rangetcpServer := .TCPBackends }}
server {
# 针对 tcp ipv4 listen 进行渲染
{{ range address :=all.Cfg.BindAddressIpv4 }}
listen {{ address }}:{{tcpServer.Port }}{{ if tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }};
{{ else }}
listen {{tcpServer.Port }}{{ if tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }};
{{ end }}
# 针对 tcp ipv6 listen 进行渲染
{{ ifIsIPV6Enabled }}
{{ range address :=all.Cfg.BindAddressIpv6 }}
listen {{ address }}:{{tcpServer.Port }}{{ if tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }};
{{ else }}
listen [::]:{{tcpServer.Port }}{{ if tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }};
{{ end }}
{{ end }}
# 配置 tcp proxy 参数
proxy_timeout {{cfg.ProxyStreamTimeout }};
proxy_next_upstream {{ if cfg.ProxyStreamNextUpstream }}on{{ else }}off{{ end }};
proxy_next_upstream_timeout {{cfg.ProxyStreamNextUpstreamTimeout }};
proxy_next_upstream_tries {{ cfg.ProxyStreamNextUpstreamTries }};
# 转发到 upstream_balancer
proxy_pass upstream_balancer;
# proxy_protocol 是在转发的 tcp 报文中插入客户端 ip 地址, 这样经过层层网关转发后, 后面的 nginx 也可以拿到客户端地址.
{{ iftcpServer.Backend.ProxyProtocol.Encode }}
proxy_protocol on;
{{ end }}
}
{{ end }}
# 遍历生成 UDP services
{{ range udpServer := .UDPBackends }}
server {
{{ rangeaddress := all.Cfg.BindAddressIpv4 }}
listen {{address }}:{{ udpServer.Port }} udp;
{{ else }}
listen {{udpServer.Port }} udp;
{{ end }}
{{ if IsIPV6Enabled }}
{{ rangeaddress := all.Cfg.BindAddressIpv6 }}
listen {{address }}:{{ udpServer.Port }} udp;
{{ else }}
listen [::]:{{udpServer.Port }} udp;
{{ end }}
{{ end }}
proxy_responses {{ cfg.ProxyStreamResponses }};
proxy_timeout {{cfg.ProxyStreamTimeout }};
proxy_next_upstream {{ if cfg.ProxyStreamNextUpstream }}on{{ else }}off{{ end }};
proxy_next_upstream_timeout {{cfg.ProxyStreamNextUpstreamTimeout }};
proxy_next_upstream_tries {{ $cfg.ProxyStreamNextUpstreamTries }};
proxy_pass upstream_balancer;
}
{{ end }}
}
stream 的 backend 调度选择是依赖 tcp_udp_balancer.lua 来实现的.
在 nginx stream 插入了 balancer_by_lua_block
调度块, 当 nginx 对该 server 进行转发时, 先通过 balance 调度算法获取理想的 peer 地址, 然后 nginx 依赖自己的 stream 模块跟 peer 建连然后数据转发.
通过源码分析得知, balance 内部维护的地址池不是 service 的 clusterIP 集群地址, 而是每个 endpoint 的地址. 这样减少了 ipvs 和 iptables 的 dnat/snat 开销, 并且 balancer 内部提供了更细致的负载均衡算法.
function _M.balance()
# 获取负载均衡器对象
local balancer = get_balancer()
if not balancer then
return
end
# 经过调度算法来获取转发的地址
local peer = balancer:balance()
if not peer then
return
end
# 把获取的 peer 地址放到 ngx 变量里, nginx 依赖该变量进行转发.
local ok, err = ngx_balancer.set_current_peer(peer)
if not ok then
end
end
nginx.tmpl 文件长度差不多有 1500
行, 大家可以参照传入的变量结构体 ngx_config.TemplateConfig
来分析下模板生成的过程.
nginx.tmpl
模板的源码位置:
创建 opentracing 配置
判断 collector 类型, 然后实例化不同的收集器的模板, 生成最后的配置后写到 /etc/nginx/opentracing.json
目录里.
const zipkinTmpl = `{
...
}`
const jaegerTmpl = `{
"service_name": "{{ .JaegerServiceName }}",
"propagation_format": "{{ .JaegerPropagationFormat }}",
"sampler": {
"type": "{{ .JaegerSamplerType }}",
"param": {{ .JaegerSamplerParam }},
"samplingServerURL": "{{ .JaegerSamplerHost }}:{{ .JaegerSamplerPort }}/sampling"
},
"reporter": {
"endpoint": "{{ .JaegerEndpoint }}",
"localAgentHostPort": "{{ .JaegerCollectorHost }}:{{ .JaegerCollectorPort }}"
},
"headers": {
"TraceContextHeaderName": "{{ .JaegerTraceContextHeaderName }}",
"jaegerDebugHeader": "{{ .JaegerDebugHeader }}",
"jaegerBaggageHeader": "{{ .JaegerBaggageHeader }}",
"traceBaggageHeaderPrefix": "{{ .JaegerTraceBaggageHeaderPrefix }}"
}
}`
func createOpentracingCfg(cfg ngx_config.Configuration) error {
var tmpl *template.Template
var err error
if cfg.ZipkinCollectorHost != "" {
tmpl, err = template.New("zipkin").Parse(zipkinTmpl)
} else if cfg.JaegerCollectorHost != "" || cfg.JaegerEndpoint != "" {
tmpl, err = template.New("jaeger").Parse(jaegerTmpl)
} else if cfg.DatadogCollectorHost != "" {
tmpl, err = template.New("datadog").Parse(datadogTmpl)
}
...
tmplBuf := bytes.NewBuffer(make([]byte, 0))
err = tmpl.Execute(tmplBuf, cfg)
if err != nil {
return err
}
expanded := os.ExpandEnv(tmplBuf.String())
return os.WriteFile("/etc/nginx/opentracing.json", []byte(expanded), file.ReadWriteByUser)
}
nginx 通过模板生成配置时, 会在配置中开启 opentracing
, 并声明各个收集器的动态库及相关配置的位置.
load_module /etc/nginx/modules/ngx_http_opentracing_module.so;
opentracing on;
opentracing_propagate_context;
# opentracing_load_tracer /usr/local/lib/libdd_opentracing.so /etc/nginx/opentracing.json;
# opentracing_load_tracer /usr/local/lib/libjaegertracing_plugin.so /etc/nginx/opentracing.json;
opentracing_load_tracer /usr/local/lib/libzipkin_opentracing_plugin.so /etc/nginx/opentracing.json;
...
官方的 nginx 自身是不支持该 opentracing 全链路追踪, 依赖 nginx-opentracing
项目以插件 plugin 形式来让 nginx 支持 opentracing 链路追踪功能.
项目地址: https://github.com/opentracing-contrib/nginx-opentracing
测试并校验 nginx 配置
创建一个临时的文件, 把 nginx 配置写到临时文件, 然后使用 nginx -t -c nginx-cfg-xxx
测试下.
但让人奇怪的是只是通过 exec 的 err 判断是否错误, 而不是通过输出的内存或者 process exit code 来识别错误. 想来是因为维护者因为前期做了很多校验, 后面又是通过模板来生成的配置, 假定不出问题吧.
func (n NGINXController) testTemplate(cfg []byte) error {
tmpDir := os.TempDir() + "/nginx"
// 在 /tmp/nginx 临时目录创建前缀为 `nginx-cfg` 后跟随机数的临时文件.
tmpfile, err := os.CreateTemp(tmpDir, tempNginxPattern)
if err != nil {
return err
}
defer tmpfile.Close()
err = os.WriteFile(tmpfile.Name(), cfg, file.ReadWriteByUser)
if err != nil {
return err
}
// 通过 nginx -t -c nginx-cfgxxxx 来测试临时配置是否合法.
out, err := n.command.Test(tmpfile.Name())
if err != nil {
return errors.New(oe)
}
os.Remove(tmpfile.Name())
return nil
}
nginxCommand 库封装了 nginx 的一些命令, 比如下面的 nginx -t
.
func (nc NginxCommand) Test(cfg string) ([]byte, error) {
return exec.Command(nc.Binary, "-c", cfg, "-t").CombinedOutput()
}
nginx reload 设计实现
由于篇幅原因不分析 nginx 代码了, 下面是 nginx -s reload
的实现原理过程.
- 修改完 nginx 的配置文件后, 向 master 进程发送 HUP 信号,
nginx -s reload
本质也是发送 HUP 信号; - master 进程在收到 HUP 信号以后, 会在第二步检查我们的配置文件语法是否正确;
- 在 nginx 的配置语法全部正确以后, master 进程会打开新的监听端口 ;
- master 进程会用新的 nginx.conf 配置文件来启动新的 worker 子进程 ;
- 由 master 进程再向老 worker 子进程发送 QUIT 信号.
老 worker 如何优雅退出 ?
nginx 是可以实现优雅退出的, 老 worker 收到 SIGQUIT 信号后不会立马退出, 而是先在本 worker eventloop 上摘除 listenfd 的监听, 这样 epoll 就拿不到 listenfd 的事件, 更不会去做 accept 操作了. 之后 worker 等待该 worker 上的所有长连接都关闭才退出.
这里说的长连接说的是 tcp stream 和 http2 这类长连接, 另外当一个 http 请求迟迟不返回 response, 也是影响 worker 退出.
如何老连接就是不退出怎么办 ?
在 nginx v1.11.11 版本之前是干等, 客户端长连接不断, 那么 worker 进程就不退出.
之后版本加入了 worker_shutdown_timeout
参数, 等待一段时间还连接还未退出, 则强制主动关闭.
实际上 worker_shutdown_timeout 参数的实现原理还是有些绕的, 后面补写一篇文章专门分析下 nginx grace reload 细节.
如何动态修改 nginx 配置, 非 reload (configureDynamically)
通过前面 nginx.tmpl 的配置得知, nginx (openretry) 转发的逻辑是依赖 upstream 的 balancer_by_lua_block 指令实现的.
http 和 stream (tcp/udp) 在生成配置时, 在 upstream 段里都插入了 balancer_by_lua_block
指令用来实现自定义负载均衡逻辑, nginx 会依赖该 balancer 来获取转发的地址, 然后对该连接进行转发.
该 lua 转发模块代码位置是
rootfs/etc/nginx/lua/balancer.lua
.
balancer_by_lua_block 是怎么回事 ?
balancer_by_lua_block
是一个支持自定义负载均衡器的指令, 通常基于 nginx 的服务发现就是通过该指令实现的.
开发时一定要注意事项, balancer_by_lua_block 只是通过自定义负载均衡算法获取 peer 后端地址, 接着通过 balancer.set_current_peer(ip, port)
进行赋值. 后面连接的建立,连接池维护,数据拷贝转发等流程统统不在这里,而是由 nginx 内部 upstream 转发逻辑实现.
一句话,nginx 只是调用 balancer_by_lua_block
获取理想的后端地址而已.
下面是使用 balancer_by_lua_block
实现调度地址池的例子:
upstream backend{
server 0.0.0.0;
balancer_by_lua_block {
local balancer = require "ngx.balancer"
local host = {"s1.xiaorui.cc", "s2.xiaorui.cc"}
local backend = ""
local port = ngx.var.server_port
local remote_ip = ngx.var.remote_addr
local key = remote_ip..port
# 使用地址 hash 调度算法
local hash = ngx.crc32_long(key);
hash = (hash % 2) + 1
backend = host[hash]
ngx.log(ngx.DEBUG, "ip_hash=", ngx.var.remote_addr, " hash=", hash, " up=", backend, ":", port)
# 配置后端地址, nginx 进行转发时依赖该地址
local ok, err = balancer.set_current_peer(backend, port)
if not ok then
ngx.log(ngx.ERR, "failed to set the current peer: ", err)
return ngx.exit(500)
end
}
}
server {
listen 80;
server_name xiaorui.cc
location / {
proxy_pass http://backend;
}
}
lua-nginx-module 项目中关于 balancer_by_lua_block 实现:
https://github.com/openresty/lua-nginx-module#balancer_by_lua_block
在 nginx 里加入 balancer_by_lua_block 指令
在 nginx.tmpl
中加入了 balancer_by_lua_block
指令, 所以不管是 http 和 stream 段里的 upstream 转发, 不再走 server 配置, 而是走 balancer_by_lua_block
自定义流程.
http {
upstream upstream_balancer {
// 只是占位符, openretry 优先走 balancer_by_lua 逻辑块.
server 0.0.0.1; # placeholder
balancer_by_lua_block {
balancer.balance()
}
{{ if (gt cfg.UpstreamKeepaliveConnections 0) }}
keepalive {{cfg.UpstreamKeepaliveConnections }};
keepalive_time {{ $cfg.UpstreamKeepaliveTime }};
...
{{ end }}
}
...
server {
...
}
}
stream {
upstream upstream_balancer {
// 同上, 只是占位符, 避免 nginx -t 检测出错.
server 0.0.0.1:1234; # placeholder
balancer_by_lua_block {
tcp_udp_balancer.balance()
}
}
...
server {
...
}
}
把变更信息通知给 nginx
- 检查 http backends 是否有变更, 当有变更时, 把 backends 数据通知给 nginx 的
http://127.0.0.1:10246/configuration/backends
接口上. - 检查 tcp/udp strem backends 是否有变更, 发生变更时, 把 stream backends 数据发到 nginx 的 tcp
10247
端口上. - 当证书发生变更时, 发数据发到 nginx 的
http://127.0.0.1:10246/configuration/servers
接口上.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
// 当 endpoints 地址发生变更时
if backendsChanged {
// 动态修改 http 的 backends
err := configureBackends(pcfg.Backends)
if err != nil {
return err
}
}
streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
// 当 endpoints 地址发生变更时
if streamConfigurationChanged {
// 动态修改 tcp 和 udp 的 backends 地址列表
err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
if err != nil {
return err
}
}
serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
// 当 servers 地址发生变更时
if serversChanged {
// 动态修改证书相关配置
err := configureCertificates(pcfg.Servers)
if err != nil {
return err
}
}
return nil
}
这里拿 configureBackends()
变更配置来说. 组装 openresty 专用的 backends 数据, 然后序列化成 json, post 发给 openresty 的 /configuration/backends
接口上.
func configureBackends(rawBackends []*ingress.Backend) error {
backends := make([]*ingress.Backend, len(rawBackends))
for i, backend := range rawBackends {
luaBackend := &ingress.Backend{
...
}
var endpoints []ingress.Endpoint
for _, endpoint := range backend.Endpoints {
endpoints = append(endpoints, ingress.Endpoint{
Address: endpoint.Address,
Port: endpoint.Port,
})
}
luaBackend.Endpoints = endpoints
backends[i] = luaBackend
}
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
return nil
}
func NewPostStatusRequest(path, contentType string, data interface{}) (int, []byte, error) {
url := fmt.Sprintf("http://127.0.0.1:%v%v", StatusPort, path)
buf, err := json.Marshal(data)
...
res, err := client.Post(url, contentType, bytes.NewReader(buf))
...
body, err := io.ReadAll(res.Body)
...
return res.StatusCode, body, nil
}
上面代码是如何发送变更信息, 那么谁来接收动态数据的投递?
nginx.conf
中定义了一个解决动态配置更新的 server 配置段, 其中变量 StatusPort 为 10246, 接口的 prefix 路径为 /configuration
, 该接口定义了 content_by_lua_block 处理块.
当接口收到请求后, 调用自定义 lua 模块 configuration.lua
中 configuration.call()
入口方法.
server {
listen 127.0.0.1:{{ .StatusPort }};
keepalive_timeout 0;
gzip off;
access_log off;
location {{ healthzURI }} {
return 200;
}
location /configuration {
client_max_body_size {{ luaConfigurationRequestBodySizecfg }};
client_body_buffer_size {{ luaConfigurationRequestBodySize $cfg }};
proxy_buffering off;
content_by_lua_block {
configuration.call()
}
}
}
下面分析 configuration.call()
的实现原理. call()
中硬编码写了各个接口的处理方法.
当 ngx.var.request_uri
为 /configuration/backends
时候, 调用 handle_backends
方法处理该路由.
handle_backends
内部实现过程很简单, 先解析 request body, 然后把读到的 body 字符串放到共享存储 configuration_data
的 backends 键里, 然后更新下操作的时间戳.
configuration_data
是一个 ngx.shared.Dict 共享内存的字典存储结构, 其 set/get 操作是并发安全的. nx.shared.dict 内部通过红黑树实现的 hashmap, 使用 lru 实现的数据淘汰.
configuration_data:set 的时候没有 cjson 解析对象, 而是直接赋值json string.
function _M.call()
if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
ngx.status = ngx.HTTP_BAD_REQUEST
return
end
# 处理证书的 servers
if ngx.var.request_uri == "/configuration/servers" then
handle_servers()
return
end
# 处理通用配置 general
if ngx.var.request_uri == "/configuration/general" then
handle_general()
return
end
# 处理证书的 http handler
if ngx.var.uri == "/configuration/certs" then
handle_certs()
return
end
# 处理 backends http handler
if ngx.var.request_uri == "/configuration/backends" then
handle_backends()
return
end
ngx.status = ngx.HTTP_NOT_FOUND
ngx.print("Not found!")
end
local function handle_backends()
# 获取当前 nginx 内的 backends 配置
if ngx.var.request_method == "GET" then
ngx.status = ngx.HTTP_OK
ngx.print(_M.get_backends_data())
return
end
# 读取 request body
local backends = fetch_request_body()
if not backends then
ngx.status = ngx.HTTP_BAD_REQUEST
return
end
# 把 backends 放到 ngx.shared 的 configuration_data 存储的 backends 键值里.
local success, err = configuration_data:set("backends", backends)
if not success then
ngx.status = ngx.HTTP_BAD_REQUEST
return
end
# 记录更新时间
ngx.update_time()
local raw_backends_last_synced_at = ngx.time()
success, err = configuration_data:set("raw_backends_last_synced_at", raw_backends_last_synced_at)
if not success then
ngx.status = ngx.HTTP_BAD_REQUEST
return
end
ngx.status = ngx.HTTP_CREATED
end
上面的 handle_backends
只是获取请求的 json body 字符串, 然后把字符串写到 ngx.shared.dict 存储里. 那么谁来读取 ? 谁来 json decode ?
控制器在 nginx.conf 配置文件中加入了 init_worker_by_lua_block
初始化块, 所以当 nginx 启动时会调用 balancer.init_worker
进行模块初始化.
先异步执行 sync_backends_with_external_name, 同步 service 类型为 external_name 的配置, 然后每隔一秒调用一次 sync_backends 和 sync_backends_with_external_name.
代码如下: rootfs/etc/nginx/lua/balancer.lua::init_worker()
function _M.init_worker()
# 通过定时器实现异步执行 sync_backends_with_external_name
local ok, err = ngx.timer.at(0, sync_backends_with_external_name)
if not ok then
ngx.log(ngx.ERR, "failed to create timer: ", err)
end
# 每秒调用一次 sync_backends
ok, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
if not ok then
ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends: ", err)
end
# 每秒调用一次 sync_backends_with_external_name
ok, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends_with_external_name)
if not ok then
ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends_with_external_name: ",
err)
end
end
sync_backends()
被定时器周期性调度, 从 ngx.shared.dict
获取 backends 数据, 反序列化后, 遍历所有的 backend 对象, 依次调用 sync_backend
来向 balancer 同步配置.
local function sync_backends()
# 从 ngx.shared.dict 获取 backends 键值数据
local backends_data = configuration.get_backends_data()
...
# 把 json string 进行反序列为 json object 对象
local new_backends, err = cjson.decode(backends_data)
...
# 通过 sync_backend() 处理 backend 对象
local balancers_to_keep = {}
for _, new_backend in ipairs(new_backends) do
if is_backend_with_external_name(new_backend) then
...
else
# 向 balancer 同步配置
sync_backend(new_backend)
end
balancers_to_keep[new_backend.name] = true
end
end
local function sync_backend(backend)
# 如果 endpoints 为空, 则跳出.
if not backend.endpoints or #backend.endpoints == 0 then
balancers[backend.name] = nil
return
end
# 简化 endpoints 数据结构, 把复杂的 struct 转成 lua table 数组.
backend.endpoints = format_ipv6_endpoints(backend.endpoints)
local implementation = get_implementation(backend)
local balancer = balancers[backend.name]
# 该 name 没有 balancer 对象, 就创建一个
if not balancer then
balancers[backend.name] = implementation:new(backend)
return
end
# 调用 balancer 的 sync 方法, 把 backend.endpoints 更新到 peers 对象里.
balancer:sync(backend)
end
ingress-nginx
当前实现了下面几种负载均衡算法. 默认算法为 round_robin
轮询. 这些 lua 算法模块位置在 rootfs/etc/nginx
. 可以发现负载均衡算法中没有 least_conn
最少连接算法, 也没有 p2c
(自适应负载均衡算法) 算法.
为什么不能使用 nginx 自带的负载均衡算法, 而是在 lua 中实现 ? 这是因为 ingress-nginx 是通过 balancer_by_lua
实现的地址池的负载均衡. 另外 balancer_by_lua
内部没有实现健康检查, 而是通过 informer 监听 apiserver 感知地址的变更.
sticky_persistent.lua
sticky_balanced.lua
sticky.lua
round_robin.lua
resty.lua
ewma.lua
chashsubset.lua
chash.lua
为什么需要动态更新 upstream backends ?
不管是 nginx 和 openresty 都只支持配置的 reload 热加载, 不支持动态更新的. 但社区中基于 openresty 的 kong 和 apisix 都支持多源的动态更新配置, 社区中也有支持动态更新的 lua 模块可以使用.
当 nginx 作为 ingress 角色时, 遇到频繁变更 service endpoints 的场景下, nginx reload 开销不会小的, 每次都需要 new worker 及 kill old worker, 旧 worker 的长请求不断又是个问题. 新 worker 是新的子进程没法继承旧 worker 的连接池, 所以需要重新建连连接和维护 upstream 连接池, 这都会影响性能和时延 latency.
如果 nginx 不支持动态更新, 在一个大集群的的上下线会引发 ingress-nginx 不断的 reload.
在 ingress 支持 upstream 和证书动态更新后, 新配置加载的开销会小很多. 只需要把更新的配置通知给 openresty 的动态配置接口就可以了. balancer.lua 模块会维护每个 backend 地址池的负载均衡逻辑.
总结
ingress-nginx 的实现原理, 就是控制器监听 kube-apiserver 资源更新, 当有资源更新时, 通过 nginx 配置模板生成配置文件, 然后 reload 热加载的过程.
为了应对 kubernetes endpoints 变更引发的 nginx 频繁 reload, 所以 ingress-nginx 在 nginx 里使用 lua 实现了配置热更新的功能, 主要是针对地址池构建各负载算法的 balancer, 当 nginx location upstream 进行转发钱, 先从 balancer_by_lua 里获取转发的后端地址, 然后 nginx 再对该后端地址进行转发.