k8s apiserver source code

Posted by [Kohn] on Wednesday, April 10, 2024
Last Modified on Tuesday, April 23, 2024

1 Key concepts in the source code

RESTStorage

Maintains the mapping between RESTful API URLs and the keys in the etcd backend.

Versioner

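Versioner is an interface used to update an object's ResourceVersion. It only deals with the resourceVersion field inside the object's struct: it writes that field and reads or parses it back. It does not access etcd itself.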

The Versioner implementation lives in api_object_versioner.go.
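
The following is a minimal sketch of what "only reads and writes the resourceVersion field" means. It assumes a simplified object type. `objectMeta` and `simpleVersioner` are hypothetical stand-ins, not the real storage.Versioner API. The versioner just converts between the uint64 revision used by storage and the string resourceVersion kept in the object.

```go
package main

import (
	"fmt"
	"strconv"
)

// objectMeta is a hypothetical stand-in for an object's metadata.
type objectMeta struct {
	Name            string
	ResourceVersion string // kept as a string, like metadata.resourceVersion
}

// simpleVersioner is a hypothetical, simplified versioner: it only touches
// the resourceVersion field and never talks to etcd.
type simpleVersioner struct{}

// UpdateObject writes the storage revision into the object's metadata.
func (simpleVersioner) UpdateObject(obj *objectMeta, rv uint64) {
	obj.ResourceVersion = strconv.FormatUint(rv, 10)
}

// ObjectResourceVersion parses the revision back out of the object;
// an empty resourceVersion is treated as 0.
func (simpleVersioner) ObjectResourceVersion(obj *objectMeta) (uint64, error) {
	if obj.ResourceVersion == "" {
		return 0, nil
	}
	return strconv.ParseUint(obj.ResourceVersion, 10, 64)
}

func main() {
	v := simpleVersioner{}
	obj := &objectMeta{Name: "nginx"}
	v.UpdateObject(obj, 42)
	rv, err := v.ObjectResourceVersion(obj)
	fmt.Println(obj.ResourceVersion, rv, err) // 42 42 <nil>
}
```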

2 apiserver initialization

kubernetes/cmd/kube-apiserver/app/server.go

  1. CreateServerChain initializes KubeAPIServer, APIExtensionsServer and AggregatorServer
  2. Start the server
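
The three servers form a delegation chain. A request the aggregator does not handle falls through to KubeAPIServer, then to APIExtensionsServer, and finally to a not-found handler. The sketch below models that fall-through with plain net/http. The `delegatingServer` type and the path prefixes it matches on are hypothetical simplifications, not the real GenericAPIServer delegation code.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// delegatingServer is a hypothetical model of one server in the chain: it
// serves the path prefixes it owns and hands everything else to its delegate.
type delegatingServer struct {
	name     string
	prefixes []string
	delegate http.Handler
}

func (s *delegatingServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, p := range s.prefixes {
		if strings.HasPrefix(r.URL.Path, p) {
			fmt.Fprintf(w, "served by %s", s.name)
			return
		}
	}
	s.delegate.ServeHTTP(w, r)
}

// buildChain wires aggregator -> kube-apiserver -> apiextensions -> 404.
func buildChain() http.Handler {
	notFound := http.NotFoundHandler()
	apiExtensions := &delegatingServer{"apiextensions", []string{"/apis/apiextensions.k8s.io"}, notFound}
	kubeAPI := &delegatingServer{"kube-apiserver", []string{"/api/", "/apis/apps"}, apiExtensions}
	return &delegatingServer{"aggregator", []string{"/apis/apiregistration.k8s.io"}, kubeAPI}
}

// serve sends a GET through the handler and returns status code and body.
func serve(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{"/apis/apps/v1/deployments", "/apis/apiextensions.k8s.io/v1", "/apis/unknown"} {
		code, body := serve(chain, p)
		fmt.Println(p, code, strings.TrimSpace(body))
	}
}
```

Each server only needs to know its direct delegate. That is why the chain can be assembled from the inside out.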

CreateServerChain

Creates the config and the object for each of the three servers. Taking KubeAPIServer as an example, it configures a list of RESTStorageProviders:

restStorageProviders := []RESTStorageProvider{
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
        authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
        autoscalingrest.RESTStorageProvider{},
        batchrest.RESTStorageProvider{},
        certificatesrest.RESTStorageProvider{},
        coordinationrest.RESTStorageProvider{},
        discoveryrest.StorageProvider{},
        extensionsrest.RESTStorageProvider{},
        networkingrest.RESTStorageProvider{},
        noderest.RESTStorageProvider{},
        policyrest.RESTStorageProvider{},
        rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
        schedulingrest.RESTStorageProvider{},
        settingsrest.RESTStorageProvider{},
        storagerest.RESTStorageProvider{},
        flowcontrolrest.RESTStorageProvider{},
        // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
        // See https://github.com/kubernetes/kubernetes/issues/42392
        appsrest.StorageProvider{},
        admissionregistrationrest.RESTStorageProvider{},
        eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}

It then calls NewRESTStorage on each provider in turn to obtain an APIGroupInfo object. Finally it calls the InstallAPIGroups method of genericapiserver (GenericAPIServer).
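
The provider loop can be sketched as follows. `apiGroupInfo`, `storageProvider`, `appsProvider`, `installAll` and the string-keyed set of enabled group versions are hypothetical simplifications of APIGroupInfo, RESTStorageProvider and APIResourceConfigSource. The real code passes the collected groups to InstallAPIGroups.

```go
package main

import "fmt"

// apiGroupInfo is a simplified, hypothetical APIGroupInfo:
// version -> resource -> storage (a description string here).
type apiGroupInfo struct {
	Group                        string
	VersionedResourcesStorageMap map[string]map[string]string
}

// storageProvider mirrors the shape of NewRESTStorage.
type storageProvider interface {
	NewRESTStorage(enabled map[string]bool) (apiGroupInfo, bool, error)
}

type appsProvider struct{}

func (appsProvider) NewRESTStorage(enabled map[string]bool) (apiGroupInfo, bool, error) {
	info := apiGroupInfo{Group: "apps", VersionedResourcesStorageMap: map[string]map[string]string{}}
	// Only enabled group versions get storage; otherwise the map stays empty.
	if enabled["apps/v1"] {
		info.VersionedResourcesStorageMap["v1"] = map[string]string{
			"deployments":        "deployment store",
			"deployments/status": "deployment status store",
		}
	}
	return info, true, nil
}

// installAll calls every provider in order and keeps the groups that have
// at least one version to serve.
func installAll(providers []storageProvider, enabled map[string]bool) ([]apiGroupInfo, error) {
	var installed []apiGroupInfo
	for _, p := range providers {
		info, ok, err := p.NewRESTStorage(enabled)
		if err != nil {
			return nil, err
		}
		if !ok || len(info.VersionedResourcesStorageMap) == 0 {
			continue // nothing to install for this group
		}
		installed = append(installed, info)
	}
	return installed, nil
}

func main() {
	groups, _ := installAll([]storageProvider{appsProvider{}}, map[string]bool{"apps/v1": true})
	fmt.Println(len(groups), groups[0].VersionedResourcesStorageMap["v1"]["deployments"])
}
```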

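When building the APIGroupInfo, each provider checks which GroupVersions the configuration declares as enabled. It adds the corresponding Storage objects only for those versions: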

// NewRESTStorage returns APIGroupInfo object.
func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
        apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
        // At least one version must be enabled, otherwise VersionedResourcesStorageMap stays empty
        if apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {
                storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter)
                if err != nil {
                        return genericapiserver.APIGroupInfo{}, false, err
                }
                apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap
        }

        return apiGroupInfo, true, nil
}

func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
        storage := map[string]rest.Storage{}

        // deployments
        deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)
        if err != nil {
                return storage, err
        }
        storage["deployments"] = deploymentStorage.Deployment
        storage["deployments/status"] = deploymentStorage.Status
        storage["deployments/scale"] = deploymentStorage.Scale

        // ...
        return storage, nil
}
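
The keys of the storage map (`deployments`, `deployments/status`, ...) end up as URL path segments. Below is a rough sketch of that mapping for a namespaced resource in a named API group. `resourcePaths` is a hypothetical helper. The real installer builds these paths, together with verbs and parameters, inside registerResourceHandlers.

```go
package main

import (
	"fmt"
	"strings"
)

// resourcePaths turns storage-map keys such as "deployments/status" into the
// namespaced item paths they would be served under (hypothetical helper).
func resourcePaths(group, version string, keys []string) []string {
	paths := make([]string, 0, len(keys))
	for _, key := range keys {
		resource, sub, hasSub := strings.Cut(key, "/")
		p := fmt.Sprintf("/apis/%s/%s/namespaces/{namespace}/%s/{name}", group, version, resource)
		if hasSub {
			p += "/" + sub // subresource, e.g. status or scale
		}
		paths = append(paths, p)
	}
	return paths
}

func main() {
	for _, p := range resourcePaths("apps", "v1", []string{"deployments", "deployments/status", "deployments/scale"}) {
		fmt.Println(p)
	}
}
```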

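Each Storage (e.g. deploymentStorage) defines methods such as Get and Create. The usual implementation embeds a genericregistry.Store object (e.g. deploymentStorage.Deployment). This store is initialized from the configuration and is normally backed by etcd3. It implements the generic list/get/watch operations. Get, Create and the other methods simply call into the store, so they ultimately read and write etcd. You can also wrap the store in a custom type (e.g. deploymentStorage.Status, which exposes only Get and a few other methods).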

// kubernetes/pkg/registry/apps/deployment/storage/storage.go
// NewStorage returns new instance of DeploymentStorage.
func NewStorage(optsGetter generic.RESTOptionsGetter) (DeploymentStorage, error) {
        deploymentRest, deploymentStatusRest, deploymentRollbackRest, err := NewREST(optsGetter)
        if err != nil {
                return DeploymentStorage{}, err
        }

        return DeploymentStorage{
                Deployment: deploymentRest,
                Status:     deploymentStatusRest,
                Scale:      &ScaleREST{store: deploymentRest.Store},
                Rollback:   deploymentRollbackRest,
        }, nil
}

// REST implements a RESTStorage for Deployments.
type REST struct {
        *genericregistry.Store
        categories []string
}

// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
        store := &genericregistry.Store{
                NewFunc:                  func() runtime.Object { return &apps.Deployment{} },
                NewListFunc:              func() runtime.Object { return &apps.DeploymentList{} },
                DefaultQualifiedResource: apps.Resource("deployments"),

                CreateStrategy: deployment.Strategy,
                UpdateStrategy: deployment.Strategy,
                DeleteStrategy: deployment.Strategy,

                TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
        }
        options := &generic.StoreOptions{RESTOptions: optsGetter}
        // Initialize the store from the configuration
        if err := store.CompleteWithOptions(options); err != nil {
                return nil, nil, nil, err
        }

        statusStore := *store
        statusStore.UpdateStrategy = deployment.StatusStrategy
        return &REST{store, []string{"all"}}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil
}
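
The two patterns above can be sketched with a map-backed store standing in for etcd. REST embeds the store, so all of the store's methods are promoted. StatusREST keeps the store in a field and exposes only what it defines. `genericStore`, `deploymentREST` and `statusREST` are hypothetical; the real genericregistry.Store has a much richer method set.

```go
package main

import (
	"errors"
	"fmt"
)

// genericStore stands in for genericregistry.Store; a map replaces etcd.
type genericStore struct {
	data map[string]string
}

func (s *genericStore) Create(name, obj string) error {
	if _, ok := s.data[name]; ok {
		return errors.New("already exists")
	}
	s.data[name] = obj
	return nil
}

func (s *genericStore) Get(name string) (string, error) {
	obj, ok := s.data[name]
	if !ok {
		return "", errors.New("not found")
	}
	return obj, nil
}

// deploymentREST embeds the store, so Create and Get are promoted to it.
type deploymentREST struct {
	*genericStore
}

// statusREST keeps the store in an unexported field and exposes only Get,
// so Create is not part of its method set.
type statusREST struct {
	store *genericStore
}

func (r *statusREST) Get(name string) (string, error) { return r.store.Get(name) }

func main() {
	store := &genericStore{data: map[string]string{}}
	deployments := deploymentREST{store}
	status := &statusREST{store: store}
	_ = deployments.Create("nginx", "replicas=3")
	obj, _ := status.Get("nginx") // both views share the same underlying store
	fmt.Println(obj)
}
```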

InstallAPIGroups iterates over every APIGroup. The call chain is installAPIResources->InstallREST->installer.Install()->registerResourceHandlers.

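registerResourceHandlers builds the URL paths from the APIGroupInfo and then registers handlers. While doing so, it checks whether the storage implements Get, Post, Patch, Delete and so on, by testing whether it satisfies the corresponding interface. It registers an action only for the methods the storage implements: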

// Handler for standard REST verbs (GET, PUT, POST and DELETE).
// Add actions at the resource path: /api/apiVersion/resource
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

// Add actions at the item path: /api/apiVersion/resource/{name}
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
        actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}

// Finally, all actions are registered together
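
The isGetter/isCreater flags come from type assertions on the storage object. The following is a stripped-down sketch of that idea. The `getter`/`creater` interfaces, the `action` struct and `actionsFor` are hypothetical, simplified versions of rest.Getter, rest.Creater and the installer's action type.

```go
package main

import "fmt"

type getter interface{ Get(name string) (string, error) }
type creater interface{ Create(name, obj string) error }

type action struct {
	Verb string
	Path string
}

// appendIf adds the action only when the condition holds, like the installer.
func appendIf(actions []action, a action, ok bool) []action {
	if ok {
		actions = append(actions, a)
	}
	return actions
}

// actionsFor derives the routes a storage object supports from the
// interfaces it implements.
func actionsFor(storage interface{}, resourcePath, itemPath string) []action {
	_, isGetter := storage.(getter)
	_, isCreater := storage.(creater)
	var actions []action
	actions = appendIf(actions, action{"POST", resourcePath}, isCreater)
	actions = appendIf(actions, action{"GET", itemPath}, isGetter)
	return actions
}

// readOnly implements only Get, so it gets no POST route.
type readOnly struct{}

func (readOnly) Get(name string) (string, error) { return name, nil }

func main() {
	fmt.Println(actionsFor(readOnly{}, "deployments", "deployments/{name}"))
}
```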

Registering a route needs METHOD + URL + handler. The METHOD and URL path are already in the action. The handler differs by method, but it too is predefined in genericapiserver:

// in registerResourceHandlers
switch action.Verb {
case "GET": // Get a resource.
        var handler restful.RouteFunction
        if isGetterWithOptions {
                handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
        } else {
                handler = restfulGetResource(getter, exporter, reqScope)
        }

        if needOverride {
                // need change the reported verb
                handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
        } else {
                handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
        }
        if enableWarningHeaders {
                handler = utilwarning.AddWarningsHandler(handler, warnings)
        }

        doc := "read the specified " + kind
        if isSubresource {
                doc = "read " + subresource + " of the specified " + kind
        }
        route := ws.GET(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                Writes(producedObject)
        if isGetterWithOptions {
                if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
                        return nil, err
                }
        }
        if isExporter {
                if err := AddObjectParams(ws, route, versionedExportOptions); err != nil {
                        return nil, err
                }
        }
        addParams(route, action.Params)
        routes = append(routes, route)

Taking GET as an example, restfulGetResource eventually calls the Get method provided by the storage. That call is what reads the data from etcd.

3 Resource versions

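Every object's metadata carries two version-related fields, resourceVersion and generation. generation is the simpler one. It is a sequence number that, for resources that maintain it (Deployment, for example), is incremented when the desired state (spec) changes. Status-only updates do not bump it.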

resourceVersion is the etcd mod_revision of the object's key. etcd itself has several related notions: revision, mod_revision, create_revision and version.

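concept           scope      meaning
revision          global     cluster-wide, monotonically increasing counter
mod_revision      per key    the revision at which the key was last modified
create_revision   per key    the revision at which the key was created
version           per key    number of modifications to the key since creation (reset when the key is deleted)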

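When an object is updated, its previous resourceVersion (its mod_revision) is read from etcd and used as an optimistic-concurrency precondition. The write only goes through if the key's mod_revision is still unchanged. If the client supplied a stale resourceVersion, the update is rejected with 409 Conflict.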
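
Both the revision bookkeeping and the update precondition can be seen in a tiny in-memory model of an etcd-like store. In this model, an update carries the mod_revision the caller last saw and is applied only if the key has not changed since. `kvStore`, `Put` and `UpdateIf` are hypothetical. In the real apiserver, the check is an etcd transaction that compares the key's mod_revision. Deletion, and the version reset it causes, is not modelled.

```go
package main

import (
	"errors"
	"fmt"
)

// keyValue holds the per-key revision metadata etcd tracks.
type keyValue struct {
	Value          string
	CreateRevision int64
	ModRevision    int64
	Version        int64
}

// kvStore is a hypothetical in-memory model with a single global revision.
type kvStore struct {
	revision int64
	keys     map[string]*keyValue
}

func newKVStore() *kvStore { return &kvStore{keys: map[string]*keyValue{}} }

// Put bumps the global revision and updates the key's metadata.
func (s *kvStore) Put(key, value string) *keyValue {
	s.revision++
	kv, ok := s.keys[key]
	if !ok {
		kv = &keyValue{CreateRevision: s.revision}
		s.keys[key] = kv
	}
	kv.Value = value
	kv.ModRevision = s.revision
	kv.Version++
	return kv
}

var errConflict = errors.New("conflict: object has been modified")

// UpdateIf writes only if the key's mod_revision still equals expectedRV,
// i.e. nobody else modified it since the caller read it.
func (s *kvStore) UpdateIf(key, value string, expectedRV int64) error {
	kv, ok := s.keys[key]
	if !ok || kv.ModRevision != expectedRV {
		return errConflict
	}
	s.Put(key, value)
	return nil
}

func main() {
	s := newKVStore()
	s.Put("/registry/deployments/default/a", "v1")
	s.Put("/registry/deployments/default/b", "v1")
	a := s.Put("/registry/deployments/default/a", "v2")
	fmt.Println(s.revision, a.CreateRevision, a.ModRevision, a.Version) // 3 1 3 2
	fmt.Println(s.UpdateIf("/registry/deployments/default/a", "v3", 1)) // stale revision: conflict
}
```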

4 genericregistry.Store

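genericregistry.Store is what the apiserver uses to operate on the underlying storage, i.e. etcd. To use it, you supply per-resource functions such as NewFunc and NewListFunc. For example, the Store for deployments is initialized like this: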

// pkg/registry/apps/deployment/storage/storage.go
store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &apps.Deployment{} },
        NewListFunc:              func() runtime.Object { return &apps.DeploymentList{} },
        DefaultQualifiedResource: apps.Resource("deployments"),

        CreateStrategy: deployment.Strategy,
        UpdateStrategy: deployment.Strategy,
        DeleteStrategy: deployment.Strategy,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}

// Then pass the options to complete the initialization
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
        return nil, nil, nil, err
}

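Besides these per-resource functions, genericregistry.Store also holds a Storage object. The per-resource functions compute the object's key in the KV store, and Storage performs the actual reads and writes against that key.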
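
The split between "compute the key" and "do the I/O" can be sketched like this. The `/registry` prefix is kube-apiserver's default etcd prefix (`--etcd-prefix`). `rawStorage`, `memStorage`, `namespaceKeyFunc` and `typedStore` are hypothetical simplifications.

```go
package main

import (
	"errors"
	"fmt"
	"path"
)

// rawStorage stands in for storage.Interface: it only knows keys and bytes.
type rawStorage interface {
	Create(key string, data []byte) error
	Get(key string) ([]byte, error)
}

type memStorage map[string][]byte

func (m memStorage) Create(key string, data []byte) error {
	if _, ok := m[key]; ok {
		return errors.New("key exists: " + key)
	}
	m[key] = data
	return nil
}

func (m memStorage) Get(key string) ([]byte, error) {
	data, ok := m[key]
	if !ok {
		return nil, errors.New("key not found: " + key)
	}
	return data, nil
}

// namespaceKeyFunc builds <prefix>/<namespace>/<name> for namespaced objects.
func namespaceKeyFunc(prefix, namespace, name string) string {
	return path.Join(prefix, namespace, name)
}

// typedStore knows the resource prefix and delegates I/O to rawStorage.
type typedStore struct {
	prefix  string
	storage rawStorage
}

func (s typedStore) Create(namespace, name string, obj []byte) error {
	return s.storage.Create(namespaceKeyFunc(s.prefix, namespace, name), obj)
}

func (s typedStore) Get(namespace, name string) ([]byte, error) {
	return s.storage.Get(namespaceKeyFunc(s.prefix, namespace, name))
}

func main() {
	mem := memStorage{}
	deployments := typedStore{prefix: "/registry/deployments", storage: mem}
	_ = deployments.Create("default", "nginx", []byte(`{"kind":"Deployment"}`))
	for k := range mem {
		fmt.Println(k) // /registry/deployments/default/nginx
	}
}
```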

The storage is initialized in store.CompleteWithOptions:

if e.Storage.Storage == nil {
        e.Storage.Codec = opts.StorageConfig.Codec
        var err error
        e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
                opts.StorageConfig,
                prefix,
                keyFunc,
                e.NewFunc,
                e.NewListFunc,
                attrFunc,
                options.TriggerFunc,
                options.Indexers,
        )
        if err != nil {
                return err
        }
        e.StorageVersioner = opts.StorageConfig.EncodeVersioner

        if opts.CountMetricPollPeriod > 0 {
                stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
                previousDestroy := e.DestroyFunc
                e.DestroyFunc = func() {
                        stopFunc()
                        if previousDestroy != nil {
                                previousDestroy()
                        }
                }
        }
}

opts.Decorator is chosen according to the startup flags. It is set up along this call chain:

CreateKubeAPIServerConfig->buildGenericConfig->s.Etcd.ApplyWithStorageFactoryTo

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
        if err := s.addEtcdHealthEndpoint(c); err != nil {
                return err
        }
        c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
        return nil
}

type StorageFactoryRestOptionsFactory struct {
        Options        EtcdOptions
        StorageFactory serverstorage.StorageFactory
}

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
        storageConfig, err := f.StorageFactory.NewConfig(resource)
        if err != nil {
                return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
        }

        ret := generic.RESTOptions{
                StorageConfig:           storageConfig,
                Decorator:               generic.UndecoratedStorage,
                DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
                EnableGarbageCollection: f.Options.EnableGarbageCollection,
                ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
                CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
        }
        // If EnableWatchCache is set, use genericregistry.StorageWithCacher() unless this resource's watch cache size is <= 0
        if f.Options.EnableWatchCache {
                sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
                if err != nil {
                        return generic.RESTOptions{}, err
                }
                size, ok := sizes[resource]
                if ok && size > 0 {
                        klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
                }
                if ok && size <= 0 {
                        ret.Decorator = generic.UndecoratedStorage
                } else {
                        ret.Decorator = genericregistry.StorageWithCacher()
                }
        }

        return ret, nil
}

The watch-cache branch uses genericregistry.StorageWithCacher():

import (
        ...
        cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
        ...
)

// Creates a cacher based given storageConfig.
func StorageWithCacher() generic.StorageDecorator {
        return func(
                storageConfig *storagebackend.Config,
                resourcePrefix string,
                keyFunc func(obj runtime.Object) (string, error),
                newFunc func() runtime.Object,
                newListFunc func() runtime.Object,
                getAttrsFunc storage.AttrFunc,
                triggerFuncs storage.IndexerFuncs,
                indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {

                s, d, err := generic.NewRawStorage(storageConfig)
                if err != nil {
                        return s, d, err
                }
                if klog.V(5).Enabled() {
                        klog.Infof("Storage caching is enabled for %s", objectTypeToString(newFunc()))
                }

                cacherConfig := cacherstorage.Config{
                        Storage:        s,
                        Versioner:      etcd3.APIObjectVersioner{},
                        ResourcePrefix: resourcePrefix,
                        KeyFunc:        keyFunc,
                        NewFunc:        newFunc,
                        NewListFunc:    newListFunc,
                        GetAttrsFunc:   getAttrsFunc,
                        IndexerFuncs:   triggerFuncs,
                        Indexers:       indexers,
                        Codec:          storageConfig.Codec,
                }
                cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
                if err != nil {
                        return nil, func() {}, err
                }
                destroyFunc := func() {
                        cacher.Stop()
                        d()
                }

                // TODO : Remove RegisterStorageCleanup below when PR
                // https://github.com/kubernetes/kubernetes/pull/50690
                // merges as that shuts down storage properly
                RegisterStorageCleanup(destroyFunc)

                return cacher, destroyFunc, nil
        }
}
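
The decorator indirection lets the same Store run either against raw etcd storage or against a cache layered on top of it. The following is a compressed sketch under simplifying assumptions. `storageIface`, `storageDecorator`, `undecorated`, `withCache` and `chooseDecorator` are hypothetical. The "cache" is a naive read-through map, whereas the real Cacher is filled by a reflector running ListAndWatch. The selection logic mirrors the branch in GetRESTOptions: the cache is used when the watch cache is enabled, unless the resource's configured size is <= 0.

```go
package main

import "fmt"

type storageIface interface {
	Get(key string) (string, error)
}

// backend stands in for the raw etcd3 storage and counts its reads.
type backend struct {
	data  map[string]string
	reads int
}

func (b *backend) Get(key string) (string, error) {
	b.reads++
	v, ok := b.data[key]
	if !ok {
		return "", fmt.Errorf("%s not found", key)
	}
	return v, nil
}

// storageDecorator turns raw storage into the storage the Store will use.
type storageDecorator func(raw storageIface) storageIface

func undecorated(raw storageIface) storageIface { return raw }

// cached is a deliberately naive read-through cache, not the real Cacher.
type cached struct {
	raw   storageIface
	cache map[string]string
}

func (c *cached) Get(key string) (string, error) {
	if v, ok := c.cache[key]; ok {
		return v, nil
	}
	v, err := c.raw.Get(key)
	if err == nil {
		c.cache[key] = v
	}
	return v, err
}

func withCache(raw storageIface) storageIface {
	return &cached{raw: raw, cache: map[string]string{}}
}

// chooseDecorator: cache when enabled, unless the resource's size is <= 0.
func chooseDecorator(enableWatchCache bool, sizes map[string]int, resource string) storageDecorator {
	if !enableWatchCache {
		return undecorated
	}
	if size, ok := sizes[resource]; ok && size <= 0 {
		return undecorated
	}
	return withCache
}

func main() {
	raw := &backend{data: map[string]string{"/registry/pods/default/p": "pod"}}
	s := chooseDecorator(true, map[string]int{}, "pods")(raw)
	s.Get("/registry/pods/default/p")
	s.Get("/registry/pods/default/p")
	fmt.Println(raw.reads) // 1: the second read is served from the cache
}
```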

NewCacherFromConfig

k8s.io/apiserver/pkg/storage/cacher/cacher.go

// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config Config) (*Cacher, error) {
        // ...
        objType := reflect.TypeOf(obj)
        cacher := &Cacher{
                ready:          newReady(),
                storage:        config.Storage,
                objectType:     objType,
                versioner:      config.Versioner,
                newFunc:        config.NewFunc,
                indexedTrigger: indexedTrigger,
                watcherIdx:     0,
                watchers: indexedWatchers{
                        allWatchers:   make(map[int]*cacheWatcher),
                        valueWatchers: make(map[string]watchersMap),
                },
                // TODO: Figure out the correct value for the buffer size.
                incoming:              make(chan watchCacheEvent, 100),
                dispatchTimeoutBudget: newTimeBudget(stopCh),
                // We need to (potentially) stop both:
                // - wait.Until go-routine
                // - reflector.ListAndWatch
                // and there are no guarantees on the order that they will stop.
                // So we will be simply closing the channel, and synchronizing on the WaitGroup.
                stopCh:           stopCh,
                clock:            config.Clock,
                timer:            time.NewTimer(time.Duration(0)),
                bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
        }

        // ...

        // The cache itself: implements the Store interface (Add/Delete, ...) and processes the corresponding watch events
        watchCache := newWatchCache(
                config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
        // Implements the List/Watch interfaces; essentially an etcd client built on config.Storage
        listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
        reflectorName := "storage/cacher.go:" + config.ResourcePrefix

        // Wraps watchCache and listerWatcher
        reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
        // Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
        // storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
        reflector.WatchListPageSize = storageWatchListPageSize

        cacher.watchCache = watchCache
        cacher.reflector = reflector

        // Start the event dispatcher, which keeps reading events from the cacher.incoming channel
        go cacher.dispatchEvents()

        cacher.stopWg.Add(1)
        go func() {
                defer cacher.stopWg.Done()
                defer cacher.terminateAllWatchers()
                wait.Until(
                        func() {
                                if !cacher.isStopped() {
                                        cacher.startCaching(stopCh)
                                }
                        }, time.Second, stopCh,
                        // wait.Until re-runs startCaching one second after each return, until stopCh is closed
                )
        }()

        return cacher, nil
}

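The core logic is in startCaching. The call blocks, so in principle a single call would be enough. However, ListAndWatch can return with an error, so the call is wrapped in wait.Until to restart it.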

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
        // ...

        // Note that since onReplace may be not called due to errors, we explicitly
        // need to retry it on errors under lock.
        // Also note that startCaching is called in a loop, so there's no need
        // to have another loop here.
        if err := c.reflector.ListAndWatch(stopChannel); err != nil {
                klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
        }
}

WatchCache

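WatchCache keeps its data in a ring buffer described by three variables: startIndex, endIndex and capacity. startIndex is where the ring starts and endIndex is where it ends. Both can grow beyond capacity, so they are taken modulo capacity when indexing, e.g. cache[startIndex%capacity]. Initially startIndex = endIndex = 0 and capacity = 100.

  1. On every write, the cache is first resized if necessary:
    • if the cache is full and all cached events were written within the last 75 seconds, capacity doubles (up to an upper bound);
    • if the cache is full and fewer than a quarter of its events were written within the last 75 seconds, capacity halves (down to a lower bound).
  2. If the cache is still full after resizing (e.g. because it has reached the upper bound), the oldest event is dropped. This only requires incrementing startIndex.
  3. There is now room: the new event is stored at cache[endIndex%capacity] and endIndex is incremented.
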
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(event *watchCacheEvent) {
        w.resizeCacheLocked(event.RecordTime)
        if w.isCacheFullLocked() {
                // Cache is full - remove the oldest element.
                w.startIndex++
        }
        w.cache[w.endIndex%w.capacity] = event
        w.endIndex++
}

// resizeCacheLocked resizes the cache if necessary:
// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping).
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
        if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
                capacity := min(w.capacity*2, w.upperBoundCapacity)
                if capacity > w.capacity {
                        w.doCacheResizeLocked(capacity)
                }
                return
        }
        if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
                capacity := max(w.capacity/2, w.lowerBoundCapacity)
                if capacity < w.capacity {
                        w.doCacheResizeLocked(capacity)
                }
                return
        }
}

Watch

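In a default Kubernetes setup, kube-apiserver compacts etcd every 5 minutes (--etcd-compaction-interval), so roughly the last 5 minutes of change history is retained. Every change to a resource yields a new, larger ResourceVersion. ResourceVersion is a single global counter shared by all resource types stored in the same etcd.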

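When watching a resource in etcd:

  1. If no resourceVersion is specified, the watch starts from the latest state.
  2. If a resourceVersion is specified:
    • if it is older than the oldest revision etcd still retains (i.e. it has been compacted away), 410 (Gone) is returned;
    • if it is newer than etcd's current revision, the watch waits until that revision arrives or the request times out.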
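
The rules above can be condensed into a small decision function. `watchStart`, its result strings and its parameters are hypothetical. The function only classifies a request; it does not model how etcd or the apiserver transport the error. The in-range case, which the text does not spell out, is labelled as a replay of retained history.

```go
package main

import "fmt"

// watchStart classifies a watch request against an etcd-like history.
// requestedRV == 0 means "not specified"; oldestRV is the oldest revision
// still retained after compaction, currentRV the newest one.
func watchStart(requestedRV, oldestRV, currentRV int64) string {
	switch {
	case requestedRV == 0:
		return "start from latest"
	case requestedRV < oldestRV:
		return "410 Gone"
	case requestedRV > currentRV:
		return "wait for revision (or time out)"
	default:
		return "replay retained history, then stream new events"
	}
}

func main() {
	for _, rv := range []int64{0, 50, 150, 300} {
		fmt.Println(rv, "->", watchStart(rv, 100, 200))
	}
}
```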

With these basics in place, let's look at the Watch code:

// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
        watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
        // ...

        // Wait until the cache has been loaded
        c.ready.wait()

        // ...

        // Determine watch timeout('0' means deadline is not set, ignore checking)
        deadline, _ := ctx.Deadline()
        // Create a watcher here to reduce memory allocations under lock,
        // given that memory allocation may trigger GC and block the thread.
        // Also note that emptyFunc is a placeholder, until we will be able
        // to compute watcher.forget function (which has to happen under lock).
        watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)


        // We explicitly use thread unsafe version and do locking ourself to ensure that
        // no new events will be processed in the meantime. The watchCache will be unlocked
        // on return from this function.
        // Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
        // underlying watchCache is calling processEvent under its lock.
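        // For watchRV > 0, GetAllEventsSinceThreadUnsafe returns the buffered events newer than watchRV;
        // for watchRV == 0 it synthesizes an Added event for every object currently in the cache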
        c.watchCache.RLock()
        defer c.watchCache.RUnlock()
        initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
        // ...

        // With some events already sent, update resourceVersion so that
        // events that were buffered and not yet processed won't be delivered
        // to this watcher second time causing going back in time.
        // initEvents[len(initEvents)-1] is the newest event in the cache
        if len(initEvents) > 0 {
                watchRV = initEvents[len(initEvents)-1].ResourceVersion
        }

        func() {
                c.Lock()
                defer c.Unlock()
                // Update watcher.forget function once we can compute it.
                watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
                c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)

                // Add it to the queue only when the client support watch bookmarks.
                if watcher.allowWatchBookmarks {
                        c.bookmarkWatchers.addWatcher(watcher)
                }
                c.watcherIdx++
        }()

        go watcher.process(ctx, initEvents, watchRV)
        return watcher, nil
}

Watch returns a watch.Interface. Its ResultChan method gives the caller a channel from which to read events.

Here the concrete implementation of watch.Interface is cacheWatcher.

The full watch flow (k8s.io/apiserver/pkg/storage/cacher/cacher.go):

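  1. Cacher creates a WatchCache object and passes its own processEvent function as the event handler.
  2. The reflector created alongside it performs the actual list/watch against etcd (through listerWatcher) and feeds the results into WatchCache. WatchCache then calls the handler, i.e. Cacher's processEvent, for every event.
  3. processEvent puts the event onto Cacher's incoming channel.
  4. During initialization, Cacher also starts a goroutine (dispatchEvents) that keeps reading from the incoming channel.
  5. Cacher also maintains a collection of watchers. Every time a consumer calls Cacher's Watch method, a new watcher is created and added to it.
  6. For each event read from incoming, Cacher calls add/nonblockingAdd on every watcher. The details are intricate, but the upshot is that the event lands on the watcher's internal input channel.
  7. When Cacher creates a watcher, it starts the watcher's process goroutine. After first sending initEvents, process reads from the input channel and forwards to ResultChan every event whose resource version is greater than the ResourceVersion the user asked for.
  8. The caller of Watch reads events from ResultChan.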

List

// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
        resourceVersion := opts.ResourceVersion
        pred := opts.Predicate
        pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
        hasContinuation := pagingEnabled && len(pred.Continue) > 0
        hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
        if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
                // If resourceVersion is not specified, serve it from underlying
                // storage (for backward compatibility). If a continuation is
                // requested, serve it from the underlying storage as well.
                // Limits are only sent to storage when resourceVersion is non-zero
                // since the watch cache isn't able to perform continuations, and
                // limits are ignored when resource version is zero.
                return c.storage.List(ctx, key, opts, listObj)
        }

        // ...

        // The cache is not ready yet
        if listRV == 0 && !c.ready.check() {
                // If Cacher is not yet initialized and we don't require any specific
                // minimal resource version, simply forward the request to storage.
                return c.storage.List(ctx, key, opts, listObj)
        }

        // ...

        // Fetch the data from the watch cache
        objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
        if err != nil {
                return err
        }

        // ...
        // Filter: all candidate objects are fetched from the cache first, then filtered one by one
        for _, obj := range objs {
                elem, ok := obj.(*storeElement)
                if !ok {
                        return fmt.Errorf("non *storeElement returned from storage: %v", obj)
                }
                if filter(elem.Key, elem.Labels, elem.Fields) {
                        listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
                }
        }
        // ...
        return nil
}

A List is not necessarily served from the cache:

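  1. Parameters: the request goes straight to etcd in any of these cases: resourceVersion is not set; a non-zero resourceVersion is combined with a limit; continue is set; or resourceVersionMatch=Exact. The semantics of resourceVersion are:
    1. unset: the most recent version;
    2. "0": any version (whatever the cache currently holds);
    3. non-zero: any version not older than the given one.
  2. If resourceVersion=0 is set but the cache is not ready yet, the data is also fetched directly from etcd.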
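
The routing rules can be written as a single function that follows the checks at the top of Cacher.List. It assumes the APIListChunking feature is enabled. `listOptions` and `listSource` are hypothetical. A non-zero resourceVersion with an unready cache is routed to the cache path, where the real code waits for readiness.

```go
package main

import "fmt"

type listOptions struct {
	ResourceVersion      string
	ResourceVersionMatch string // "", "NotOlderThan" or "Exact"
	Limit                int64
	Continue             string
}

// listSource decides where a List is served from.
func listSource(opts listOptions, cacheReady bool) string {
	hasContinuation := opts.Continue != ""
	// A limit only matters for non-zero RVs; with RV "0" it is ignored.
	hasLimit := opts.Limit > 0 && opts.ResourceVersion != "0"
	if opts.ResourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == "Exact" {
		return "etcd"
	}
	if opts.ResourceVersion == "0" && !cacheReady {
		return "etcd" // no minimum RV required, so don't wait for the cache
	}
	return "watch cache"
}

func main() {
	fmt.Println(listSource(listOptions{}, true))                                   // etcd
	fmt.Println(listSource(listOptions{ResourceVersion: "0", Limit: 500}, true))  // watch cache
	fmt.Println(listSource(listOptions{ResourceVersion: "0"}, false))             // etcd
}
```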

WaitUntilFreshAndList

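  1. When fetching data, the requested resourceVersion is compared with the cache's resourceVersion. If the request is newer than the cache, the call waits until the cache has caught up to (at least) that version.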