1 Background
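Recently we ran into a problem in a project. The requirement was to list the CRs of a CRD by label. There are many CRs, so listing them was slow, and we used an informer as a local cache (via the package "sigs.k8s.io/controller-runtime/pkg/cache"). The expected behavior was that the cache is loaded when the program starts, so later calls from business code return quickly. In practice, however, the business API occasionally timed out, and the log showed the following: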
In theory, reading from a local cache should never be this slow, so we need to dig into the code to find the cause.
2 controller-runtime.cache
First, here is how the business code uses the cache:
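import (
	"context"

	"k8s.io/apimachinery/pkg/labels"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	// plus the project's own CRD API package, imported as v1
)

func New() {
	// ...
	nc.cache, _ = cache.New(config, cache.Options{Scheme: scheme})
	go nc.cache.Start(ctx.Done())
}

// List returns the CRs in namespace ns that match selector, read from the informer cache.
func (nc *Controller) List(ns string, selector labels.Selector) (*v1.ObjectCRDList, error) {
	var list v1.ObjectCRDList
	err := nc.cache.List(context.TODO(), &list, &client.ListOptions{
		LabelSelector: selector,
		Namespace:     ns,
	})
	return &list, err
}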
The code creates a cache object, calls Start to launch the informers, and finally calls List to return the CR data.
Next, let's look at how the cache object is created:
// sigs.k8s.io/controller-runtime/pkg/cache/cache.go
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
	return &informerCache{InformersMap: im}, nil
}
New creates an InformersMap, wraps it in an informerCache struct, and returns it as a concrete implementation of the Cache interface.
The List method is implemented as follows:
// List implements Reader
func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...client.ListOption) error {
	gvk, cacheTypeObj, err := ip.objectTypeForListObject(out)
	if err != nil {
		return err
	}
	started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
	if err != nil {
		return err
	}
	if !started {
		return &ErrCacheNotStarted{}
	}
	return cache.Reader.List(ctx, out, opts...)
}
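List first resolves the GVK of the object to be listed and obtains an object of the type the cache stores (cacheTypeObj). The object we list is a CRD type backed by Go structs, so it takes the structured (typed) path, and its GVK has to be looked up in the scheme passed in when the cache was created. That is why a scheme must be provided.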
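To make the role of the scheme concrete, here is a toy model of that lookup under simplifying assumptions: the scheme is just a map from Go types to GVKs (the kind is taken from the Go type name, as scheme registration does), and the item kind is derived by stripping the "List" suffix from the list kind. GroupVersionKind, toyScheme, gvkForListObject, and the Widget types are hypothetical stand-ins, not the real runtime.Scheme or apiutil API.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// GroupVersionKind is a simplified, hypothetical copy of schema.GroupVersionKind.
type GroupVersionKind struct {
	Group, Version, Kind string
}

// Widget and WidgetList are a hypothetical typed CRD and its list type.
type Widget struct{ Name string }
type WidgetList struct{ Items []Widget }

// toyScheme maps Go types to the GVK they were registered under (hypothetical).
type toyScheme map[reflect.Type]GroupVersionKind

// register records each object under gv, using the Go type name as the kind.
func (s toyScheme) register(gv GroupVersionKind, objs ...interface{}) {
	for _, o := range objs {
		t := reflect.TypeOf(o)
		gv.Kind = t.Name()
		s[t] = gv
	}
}

// gvkForListObject resolves a list object's GVK through the scheme and
// derives the item GVK by stripping the "List" suffix from the kind.
func (s toyScheme) gvkForListObject(list interface{}) (GroupVersionKind, error) {
	t := reflect.TypeOf(list)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	gvk, ok := s[t]
	if !ok {
		return GroupVersionKind{}, fmt.Errorf("type %v is not registered in the scheme", t)
	}
	if !strings.HasSuffix(gvk.Kind, "List") {
		return GroupVersionKind{}, fmt.Errorf("non-list type %v (kind %q)", t, gvk.Kind)
	}
	gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
	return gvk, nil
}

func main() {
	s := toyScheme{}
	s.register(GroupVersionKind{Group: "example.com", Version: "v1"}, Widget{}, WidgetList{})

	gvk, err := s.gvkForListObject(&WidgetList{})
	fmt.Println(gvk, err) // {example.com v1 Widget} <nil>

	_, err = s.gvkForListObject(&struct{}{})
	fmt.Println(err != nil) // true: the type was never registered
}
```

A list type that was never registered cannot be resolved at all, which is why the cache needs the scheme for typed objects.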
Next, the informer for that type is fetched from the InformersMap by GVK:
// sigs.k8s.io/controller-runtime/pkg/cache/internal/deleg_map.go
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
	_, isUnstructured := obj.(*unstructured.Unstructured)
	_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
	isUnstructured = isUnstructured || isUnstructuredList
	if isUnstructured {
		return m.unstructured.Get(ctx, gvk, obj)
	}
	return m.structured.Get(ctx, gvk, obj)
}
// sigs.k8s.io/controller-runtime/pkg/cache/internal/informers_map.go
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
	// Return the informer if it is found
	i, started, ok := func() (*MapEntry, bool, bool) {
		ip.mu.RLock()
		defer ip.mu.RUnlock()
		i, ok := ip.informersByGVK[gvk]
		return i, ip.started, ok
	}()
	if !ok {
		var err error
		if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
			return started, nil, err
		}
	}
	if started && !i.Informer.HasSynced() {
		// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
		if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
			return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
		}
	}
	return started, i, nil
}
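As the file name deleg_map suggests, this is essentially delegation: the InformersMap struct wraps two concrete specificInformersMap instances, structured and unstructured, and every call made by business code is forwarded to one of the two according to the type of its argument.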
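The delegation can be sketched in a few lines. This is a simplified model, not controller-runtime code: Object, Typed, Unstructured, UnstructuredList, specificMap, and delegatingMap are hypothetical names, and each specific map only reports which branch served the call.

```go
package main

import "fmt"

// Object is a hypothetical stand-in for runtime.Object.
type Object interface{}

// Hypothetical object representations.
type Typed struct{ Kind string }
type Unstructured struct{ Kind string }
type UnstructuredList struct{ Kind string }

// specificMap stands in for specificInformersMap; it only reports its name.
type specificMap struct{ name string }

func (m *specificMap) Get(obj Object) string { return m.name }

// delegatingMap mirrors the shape of InformersMap: it owns two specific maps
// and forwards each call to one of them based on the argument's dynamic type.
type delegatingMap struct {
	structured, unstructured *specificMap
}

func (m *delegatingMap) Get(obj Object) string {
	switch obj.(type) {
	case *Unstructured, *UnstructuredList:
		return m.unstructured.Get(obj)
	default:
		return m.structured.Get(obj)
	}
}

func main() {
	m := &delegatingMap{
		structured:   &specificMap{name: "structured"},
		unstructured: &specificMap{name: "unstructured"},
	}
	fmt.Println(m.Get(&Typed{Kind: "Widget"}))            // structured
	fmt.Println(m.Get(&UnstructuredList{Kind: "Widget"})) // unstructured
}
```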
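The Get function already reveals the likely cause of the occasional stalls: when specificInformersMap has no informer for the requested type, it calls addInformerToMap to create one and then waits for that informer to sync, which is where a stall of more than ten seconds comes from. This raises a question: after creating a cache with a scheme, are informers not created for the types in the scheme, but only on first access?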
As shown earlier, cache.New only builds the structs and does nothing else. The business code then calls cache.Start, so let's see how the informers are created and started:
// sigs.k8s.io/controller-runtime/pkg/cache/internal/informers_map.go
// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
func newSpecificInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string,
	createListWatcher createListWatcherFunc) *specificInformersMap {
	ip := &specificInformersMap{
		config:            config,
		Scheme:            scheme,
		mapper:            mapper,
		informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
		codecs:            serializer.NewCodecFactory(scheme),
		paramCodec:        runtime.NewParameterCodec(scheme),
		resync:            resync,
		startWait:         make(chan struct{}),
		createListWatcher: createListWatcher,
		namespace:         namespace,
	}
	return ip
}

// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
	func() {
		ip.mu.Lock()
		defer ip.mu.Unlock()
		// Set the stop channel so it can be passed to informers that are added later
		ip.stop = stop
		// Start each informer
		for _, informer := range ip.informersByGVK {
			go informer.Informer.Run(stop)
		}
		// Set started to true so we immediately start any informers added later.
		ip.started = true
		close(ip.startWait)
	}()
	<-stop
}
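Of the two functions above, newSpecificInformersMap creates a specificInformersMap and Start launches its informers. At creation time informersByGVK is initialized as an empty map; nothing from the scheme is put into it. Start iterates over informersByGVK and runs whatever it finds, but it does not add any informers either. The conclusion: after creating the cache and calling Start, no informer is actually running. An informer is added and started only the first time the cache is asked for data of that type. As a result, the data is not loaded while the program starts up but only when the service handles its first request, which increases the latency of that request or even makes it time out.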
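The start-up behavior described above can be modelled as follows. It is a deliberately reduced sketch: startableMap and its methods are hypothetical, and "running" is a boolean flag instead of a goroutine executing informer.Run(stop).

```go
package main

import (
	"fmt"
	"sync"
)

// startableMap (hypothetical) mirrors specificInformersMap: Start only runs
// informers already in the map, and informers added after Start run immediately.
type startableMap struct {
	mu        sync.Mutex
	started   bool
	informers map[string]bool // kind -> running
}

// Start marks every informer currently in the map as running, then sets started.
func (m *startableMap) Start() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for kind := range m.informers {
		m.informers[kind] = true // stands in for "go informer.Run(stop)"
	}
	m.started = true
}

// add registers an informer; it runs right away only if Start has already happened.
func (m *startableMap) add(kind string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.informers[kind]; ok {
		return
	}
	m.informers[kind] = m.started
}

// running counts the informers that are currently running.
func (m *startableMap) running() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	n := 0
	for _, r := range m.informers {
		if r {
			n++
		}
	}
	return n
}

func main() {
	m := &startableMap{informers: map[string]bool{}}
	m.Start()
	fmt.Println(m.running()) // 0: the scheme alone registers no informers
	m.add("Widget")          // what the first Get/List for a type triggers
	fmt.Println(m.running()) // 1
}
```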
3 Solution
During program initialization, call GetInformer to register and start the informer for the CRD up front:
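func New() {
	// ...
	nc.cache, _ = cache.New(config, cache.Options{Scheme: scheme})
	go nc.cache.Start(ctx.Done())
	// Create the CRD informer now instead of lazily on the first List call.
	// GetInformer takes a runtime.Object, so pass a pointer.
	if _, err := nc.cache.GetInformer(ctx, &v1.ObjectCRD{}); err != nil {
		// handle the error
	}
	// Block until the registered informers have finished their initial List.
	nc.cache.WaitForCacheSync(ctx.Done())
}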