Kubernetes 1.5 Source Code Analysis (4): The etcd Storage Interface Behind apiServer Resources

Source version

Kubernetes v1.5.0


The K8s components operate on resource objects through the apiServer, and every such operation ultimately lands in etcd.
For each resource it exposes to external services, K8s implements a common etcd operation interface that satisfies RESTful requirements. Each of these interfaces handles resource objects of exactly one Kind.
These resource objects include pods, bindings, podTemplates, replication controllers (RC), services, and so on.

Creating the storage

To understand how these operation interfaces are implemented, we first need to understand the Master.GenericAPIServer.storage structure:

    storage map[string]rest.Storage

The storage variable is a map. The key is the REST API path, and the value is a rest.Storage: a generic resource storage interface that satisfies RESTful requirements.
The list of core-group resources is created by NewLegacyRESTStorage() in pkg/registry/core/rest/storage_core.go.
Call chain: main -> App.Run -> config.Complete().New() -> m.InstallLegacyAPI() -> NewLegacyRESTStorage()

    func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
        ...
        // create podStorage
        podStorage := podetcd.NewStorage(
            restOptionsGetter(api.Resource("pods")),
            ...
        )
        ...
        // resource list
        restStorageMap := map[string]rest.Storage{
            "pods":             podStorage.Pod,
            "pods/attach":      podStorage.Attach,
            "pods/status":      podStorage.Status,
            "pods/log":         podStorage.Log,
            "pods/exec":        podStorage.Exec,
            "pods/portforward": podStorage.PortForward,
            "pods/proxy":       podStorage.Proxy,
            "pods/binding":     podStorage.Binding,
            "bindings":         podStorage.Binding,

            "podTemplates": podTemplateStorage,

            "replicationControllers":        controllerStorage.Controller,
            "replicationControllers/status": controllerStorage.Status,

            "services":        serviceRest.Service,
            "services/proxy":  serviceRest.Proxy,
            "services/status": serviceStatusStorage,

            "endpoints": endpointsStorage,

            ...

            "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
        }
        if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
            restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
        }
        if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1beta1"}) {
            restStorageMap["pods/eviction"] = podStorage.Eviction
        }
        apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

        return restStorage, apiGroupInfo, nil
    }
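
The shape of restStorageMap can be shown with a minimal Go sketch. It makes simplifying assumptions: `Storage`, `fakeStorage` and `buildStorageMap` are hypothetical names, and the `enabled` map stands in for `registered.IsEnabledVersion`. The sketch illustrates three points visible in the code above. Subresources are addressed as `resource/subresource` keys. One storage object can back several paths (`bindings` and `pods/binding`). Some entries are added only when a group version is enabled.

```go
package main

import (
	"fmt"
	"sort"
)

// Storage is a hypothetical, heavily reduced stand-in for rest.Storage.
type Storage interface {
	Resource() string
}

// fakeStorage only records which resource it serves.
type fakeStorage struct{ name string }

func (f fakeStorage) Resource() string { return f.name }

// buildStorageMap mimics the shape of restStorageMap: keys are REST paths,
// subresources use "resource/subresource". enabled simulates
// registered.IsEnabledVersion with "group/version" keys.
func buildStorageMap(enabled map[string]bool) map[string]Storage {
	binding := fakeStorage{"pods/binding"}
	m := map[string]Storage{
		"pods":         fakeStorage{"pods"},
		"pods/status":  fakeStorage{"pods/status"},
		"pods/binding": binding,
		"bindings":     binding, // two paths, one storage object
	}
	if enabled["autoscaling/v1"] {
		m["replicationControllers/scale"] = fakeStorage{"replicationControllers/scale"}
	}
	if enabled["policy/v1beta1"] {
		m["pods/eviction"] = fakeStorage{"pods/eviction"}
	}
	return m
}

func main() {
	m := buildStorageMap(map[string]bool{"autoscaling/v1": true})
	paths := make([]string, 0, len(m))
	for p := range m {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	fmt.Println(paths)
}
```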

Part 2 of this ApiServer source code series already covered how this function is used during resource registration. Here we focus on how the backend etcd storage interfaces are implemented.
We use the Pod resource as an example.
Path: pkg/registry/core/pod/etcd/etcd.go

    func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
        // build the full key prefix
        prefix := "/" + opts.ResourcePrefix

        newListFunc := func() runtime.Object { return &api.PodList{} }
        // Call the decorator to get the etcd-backed storage interface and the function that releases it.
        // opts is passed in by the caller; to see where it comes from, look at restOptionsFactory.NewFor in master.go.
        storageInterface, dFunc := opts.Decorator(
            ...
            // the cache size argument below is only used when the watch cache is enabled
            cachesize.GetWatchCacheSizeByResource(cachesize.Pods),
            &api.Pod{},
            ...
        )

        store := &registry.Store{
            NewFunc:     func() runtime.Object { return &api.Pod{} },
            NewListFunc: newListFunc,
            KeyRootFunc: func(ctx api.Context) string {
                return registry.NamespaceKeyRootFunc(ctx, prefix)
            },
            KeyFunc: func(ctx api.Context, name string) (string, error) {
                return registry.NamespaceKeyFunc(ctx, prefix, name)
            },
            ObjectNameFunc: func(obj runtime.Object) (string, error) {
                return obj.(*api.Pod).Name, nil
            },
            PredicateFunc:           pod.MatchPod,
            QualifiedResource:       api.Resource("pods"),
            EnableGarbageCollection: opts.EnableGarbageCollection,
            DeleteCollectionWorkers: opts.DeleteCollectionWorkers,

            CreateStrategy:      pod.Strategy,
            UpdateStrategy:      pod.Strategy,
            DeleteStrategy:      pod.Strategy,
            ReturnDeletedObject: true,

            Storage:     storageInterface,
            DestroyFunc: dFunc,
        }

        statusStore := *store
        statusStore.UpdateStrategy = pod.StatusStrategy

        return PodStorage{
            Pod:         &REST{store, proxyTransport},
            Binding:     &BindingREST{store: store},
            Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
            Status:      &StatusREST{store: &statusStore},
            Log:         &podrest.LogREST{Store: store, KubeletConn: k},
            Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
            Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
            Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
            PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
        }
    }
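
One detail in NewStorage() is `statusStore := *store`. This is a shallow struct copy. The status store therefore shares the same backend through the copied `Storage` field, yet carries its own `UpdateStrategy`. The sketch below demonstrates that Go behavior. `Store` here is a hypothetical two-field struct, not the real `registry.Store`, and `backend` is an invented placeholder for the shared storage interface.

```go
package main

import "fmt"

// backend is a hypothetical placeholder for the shared storage.Interface.
type backend struct{ writes int }

// Store is a simplified, hypothetical struct with just the two fields that matter here.
type Store struct {
	Storage        *backend // pointer field: both copies point at the same backend
	UpdateStrategy string   // value field: each copy can override it independently
}

func main() {
	store := &Store{Storage: &backend{}, UpdateStrategy: "pod.Strategy"}

	// Same pattern as `statusStore := *store`: copy the struct, then override one field.
	statusStore := *store
	statusStore.UpdateStrategy = "pod.StatusStrategy"

	// A write through the status store is visible through the original store's backend.
	statusStore.Storage.writes++
	fmt.Println(store.UpdateStrategy, statusStore.UpdateStrategy, store.Storage.writes)
}
```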

opts.Decorator() returns the key piece, the storage interface, together with a function that releases the resources behind it.
To see how it is implemented, we have to start from where opts is created.
The call restOptionsGetter(api.Resource("pods")) is what creates opts; api.Resource("pods") simply builds a GroupResource struct. So we need to trace restOptionsGetter back to its origin.
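
The getter pattern can be sketched as follows. `GroupResource`, `Resource`, `RESTOptions` and `RESTOptionsGetter` are simplified, hypothetical stand-ins for the Kubernetes types with similar names. The prefix rule inside `main` is invented for illustration; in the real code, the StorageFactory supplies the prefix.

```go
package main

import "fmt"

// GroupResource mirrors the idea of unversioned.GroupResource: a resource name
// qualified by its API group ("" is the legacy core group).
type GroupResource struct {
	Group    string
	Resource string
}

// Resource is a hypothetical helper in the spirit of api.Resource:
// it builds a GroupResource in the core (empty) group.
func Resource(resource string) GroupResource {
	return GroupResource{Group: "", Resource: resource}
}

// RESTOptions keeps only the field this sketch needs.
type RESTOptions struct {
	ResourcePrefix string
}

// RESTOptionsGetter is the shape that restOptionsFactory.NewFor takes once it
// is passed around as a function value.
type RESTOptionsGetter func(GroupResource) RESTOptions

func main() {
	var getter RESTOptionsGetter = func(gr GroupResource) RESTOptions {
		// Hypothetical prefix rule, for illustration only.
		return RESTOptions{ResourcePrefix: gr.Resource}
	}
	opts := getter(Resource("pods"))
	fmt.Println("/" + opts.ResourcePrefix)
}
```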
Path: pkg/master/master.go

    func (c completedConfig) New() (*Master, error) {
        ...
        restOptionsFactory := restOptionsFactory{
            deleteCollectionWorkers: c.DeleteCollectionWorkers,
            enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
            storageFactory:          c.StorageFactory,
        }

        // Check whether the watch cache is enabled; each case assigns a different decorator.
        // restOptionsFactory.storageDecorator decorates the REST (CRUD) interface of every resource.
        // NewStorage() calls it later to obtain the CRUD interface and the destroy function;
        // see NewStorage() in pkg/registry/core/pod/etcd/etcd.go.
        // The only difference: with the cache, the returned interface goes through the cacher;
        // without it, the returned interface operates on etcd directly.
        if c.EnableWatchCache {
            restOptionsFactory.storageDecorator = registry.StorageWithCacher
        } else {
            restOptionsFactory.storageDecorator = generic.UndecoratedStorage
        }
        // install legacy rest storage
        if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
            legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
                StorageFactory:       c.StorageFactory,
                ProxyTransport:       c.ProxyTransport,
                KubeletClientConfig:  c.KubeletClientConfig,
                EventTTL:             c.EventTTL,
                ServiceIPRange:       c.ServiceIPRange,
                ServiceNodePortRange: c.ServiceNodePortRange,
                LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
            }
            m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
        }
        ...
    }

This function initializes a restOptionsFactory variable. The variable records three things: the number of workers used to delete resource collections (deleteCollectionWorkers), whether garbage collection is enabled, and the storageFactory. Depending on whether the watch cache is enabled, the function also sets the decorator that NewStorage() will later call.
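
The EnableWatchCache branch is a plain "pick a function value" decision. The sketch below models it under stated assumptions. `Interface`, `DestroyFunc` and `StorageDecorator` are simplified, hypothetical stand-ins, and the parameter list is reduced to a single `capacity` argument. In the real code, both StorageWithCacher and UndecoratedStorage share the full seven-parameter signature.

```go
package main

import "fmt"

// Interface and DestroyFunc are simplified, hypothetical stand-ins for
// storage.Interface and factory.DestroyFunc.
type Interface interface{ Kind() string }
type DestroyFunc func()

type rawStorage struct{}

func (rawStorage) Kind() string { return "etcd" }

// cachedStorage wraps a raw storage, as the cacher wraps the etcd storage.
type cachedStorage struct{ inner Interface }

func (cachedStorage) Kind() string { return "cacher" }

// StorageDecorator plays the role of generic.StorageDecorator (parameters cut down).
type StorageDecorator func(capacity int) (Interface, DestroyFunc)

// undecoratedStorage ignores the cache-related argument, like UndecoratedStorage.
func undecoratedStorage(capacity int) (Interface, DestroyFunc) {
	return rawStorage{}, func() {}
}

// storageWithCacher builds the raw storage and puts a cache layer in front of it.
func storageWithCacher(capacity int) (Interface, DestroyFunc) {
	return cachedStorage{inner: rawStorage{}}, func() {}
}

// pickDecorator reproduces the EnableWatchCache branch in completedConfig.New().
func pickDecorator(enableWatchCache bool) StorageDecorator {
	if enableWatchCache {
		return storageWithCacher
	}
	return undecoratedStorage
}

func main() {
	s, destroy := pickDecorator(true)(100)
	defer destroy()
	fmt.Println(s.Kind())
}
```

Because the choice is made once and stored in a field, every resource's NewStorage() gets the same kind of storage without knowing which one it is.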
restOptionsFactory.NewFor is passed down until NewLegacyRESTStorage() calls it to create opts. Here is its implementation:
Path: pkg/master/master.go

    type restOptionsFactory struct {
        deleteCollectionWorkers int
        enableGarbageCollection bool
        storageFactory          genericapiserver.StorageFactory
        storageDecorator        generic.StorageDecorator
    }

    func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.RESTOptions {
        // create the storage config for the resource
        storageConfig, err := f.storageFactory.NewConfig(resource)
        if err != nil {
            glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
        }
        // The return value is a RESTOptions, i.e. the type of opts seen earlier.
        // Note where f.storageDecorator comes from.
        return generic.RESTOptions{
            // the config used to generate the storage
            StorageConfig:           storageConfig,
            Decorator:               f.storageDecorator,
            DeleteCollectionWorkers: f.deleteCollectionWorkers,
            EnableGarbageCollection: f.enableGarbageCollection,
            ResourcePrefix:          f.storageFactory.ResourcePrefix(resource),
        }
    }

The function is simple: it initializes a generic.RESTOptions variable, which is opts. To see where opts.Decorator comes from, only the EnableWatchCache check in New() matters.
opts.Decorator ultimately returns the storage interface and the function that releases its resources. An implementation with a cache necessarily differs from one without, so the two cases are handled separately:
– registry.StorageWithCacher: returns an interface that goes through the cache, plus a function that tears the cache down.
– generic.UndecoratedStorage: returns an etcd operation interface that depends on the configured backend type (etcd2, etcd3, etc.). In effect, it creates an etcd client connection for each resource object type and sends the various requests over that connection. It also returns a function that closes the connection.
The two implementations are therefore completely different: one operates on the cache, the other on etcd itself.

Let's look at the implementation of registry.StorageWithCacher():
Path: pkg/registry/generic/registry/storage_factory.go

    func StorageWithCacher(
        storageConfig *storagebackend.Config,
        capacity int,
        objectType runtime.Object,
        resourcePrefix string,
        scopeStrategy rest.NamespaceScopedStrategy,
        newListFunc func() runtime.Object,
        triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {

        // storageConfig is the backend storage config: storage type, server list, TLS certificate information, cache size, and so on.
        // NewRawStorage() is all that generic.UndecoratedStorage() does; StorageWithCacher() adds the cacher on top of it.
        s, d := generic.NewRawStorage(storageConfig)
        // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
        // Currently it has two layers of same storage interface - cacher and low level kv.
        cacherConfig := storage.CacherConfig{
            CacheCapacity:        capacity,
            Storage:              s,
            Versioner:            etcdstorage.APIObjectVersioner{},
            Type:                 objectType,
            ResourcePrefix:       resourcePrefix,
            NewListFunc:          newListFunc,
            TriggerPublisherFunc: triggerFunc,
            Codec:                storageConfig.Codec,
        }
        // KeyFunc computes an object's key; it is assigned according to whether the resource is namespaced:
        // namespaced:     prefix + "/" + namespace + "/" + name
        // not namespaced: prefix + "/" + name
        if scopeStrategy.NamespaceScoped() {
            cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
                return storage.NamespaceKeyFunc(resourcePrefix, obj)
            }
        } else {
            cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
                return storage.NoNamespaceKeyFunc(resourcePrefix, obj)
            }
        }
        // Create the cacher from the config initialized above.
        // This is the key step; it is covered below.
        cacher := storage.NewCacherFromConfig(cacherConfig)
        destroyFunc := func() {
            cacher.Stop()
            d()
        }

        return cacher, destroyFunc
    }
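
Two small mechanisms in StorageWithCacher() are easy to miss. First, the key format changes depending on whether the resource is namespaced. Second, destroyFunc stops the cacher before releasing the raw storage. The sketch below reproduces both in isolation. Objects are reduced to (namespace, name) pairs, `keyFuncFor` and `composeDestroy` are hypothetical helpers, and the prefixes are example values.

```go
package main

import (
	"fmt"
	"strings"
)

// keyFuncFor picks a key format the way StorageWithCacher does:
// namespaced resources use prefix/namespace/name, cluster-scoped ones prefix/name.
func keyFuncFor(prefix string, namespaced bool) func(namespace, name string) string {
	if namespaced {
		return func(namespace, name string) string {
			return prefix + "/" + namespace + "/" + name
		}
	}
	return func(_, name string) string {
		return prefix + "/" + name
	}
}

// composeDestroy mirrors destroyFunc: stop the cacher first, then release the backend.
func composeDestroy(stopCacher, closeBackend func()) func() {
	return func() {
		stopCacher()
		closeBackend()
	}
}

func main() {
	podKey := keyFuncFor("/registry/pods", true)
	nsKey := keyFuncFor("/registry/namespaces", false)
	fmt.Println(podKey("default", "nginx"))
	fmt.Println(nsKey("", "kube-system"))

	var order []string
	destroy := composeDestroy(
		func() { order = append(order, "cacher.Stop") },
		func() { order = append(order, "backend close") },
	)
	destroy()
	fmt.Println(strings.Join(order, " -> "))
}
```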

StorageWithCacher() first calls NewRawStorage() to create the storage backend. Let's look at that function:
Path: pkg/registry/generic/storage_decorator.go

    func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
        s, d, err := factory.Create(*config)
        if err != nil {
            glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
        }
        return s, d
    }

Nothing special here, so on to Create():
Path: pkg/storage/storagebackend/factory/factory.go

    func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        // check the storage type: etcd2 or etcd3
        switch c.Type {
        case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
            return newETCD2Storage(c)
        case storagebackend.StorageTypeETCD3:
            // TODO: We have the following features to implement:
            // - Support secure connection by using key, cert, and CA files.
            // - Honor "https" scheme to support secure connection in gRPC.
            // - Support non-quorum read.
            return newETCD3Storage(c)
        default:
            return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
        }
    }
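
The dispatch rules in Create() can be summarized in a few lines. An unset type falls back to etcd2, etcd3 selects the v3 backend, and anything else is an error. In this sketch, `create` returns a label instead of a real client, and the string values of the `StorageType` constants are assumptions.

```go
package main

import "fmt"

// StorageType mimics the string-typed storagebackend constants (values assumed).
type StorageType string

const (
	StorageTypeUnset StorageType = ""
	StorageTypeETCD2 StorageType = "etcd2"
	StorageTypeETCD3 StorageType = "etcd3"
)

// create follows the same dispatch rules as factory.Create but only returns a label.
func create(t StorageType) (string, error) {
	switch t {
	case StorageTypeUnset, StorageTypeETCD2:
		return "etcd2 (etcdHelper)", nil
	case StorageTypeETCD3:
		return "etcd3", nil
	default:
		return "", fmt.Errorf("unknown storage type: %s", t)
	}
}

func main() {
	for _, t := range []StorageType{"", "etcd2", "etcd3", "consul"} {
		backend, err := create(t)
		fmt.Printf("%q -> %q, err=%v\n", t, backend, err)
	}
}
```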

Let's follow the etcd2 implementation:
Path: pkg/storage/storagebackend/factory/etcd2.go

    func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        // create an http.Transport from the configured TLS certificate information
        tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
        if err != nil {
            return nil, nil, err
        }
        // create the etcd2 client, which returns an httpClusterClient structure
        client, err := newETCD2Client(tr, c.ServerList)
        if err != nil {
            return nil, nil, err
        }
        // initialize an etcdHelper variable, which implements the storage.Interface interface
        s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize)
        // return the etcdHelper and the function that closes idle connections
        return s, tr.CloseIdleConnections, nil
    }
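
The destroy function for etcd2 is simply a method value: `tr.CloseIdleConnections` has type `func()`, so it can be returned directly as a DestroyFunc. The standard-library sketch below shows the same idiom. `newBackend` is a hypothetical helper, and a plain `http.Client` stands in for the etcd2 client.

```go
package main

import (
	"fmt"
	"net/http"
)

// DestroyFunc matches the shape of factory.DestroyFunc: a no-argument cleanup hook.
type DestroyFunc func()

// newBackend is hypothetical; it reproduces the pattern used by newETCD2Storage,
// where the transport's CloseIdleConnections method value becomes the DestroyFunc.
func newBackend() (*http.Client, DestroyFunc) {
	tr := &http.Transport{}
	client := &http.Client{Transport: tr}
	return client, tr.CloseIdleConnections
}

func main() {
	client, destroy := newBackend()
	defer destroy() // drops pooled keep-alive connections when the storage is torn down
	fmt.Println(client.Transport != nil)
}
```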

The first two steps create a client connected to etcd; the last step is the important one:
Path: pkg/storage/etcd/etcd_helper.go

    func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
        return &etcdHelper{
            // create an httpMembersAPI variable, which comes with a set of methods
            etcdMembersAPI: etcd.NewMembersAPI(client),
            // create an httpKeysAPI variable, which comes with the various key operations
            etcdKeysAPI: etcd.NewKeysAPI(client),
            // codec used for encoding and decoding objects
            codec:     codec,
            versioner: APIObjectVersioner{},
            // used for serialization/deserialization, version conversion, compatibility and so on
            copier:     api.Scheme,
            pathPrefix: path.Join("/", prefix),
            quorum:     quorum,
            // create a cache structure
            cache: utilcache.NewCache(cacheSize),
        }
    }
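
The `pathPrefix` field is normalized with `path.Join("/", prefix)`. path.Join cleans its result, so the prefix ends up absolute, with no trailing or duplicated slashes, whatever form the configured prefix takes. The helper name `pathPrefix` below is hypothetical; the call is the same standard-library expression used above.

```go
package main

import (
	"fmt"
	"path"
)

// pathPrefix reproduces the expression used for etcdHelper's pathPrefix field.
func pathPrefix(prefix string) string {
	return path.Join("/", prefix)
}

func main() {
	for _, p := range []string{"registry", "/registry/", "", "//registry//"} {
		fmt.Printf("%q -> %q\n", p, pathPrefix(p))
	}
}
```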

The initialization is straightforward. What deserves attention is the set of common RESTful methods that come with etcdHelper: it implements every method that the storage.Interface interface requires.
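
Because NewEtcdStorage() returns `&etcdHelper{...}` as a storage.Interface, the Go compiler already checks that etcdHelper has every required method. The sketch below shows that check in isolation, with an explicit `var _ Interface = ...` assertion. The three-method `Interface` and the map-backed `memHelper` are hypothetical simplifications; the real storage.Interface has more methods with richer signatures.

```go
package main

import "fmt"

// Interface is a hypothetical three-method subset of a storage interface.
type Interface interface {
	Create(key, obj string) error
	Get(key string) (string, bool)
	Delete(key string) error
}

// memHelper stands in for etcdHelper, backed by a map instead of etcd.
type memHelper struct{ data map[string]string }

func (h *memHelper) Create(key, obj string) error {
	if _, exists := h.data[key]; exists {
		return fmt.Errorf("key %s already exists", key)
	}
	h.data[key] = obj
	return nil
}

func (h *memHelper) Get(key string) (string, bool) {
	v, ok := h.data[key]
	return v, ok
}

func (h *memHelper) Delete(key string) error {
	delete(h.data, key)
	return nil
}

// Compile-time check: the build fails if *memHelper stops satisfying Interface.
var _ Interface = &memHelper{}

// newMemStorage returns the concrete type as the interface, like NewEtcdStorage.
func newMemStorage() Interface { return &memHelper{data: map[string]string{}} }

func main() {
	s := newMemStorage()
	_ = s.Create("/registry/pods/default/nginx", "pod")
	v, ok := s.Get("/registry/pods/default/nginx")
	fmt.Println(v, ok)
}
```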

Back in StorageWithCacher(), the next step initializes CacherConfig. That step needs no explanation, so let's go straight to the function that creates the cacher:
Path: pkg/storage/cacher.go

    func NewCacherFromConfig(config CacherConfig) *Cacher {
        watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
        listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

        // Give this error when it is constructed rather than when you get the
        // first watch item, because it's much easier to track down that way.
        if obj, ok := config.Type.(runtime.Object); ok {
            if err := runtime.CheckCodec(config.Codec, obj); err != nil {
                panic("storage codec does not seem to match given type: " + err.Error())
            }
        }

        cacher := &Cacher{
            ready:       newReady(),
            storage:     config.Storage,
            objectType:  reflect.TypeOf(config.Type),
            watchCache:  watchCache,
            reflector:   cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
            versioner:   config.Versioner,
            triggerFunc: config.TriggerPublisherFunc,
            watcherIdx:  0,
            watchers: indexedWatchers{
                allWatchers:   make(map[int]*cacheWatcher),
                valueWatchers: make(map[string]watchersMap),
            },
            // TODO: Figure out the correct value for the buffer size.
            incoming: make(chan watchCacheEvent, 100),
            // We need to (potentially) stop both:
            // - wait.Until go-routine
            // - reflector.ListAndWatch
            // and there are no guarantees on the order that they will stop.
            // So we will be simply closing the channel, and synchronizing on the WaitGroup.
            stopCh: make(chan struct{}),
        }
        watchCache.SetOnEvent(cacher.processEvent)
        go cacher.dispatchEvents()

        stopCh := cacher.stopCh
        cacher.stopWg.Add(1)
        go func() {
            defer cacher.stopWg.Done()
            wait.Until(
                func() {
                    if !cacher.isStopped() {
                        cacher.startCaching(stopCh)
                    }
                }, time.Second, stopCh,
            )
        }()
        return cacher
    }

This function sets up the cacher; the cache only serves WATCH and LIST requests.
Next, the Cacher structure.
Cacher must also implement the methods required by the storage.Interface interface.
Because Cacher only serves WATCH and LIST requests, its API shows that every method not related to WATCH or LIST simply calls the underlying storage API created earlier.
Let's look at cacher.Create and Delete:

    func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
        return c.storage.Create(ctx, key, obj, out, ttl)
    }

    func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
        return c.storage.Delete(ctx, key, out, preconditions)
    }
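
The split between pass-through writes and cached reads can be illustrated with a toy cacher. All names here are hypothetical. The explicit `sync()` call stands in for the reflector, which in the real Cacher keeps the watch cache filled continuously through List/Watch. The sketch shows why a write becomes visible to cached reads only after the cache has caught up.

```go
package main

import (
	"fmt"
	"sort"
)

// rawStore is a hypothetical in-memory stand-in for the etcd-backed storage.
type rawStore struct{ data map[string]string }

func (r *rawStore) Create(key, obj string) { r.data[key] = obj }

// cacher forwards writes and answers List from its own copy of the data.
type cacher struct {
	storage *rawStore
	cache   map[string]string
}

// Create is a pure pass-through, like Cacher.Create calling c.storage.Create.
func (c *cacher) Create(key, obj string) { c.storage.Create(key, obj) }

// sync stands in for the reflector filling the watch cache (on demand here).
func (c *cacher) sync() {
	c.cache = make(map[string]string, len(c.storage.data))
	for k, v := range c.storage.data {
		c.cache[k] = v
	}
}

// List is served from the cache only, never from the backing store.
func (c *cacher) List() []string {
	keys := make([]string, 0, len(c.cache))
	for k := range c.cache {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	c := &cacher{storage: &rawStore{data: map[string]string{}}, cache: map[string]string{}}
	c.Create("/registry/pods/default/nginx", "pod")
	fmt.Println("before sync:", c.List())
	c.sync()
	fmt.Println("after sync:", c.List())
}
```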

That completes registry.StorageWithCacher(). Now back to the other decorator, generic.UndecoratedStorage():
Path: pkg/registry/generic/storage_decorator.go

    func UndecoratedStorage(
        config *storagebackend.Config,
        capacity int,
        objectType runtime.Object,
        resourcePrefix string,
        scopeStrategy rest.NamespaceScopedStrategy,
        newListFunc func() runtime.Object,
        trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
        return NewRawStorage(config)
    }

As you can see, registry.StorageWithCacher() also calls NewRawStorage(); UndecoratedStorage() is the same path without the cache.

This brings us to the cache; the next part will be devoted to its implementation.

User configuration

1. --watch-cache: apiServer flag, true by default; enables the watch cache.
2. --watch-cache-sizes: since the cache can be enabled, its size can be configured too; this flag sets the cache size for each resource type. Format: resource#size
3. --storage-backend: backend persistent storage type, either etcd2 (default) or etcd3.
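
A value for --watch-cache-sizes is a comma-separated list of resource#size entries. The sketch below only illustrates how such a value decomposes. `parseWatchCacheSizes` is a hypothetical parser, not the apiServer's own flag handling, and the example resources and sizes are made up.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseWatchCacheSizes is a hypothetical parser for values like "pods#1000,nodes#500".
func parseWatchCacheSizes(s string) (map[string]int, error) {
	sizes := map[string]int{}
	if s == "" {
		return sizes, nil
	}
	for _, entry := range strings.Split(s, ",") {
		parts := strings.Split(entry, "#")
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid entry %q, expected resource#size", entry)
		}
		size, err := strconv.Atoi(parts[1])
		if err != nil {
			return nil, fmt.Errorf("invalid size in %q: %v", entry, err)
		}
		sizes[parts[0]] = size
	}
	return sizes, nil
}

func main() {
	sizes, err := parseWatchCacheSizes("pods#1000,nodes#500")
	fmt.Println(sizes, err)
}
```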