1 源码中的几个概念
RESTStorage
用于维护restful API的url与后端etcd的key的映射关系
Versioner
Versioner是一个用来更新对象ResourceVersion的interface(接口), 这里只负责写入对象结构体内的resource version字段
Versioner的实现在api_object_versioner.go
2 apiserver初始化过程
kubernetes/cmd/kube-apiserver/app/server.go
- CreateServerChain初始化KubeApiServer/APIExtensionServer/AggregatorServer
- 启动server
CreateServerChain
创建三个server的config及对象, 以KubeAPIServer为例, 配置一系列的RESTStorageProvider
restStorageProviders := []RESTStorageProvider{
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
settingsrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
然后逐个调用这些Provider的NewRESTStorage方法, 获得一个APIGroupInfo对象, 最后调用GenericAPIServer的InstallAPIGroups方法
这里创建APIGroupInfo对象时, 不同的Provider会根据配置文件声明启用哪个GroupVersion来添加对应的Storage对象
// NewRESTStorage builds the APIGroupInfo for the apps API group.
//
// The v1 storage map is registered only when apiResourceConfigSource reports
// appsapiv1.SchemeGroupVersion as enabled. On success it returns the group
// info, true and a nil error; if building the v1 storage fails it returns an
// empty APIGroupInfo, false and that error.
func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
// The version has to be enabled; otherwise no storage is added to
// VersionedResourcesStorageMap here.
if apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {
storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, false, err
}
// Keyed by the bare version string of the group version.
apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap
}
return apiGroupInfo, true, nil
}
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
storage := map[string]rest.Storage{}
// deployments
deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)
if err != nil {
return storage, err
}
storage["deployments"] = deploymentStorage.Deployment
storage["deployments/status"] = deploymentStorage.Status
storage["deployments/scale"] = deploymentStorage.Scale
// ...
}
这里每个Storage(例如deploymentStorage)中定义了Get/Create等方法, 一般的实现是引用genericregistry.Store对象(例如deploymentStorage.Deployment), 这个store对象根据配置文件进行初始化, 一般会使用etcd3, 实现了通用的list/get/watch等接口. 然后Get/Create等方法调用store进行读写, 本质上就是etcd的读写. 也可以自定义一个store(例如deploymentStorage.Status, 只实现了Get等方法)
// kubernetes/pkg/registry/apps/deployment/storage/storage.go
// NewStorage returns new instance of DeploymentStorage.
//
// Deployment, Status and Rollback are the three values returned by NewREST.
// Scale is a ScaleREST whose store field is deploymentRest.Store, so it uses
// the same store as the Deployment entry. If NewREST fails, an empty
// DeploymentStorage and the error are returned.
func NewStorage(optsGetter generic.RESTOptionsGetter) (DeploymentStorage, error) {
deploymentRest, deploymentStatusRest, deploymentRollbackRest, err := NewREST(optsGetter)
if err != nil {
return DeploymentStorage{}, err
}
return DeploymentStorage{
Deployment: deploymentRest,
Status: deploymentStatusRest,
// Scale reuses the store embedded in deploymentRest.
Scale: &ScaleREST{store: deploymentRest.Store},
Rollback: deploymentRollbackRest,
}, nil
}
// REST implements a RESTStorage for Deployments.
type REST struct {
// Store is embedded, so its methods are promoted onto REST.
*genericregistry.Store
// categories is populated with []string{"all"} by NewREST.
categories []string
}
// NewREST returns a RESTStorage object that will work against deployments.
//
// It builds one genericregistry.Store for apps.Deployment, completes it from
// optsGetter, and returns three wrappers: the main REST, a StatusREST and a
// RollbackREST. If CompleteWithOptions fails, all three results are nil and
// the error is returned.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &apps.Deployment{} },
NewListFunc: func() runtime.Object { return &apps.DeploymentList{} },
DefaultQualifiedResource: apps.Resource("deployments"),
CreateStrategy: deployment.Strategy,
UpdateStrategy: deployment.Strategy,
DeleteStrategy: deployment.Strategy,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
// Finish initializing the store from the supplied options.
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
}
// Copy the Store struct by value so the update strategy can be overridden
// for the status subresource without changing the main store.
statusStore := *store
statusStore.UpdateStrategy = deployment.StatusStrategy
// REST and RollbackREST share the original store; StatusREST uses the copy.
return &REST{store, []string{"all"}}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil
}
InstallAPIGroups遍历每一个APIGroup, 调用链: installAPIResources->InstallREST->installer.Install()->registerResourceHandlers
registerResourceHandlers负责根据APIGroupInfo生成一个url path, 然后注册handler. 注册handler时, 会检查Store是否实现了Get/Post/Patch/Delete等方法(通过判断是否属于对应的interface), 如果实现了, 则注册对应的方法:
// Handler for standard REST verbs (GET, PUT, POST and DELETE).
// Add actions at the resource path: /api/apiVersion/resource
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
// DEPRECATED in 1.11
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
// Add actions at the item path: /api/apiVersion/resource/{name}
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}
// 最后统一注册actions
注册handler时需要METHOD+URL+handler, METHOD和url path已经在action里了, handler则是根据method的不同而不同, 但是也是在generic apiserver里定义好了:
// registerResourceHandlers函数
switch action.Verb {
case "GET": // Get a resource.
var handler restful.RouteFunction
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, exporter, reqScope)
}
if needOverride {
// need change the reported verb
handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
} else {
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
}
if enableWarningHeaders {
handler = utilwarning.AddWarningsHandler(handler, warnings)
}
doc := "read the specified " + kind
if isSubresource {
doc = "read " + subresource + " of the specified " + kind
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
if isGetterWithOptions {
if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
return nil, err
}
}
if isExporter {
if err := AddObjectParams(ws, route, versionedExportOptions); err != nil {
return nil, err
}
}
addParams(route, action.Params)
routes = append(routes, route)
以GET为例, restfulGetResource最终会调用storage提供的Get方法, 也就通过etcd读到数据了
3 resource version相关
所有对象的metadata中都包含resourceversion和generation两个版本相关的字段, 其中generation比较好理解, 就是每次更新对象时generation会自增1.
resourceversion则是etcd中的modrevision. etcd中又有revision, modrevision, createrevision, version等几个概念.
| | scope | explain |
|---|---|---|
| revision | global | 全局自增id |
| modrevision | for key | 某个key在修改时那一刻的revision |
| createrevision | for key | 某个key创建时刻的revision |
| version | for key | 某个key每次更新时自增一个版本 |
更新对象时, 从etcd获取对象之前的resourceversion.
4 genericregistry.Store
genericregistry.Store用于apiserver操作底层存储也就是etcd. 使用时需要针对对象提供对应的New函数, NewList函数等, 例如针对deployment的Store, 是这么初始化:
// pkg/registry/apps/deployment/storage/storage.go
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &apps.Deployment{} },
NewListFunc: func() runtime.Object { return &apps.DeploymentList{} },
DefaultQualifiedResource: apps.Resource("deployments"),
CreateStrategy: deployment.Strategy,
UpdateStrategy: deployment.Strategy,
DeleteStrategy: deployment.Strategy,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
// 然后提供options完成初始化
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
}
genericregistry.Store除了包含针对某个资源类型的各个函数之外, 还包含了一个Storage对象, 通过针对资源类型的各个函数得到KV存储中的key之后, 通过Storage完成具体的读写动作.
storage的初始化在store.CompleteWithOptions函数中
if e.Storage.Storage == nil {
e.Storage.Codec = opts.StorageConfig.Codec
var err error
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
e.NewFunc,
e.NewListFunc,
attrFunc,
options.TriggerFunc,
options.Indexers,
)
if err != nil {
return err
}
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
previousDestroy := e.DestroyFunc
e.DestroyFunc = func() {
stopFunc()
if previousDestroy != nil {
previousDestroy()
}
}
}
}
opts.Decorator根据启动参数选择不同的Decorator
CreateKubeAPIServerConfig->buildGenericConfig->s.Etcd.ApplyWithStorageFactoryTo
// ApplyWithStorageFactoryTo wires the etcd options into the server config c.
//
// It first calls s.addEtcdHealthEndpoint(c) and returns its error, if any.
// It then sets c.RESTOptionsGetter to a StorageFactoryRestOptionsFactory that
// holds a copy of *s and the given factory.
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
// Options is stored by value, so the getter keeps a snapshot of *s taken here.
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
// StorageFactoryRestOptionsFactory produces generic.RESTOptions for a resource
// (see GetRESTOptions) from a set of etcd options and a storage factory.
type StorageFactoryRestOptionsFactory struct {
// Options is the EtcdOptions value consulted for watch-cache, garbage
// collection, delete-collection and count-metric settings.
Options EtcdOptions
// StorageFactory supplies the per-resource storage config and resource prefix.
StorageFactory serverstorage.StorageFactory
}
// GetRESTOptions builds the generic.RESTOptions for the given resource.
//
// The storage config comes from f.StorageFactory.NewConfig; if that fails, an
// empty RESTOptions and a wrapped error are returned. The Decorator defaults
// to generic.UndecoratedStorage and is switched to
// genericregistry.StorageWithCacher() only when f.Options.EnableWatchCache is
// set and the resource has no configured watch-cache size that is <= 0.
// A failure to parse f.Options.WatchCacheSizes is returned as an error.
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
// If EnableWatchCache is configured, the Decorator may be replaced with
// genericregistry.StorageWithCacher().
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
size, ok := sizes[resource]
// A positive configured size is not used for sizing; it only triggers
// this warning.
if ok && size > 0 {
klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
}
// A configured size <= 0 keeps this resource on undecorated storage;
// every other case gets the cacher.
if ok && size <= 0 {
ret.Decorator = generic.UndecoratedStorage
} else {
ret.Decorator = genericregistry.StorageWithCacher()
}
}
return ret, nil
}
其中创建了genericregistry.StorageWithCacher()
import (
...
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
...
)
// StorageWithCacher returns a generic.StorageDecorator that creates a cacher
// based on the given storageConfig.
//
// The returned decorator creates the raw storage with generic.NewRawStorage,
// builds a cacher on top of it via cacherstorage.NewCacherFromConfig, and
// returns that cacher together with a destroy function. If raw storage
// creation fails, its results are returned as-is; if cacher creation fails,
// it returns nil storage, a no-op destroy function and the error.
func StorageWithCacher() generic.StorageDecorator {
return func(
storageConfig *storagebackend.Config,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFuncs storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
// s is the raw storage and d is its destroy function.
s, d, err := generic.NewRawStorage(storageConfig)
if err != nil {
return s, d, err
}
// Only logged at verbosity 5 and above.
if klog.V(5).Enabled() {
klog.Infof("Storage caching is enabled for %s", objectTypeToString(newFunc()))
}
// The cacher is layered on the raw storage and receives the decorator's
// arguments plus the codec from storageConfig.
cacherConfig := cacherstorage.Config{
Storage: s,
Versioner: etcd3.APIObjectVersioner{},
ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc,
NewFunc: newFunc,
NewListFunc: newListFunc,
GetAttrsFunc: getAttrsFunc,
IndexerFuncs: triggerFuncs,
Indexers: indexers,
Codec: storageConfig.Codec,
}
cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
if err != nil {
return nil, func() {}, err
}
// Stop the cacher first, then run the raw storage's destroy function.
destroyFunc := func() {
cacher.Stop()
d()
}
// TODO : Remove RegisterStorageCleanup below when PR
// https://github.com/kubernetes/kubernetes/pull/50690
// merges as that shuts down storage properly
RegisterStorageCleanup(destroyFunc)
return cacher, destroyFunc, nil
}
}
NewCacherFromConfig
k8s.io/apiserver/pkg/storage/cacher/cacher.go
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config Config) (*Cacher, error) {
// ...
objType := reflect.TypeOf(obj)
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
objectType: objType,
versioner: config.Versioner,
newFunc: config.NewFunc,
indexedTrigger: indexedTrigger,
watcherIdx: 0,
watchers: indexedWatchers{
allWatchers: make(map[int]*cacheWatcher),
valueWatchers: make(map[string]watchersMap),
},
// TODO: Figure out the correct value for the buffer size.
incoming: make(chan watchCacheEvent, 100),
dispatchTimeoutBudget: newTimeBudget(stopCh),
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh,
clock: config.Clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
}
// ...
// 缓存对象, 实现了Store接口(Add/Delete等), 处理对应的watch事件
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
// 实现了List/Watch接口, 本质上是一个etcd的client
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
// 包装了watchCache和listerWater
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
reflector.WatchListPageSize = storageWatchListPageSize
cacher.watchCache = watchCache
cacher.reflector = reflector
// 启动事件分发器, 该函数会不断从cacher.incoming这个chan中读取事件
go cacher.dispatchEvents()
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
defer cacher.terminateAllWatchers()
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh)
}
}, time.Second, stopCh,
// 每隔一秒调用一次startCache
)
}()
return cacher, nil
}
核心逻辑是startCaching, 这个函数本身是阻塞的, 本来调用一次即可, 但是可能会遇到出错的情况, 因此需要放在wait.Until里
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// ...
// Note that since onReplace may be not called due to errors, we explicitly
// need to retry it on errors under lock.
// Also note that startCaching is called in a loop, so there's no need
// to have another loop here.
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
}
}
WatchCache
WatchCache内部维护了一个循环队列存储数据, 包含startIndex/endIndex/capacity三个变量. startIndex表示循环队列的起始地址, endIndex是结束地址, 这两个值都可能大于capacity, 寻址时需要取模, 即cache[startIndex%capacity]. 刚开始时startIndex=endIndex=0, capacity=100.
- 每次写缓存时, 首先对cache扩缩容
- 如果缓存已满并且缓存内所有的数据都是75秒内写入的, 需要扩容到2倍大小(有上限)
- 如果缓存已满并且75秒内写入的不足1/4, 那就缩容一半(有下限)
- 如果扩容完之后还是满的(可能是因为到了cache上限), 此时会丢弃最老的那个对象, 只需要startIndex加1即可
- 此时缓存还有位置, 将新的对象放到cache[endIndex%capacity], 并且endIndex加1
// updateCache appends event to the circular event buffer, resizing the buffer
// first if needed and dropping the oldest entry when it is still full.
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(event *watchCacheEvent) {
// Grow or shrink the buffer based on this event's record time.
w.resizeCacheLocked(event.RecordTime)
if w.isCacheFullLocked() {
// Cache is full - remove the oldest element.
w.startIndex++
}
// endIndex is not wrapped when incremented, so the slot is found by
// taking it modulo capacity.
w.cache[w.endIndex%w.capacity] = event
w.endIndex++
}
// resizeCacheLocked resizes the cache if necessary:
// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping).
//
// eventTime is the record time of the event about to be added. Growth is
// capped at w.upperBoundCapacity and shrinking is floored at
// w.lowerBoundCapacity; nothing happens when the cache is not full.
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
// Grow: the oldest cached event (at startIndex) is still fresh.
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
capacity := min(w.capacity*2, w.upperBoundCapacity)
// Skip the resize when the upper bound has already been reached.
if capacity > w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
// Shrink: the event a quarter of the capacity back from endIndex is
// already older than eventFreshDuration.
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
capacity := max(w.capacity/2, w.lowerBoundCapacity)
// Skip the resize when the lower bound has already been reached.
if capacity < w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
}
Watch
etcd默认保留5分钟以内的变更记录,每个资源发生变更都会更新一个更大的资源版本ResourceVersion,ResourceVersion是一个所有资源类型共享的全局变量。
向etcd watch资源时:
- 如果不指定resourceversion, 则从最新的开始watch
- 如果指定了resourceversion
- 如果resourceversion小于etcd保留的最小版本, 返回410(Gone)
- 如果大于etcd最新版本, 则等待, 直到等到或者超时
了解基础概念后, 接下来看Watch的代码
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
// ...
// 等待cache加载完成
c.ready.wait()
// ...
// Determine watch timeout('0' means deadline is not set, ignore checking)
deadline, _ := ctx.Deadline()
// Create a watcher here to reduce memory allocations under lock,
// given that memory allocation may trigger GC and block the thread.
// Also note that emptyFunc is a placeholder, until we will be able
// to compute watcher.forget function (which has to happen under lock).
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
// We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
// on return from this function.
// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
// underlying watchCache is calling processEvent under its lock.
// GetAllEventsSinceThreadUnsafe会将缓存内的所有数据(指定的ResourceVersion之后的数据)以Add事件的形式聚合
c.watchCache.RLock()
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
// ...
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
// initEvents[len(initEvents)-1]是缓存中最新的数据
if len(initEvents) > 0 {
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}
func() {
c.Lock()
defer c.Unlock()
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
// Add it to the queue only when the client support watch bookmarks.
if watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
}()
go watcher.process(ctx, initEvents, watchRV)
return watcher, nil
}
Watch接口会返回一个watch.Interface, 其中有一个ResultChan的方法, 调用watch的一方可以从这个ResultChan获取事件
这里使用的是cacheWatcher做为watch.Interface的具体实现.
watch的具体过程(k8s.io/apiserver/pkg/storage/cacher/cacher.go)
- Cacher创建了一个WatchCache对象, 并传入自己的processEvent函数做为handler
- WatchCache对象会去真正的watch etcd, watch到事件后调用handler(也就是Cacher的processEvent)
- processEvent将事件放到Cacher的incoming channel
- Cacher初始化时还启动了一个goroutine不断从incoming channel读事件
- Cacher另外维护了一个watcher的队列, 每当有一个消费者调用Cacher的Watch方法时就创建一个watcher放到Cacher的watcher队列
- Cacher从incoming channel读到的事件后调用每个watcher的add/nonblockingAdd, 这里还比较细节, 总之就是放到watcher内部的input channel
- Cacher创建watcher时会启动它的process协程, process方法从input channel读数据, 如果读到的数据的resource version大于用户指定的ResourceVersion, 则将数据放到ResultChan中
- Watch的调用方从ResultChan读取事件
List
// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero.
return c.storage.List(ctx, key, opts, listObj)
}
// ...
// 缓存没有ready
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.List(ctx, key, opts, listObj)
}
// ...
// 获取数据
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
if err != nil {
return err
}
// ...
// 过滤数据, 可以看到是先从缓存获取全部数据, 在一个个过滤
for _, obj := range objs {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Labels, elem.Fields) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
}
List时不一定就直接从缓存里获取数据
- 参数问题: 如果没有指定resourceVersion, 或者指定了非零的
resourceversion同时指定了limit, 或者指定了continue, 将会导致直接从
etcd获取数据.
这里resourceVersion的语义:
- 不设置: 取最新版本
- 设置0: 取任意版本(缓存里有什么就给什么)
- 非零: 比给定版本新的任意版本
- 设置了resourceVersion=0, 但是缓存还没ready, 将会直接从etcd拉取
WaitUntilFreshAndList
- 获取数据时, 检查请求的resourceversion与cache内的resourceversion, 如果大于cache内的, 那需要等待直到cache内缓存到指定版本的数据为止.