informer中list对象偶发性卡顿原因分析

Posted by [Kohn] on Monday, May 15, 2023 本文阅读量

1 背景

最近项目中发现一个问题, 功能需求是根据label list一个CRD的CR列表. 由于数量较多list很慢, 因此使用了informer来做本地缓存(用的是这个包 “sigs.k8s.io/controller-runtime/pkg/cache”). 期望的行为是程序启动的时候加载缓存, 后面业务代码使用的时候应该能很快的返回. 但是实际使用过程中, 偶发性的会业务接口超时, 日志中提示提示这个:

<span class="figure-number">Figure 1: </span>20230515_114908

<span class="figure-number">Figure 1: </span>20230515_114908

理论上来说从缓存读数据应该不会这么慢, 因此需要分析下代码看看是什么原因

2 controller-runtime.cache

先来看下业务代码的调用方式:

import 	"sigs.k8s.io/controller-runtime/pkg/cache"
import 	"sigs.k8s.io/controller-runtime/pkg/client"

func New() {
        // ...
        nc.cache, _ = cache.New(config, cache.Options{Scheme: scheme})
        go nc.cache.Start(ctx.Done())
}

func (nc *Controller) List(ns string, selector labels.Selector) {
        var list v1.ObjectCRDList
        nc.cache.List(context.TODO(), &list, &client.ListOptions{
                LabelSelector: selector,
                Namespace:     ns,
        })
}

主要是创建了一个cache对象, 然后调用Start启动informer, 最后通过List返回CR数据.

然后看下cache对象是如何新建的:

// sigs.k8s.io/controller-runtime/pkg/cache/cache.go

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
        opts, err := defaultOpts(config, opts)
        if err != nil {
                return nil, err
        }
        im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
        return &informerCache{InformersMap: im}, nil
}

New会创建一个informersmap, 然后装到informerCache结构体, 作为Cache这个interface的一种具体实现返回.

而List方法实现如下:

// List implements Reader
func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...client.ListOption) error {

        gvk, cacheTypeObj, err := ip.objectTypeForListObject(out)
        if err != nil {
                return err
        }

        started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
        if err != nil {
                return err
        }

        if !started {
                return &ErrCacheNotStarted{}
        }

        return cache.Reader.List(ctx, out, opts...)
}

首先获取要list的对象的GVK信息并取得要存数据的内存, 我们要List的对象是一个CRD, 属于structured, 这时候需要从创建对象时传入的scheme中获取GVK, 这也是为啥需要传入schema的原因.

接下来从informersMap中通过gvk获取该类型的informer:

// sigs.k8s.io/controller-runtime/pkg/cache/internal/deleg_map.go

// Get will create a new Informer and add it to the map of InformersMap if none exists.  Returns
// the Informer from the map.
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
        _, isUnstructured := obj.(*unstructured.Unstructured)
        _, isUnstructuredList := obj.(*unstructured.UnstructuredList)
        isUnstructured = isUnstructured || isUnstructuredList

        if isUnstructured {
                return m.unstructured.Get(ctx, gvk, obj)
        }

        return m.structured.Get(ctx, gvk, obj)
}

// sigs.k8s.io/controller-runtime/pkg/cache/internal/informers_map.go

// Get will create a new Informer and add it to the map of specificInformersMap if none exists.  Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
        // Return the informer if it is found
        i, started, ok := func() (*MapEntry, bool, bool) {
                ip.mu.RLock()
                defer ip.mu.RUnlock()
                i, ok := ip.informersByGVK[gvk]
                return i, ip.started, ok
        }()

        if !ok {
                var err error
                if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
                        return started, nil, err
                }
        }

        if started && !i.Informer.HasSynced() {
                // Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
                if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
                        return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
                }
        }

        return started, i, nil
}

通过代码文件的名字deleg_map可以看出, 这里大概是用了装饰器设计模式, InformerMap结构体包装了unstructured和structured两个具体的 specificInformersMap, 业务方调用的各个函数, 最终根据参数类型转发给 unstructured和structured二者之一.

从Get这个函数就可以看出来偶发性的卡顿的大概原因, 当specificInformersMap中找不到对应类型的informer, 此时会调用addInformerToMap创建一个新的informer, 然后等待informer同步数据, 因此会出现卡顿十几秒的情况. 这里就引申出来一个问题, 难道说指定了schem创建了cache对象之后, 并没有将其中的各个对象创建对应的informer, 而是在首次访问时创建.

可以看到前文中cache的New方法, 仅仅是创建了结构体, 并没有什么操作. 而业务代码中调用了cache.Start, 看看Start函数中如何启动informer的:

// sigs.k8s.io/controller-runtime/pkg/cache/internal/informers_map.go

// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
func newSpecificInformersMap(config *rest.Config,
        scheme *runtime.Scheme,
        mapper meta.RESTMapper,
        resync time.Duration,
        namespace string,
        createListWatcher createListWatcherFunc) *specificInformersMap {
        ip := &specificInformersMap{
                config:            config,
                Scheme:            scheme,
                mapper:            mapper,
                informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
                codecs:            serializer.NewCodecFactory(scheme),
                paramCodec:        runtime.NewParameterCodec(scheme),
                resync:            resync,
                startWait:         make(chan struct{}),
                createListWatcher: createListWatcher,
                namespace:         namespace,
        }
        return ip
}

// Start calls Run on each of the informers and sets started to true.  Blocks on the stop channel.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
        func() {
                ip.mu.Lock()
                defer ip.mu.Unlock()

                // Set the stop channel so it can be passed to informers that are added later
                ip.stop = stop

                // Start each informer
                for _, informer := range ip.informersByGVK {
                        go informer.Informer.Run(stop)
                }

                // Set started to true so we immediately start any informers added later.
                ip.started = true
                close(ip.startWait)
        }()
        <-stop
}

上面两个函数一个是创建specificInformersMap的方法, 一个是启动informer的方法, 可以看到创建时初始化里informersByGVK, 并没有向里面填充scheme的内容, 而Start时是遍历informersByGVK并启动, 也没有添加informer的动作. 因此得出结论, 创建cache并调用Start方法, 此时cache并没有任何的informer真正被启动, 知道cache对象首次获取数据时才会真正添加并启动informer, 这里会导致程序启动时没有初始化好数据, 直到首次对外提供数据服务时才初始化数据, 会导致用户首次访问时延时增大甚至超时

3 解决方法

程序初始化过程中可以加一个GetInformer方法注册并启动对应CRD的informer:

func New() {
        // ...
        nc.cache, _ = cache.New(config, cache.Options{Scheme: scheme})
        go nc.cache.Start(ctx.Done())
        _, _ := nc.cache.GetInformer(ctx, v1.ObjectCRD{})
}