一文读懂 Kubernetes APIServer 原理
前言
整个Kubernetes技术体系由声明式API以及Controller构成,而kube-apiserver是Kubernetes的声明式api server,并为其它组件交互提供了桥梁。因此加深对kube-apiserver的理解就显得至关重要了。
整体组件功能
kube-apiserver作为整个Kubernetes集群操作etcd的唯一入口,负责Kubernetes各资源的认证&鉴权,校验以及CRUD等操作,提供RESTful APIs,供其它组件调用:
kube-apiserver包含三种APIServer:
- aggregatorServer:负责处理
apiregistration.k8s.io
组下的APIService资源请求,同时将来自用户的请求拦截转发给aggregated server(AA) - kubeAPIServer:负责对请求的一些通用处理,包括:认证、鉴权以及各个内建资源(pod, deployment,service and etc)的REST服务等
- apiExtensionsServer:负责CustomResourceDefinition(CRD)apiResources以及apiVersions的注册,同时处理CRD以及相应CustomResource(CR)的REST请求(如果对应CR不能被处理的话则会返回404),也是apiserver Delegation的最后一环
另外还包括bootstrap-controller,主要负责Kubernetes default apiserver service的创建以及管理。
接下来将对上述组件进行概览性总结。
bootstrap-controller
- apiserver bootstrap-controller创建&运行逻辑在k8s.io/kubernetes/pkg/master目录
- bootstrap-controller主要用于创建以及维护内部kubernetes default apiserver service
- kubernetes default apiserver service spec.selector为空,这是default apiserver service与其它正常service的最大区别,表明了这个特殊的service对应的endpoints不由endpoints controller控制,而是直接受kube-apiserver bootstrap-controller管理(maintained by this code, not by the pod selector)
- bootstrap-controller的几个主要功能如下:
- 创建 default、kube-system 和 kube-public 以及 kube-node-lease 命名空间
- 创建&维护kubernetes default apiserver service以及对应的endpoint
- 提供基于Service ClusterIP的检查及修复功能(
--service-cluster-ip-range
指定范围) - 提供基于Service NodePort的检查及修复功能(
--service-node-port-range
指定范围)
// k8s.io/kubernetes/pkg/master/controller.go:142// Start begins the core controller loops that must exist for bootstrapping// a cluster.func (c *Controller) Start() { if c.runner != nil { return } // Reconcile during first run removing itself until server is ready. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil { klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err) } repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry) repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry) // run all of the controllers once prior to returning from Start. if err := repairClusterIPs.RunOnce(); err != nil { // If we fail to repair cluster IPs apiserver is useless. We should restart and retry. klog.Fatalf("Unable to perform initial IP allocation check: %v", err) } if err := repairNodePorts.RunOnce(); err != nil { // If we fail to repair node ports apiserver is useless. We should restart and retry. klog.Fatalf("Unable to perform initial service nodePort check: %v", err) } // 定期执行bootstrap controller主要的四个功能(reconciliation) c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil) c.runner.Start()}
更多代码原理详情,参考 kubernetes-reading-notes 。
kubeAPIServer
KubeAPIServer主要提供对内建API Resources的操作请求,为Kubernetes中各API Resources注册路由信息,同时暴露RESTful API,使集群中以及集群外的服务都可以通过RESTful API操作Kubernetes中的资源
另外,kubeAPIServer是整个Kubernetes apiserver的核心,下面将要讲述的aggregatorServer以及apiExtensionsServer都是建立在kubeAPIServer基础上进行扩展的(补充了Kubernetes对用户自定义资源的能力支持)
kubeAPIServer最核心的功能是为Kubernetes内置资源添加路由,如下:
- 调用
m.InstallLegacyAPI
将核心 API Resources添加到路由中,在apiserver中即是以/api
开头的 resource; - 调用
m.InstallAPIs
将扩展的 API Resources添加到路由中,在apiserver中即是以/apis
开头的 resource;
// k8s.io/kubernetes/pkg/master/master.go:332// New returns a new instance of Master from the given config.// Certain config fields will be set to a default value if unset.// Certain config fields must be specified, including:// KubeletClientConfigfunc (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) { ... // 安装 LegacyAPI(core API) // install legacy rest storage if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) { legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{ StorageFactory: c.ExtraConfig.StorageFactory, ProxyTransport: c.ExtraConfig.ProxyTransport, KubeletClientConfig: c.ExtraConfig.KubeletClientConfig, EventTTL: c.ExtraConfig.EventTTL, ServiceIPRange: c.ExtraConfig.ServiceIPRange, SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange, ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange, LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig, ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer, ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration, APIAudiences: c.GenericConfig.Authentication.APIAudiences, } if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil { return nil, err } } ... // 安装 APIs(named groups apis) if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil { return nil, err } ... return m, nil}
整个kubeAPIServer提供了三类API Resource接口:
- core group:主要在
/api/v1
下; - named groups:其 path 为
/apis/$GROUP/$VERSION
; - 系统状态的一些 API:如
/metrics
、/version
等;
而API的URL大致以 /apis/{group}/{version}/namespaces/{namespace}/resource/{name}
组成,结构如下图所示:
kubeAPIServer会为每种API资源创建对应的RESTStorage,RESTStorage的目的是将每种资源的访问路径及其后端存储的操作对应起来:通过构造的REST Storage实现的接口判断该资源可以执行哪些操作(如:create、update等),将其对应的操作存入到action中,每一个操作对应一个标准的REST method,如create对应REST method为POST,而update对应REST method为PUT。最终根据actions数组依次遍历,对每一个操作添加一个handler(handler对应REST Storage实现的相关接口),并注册到route,最终对外提供RESTful API,如下:
// m.GenericAPIServer.InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers// k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) { ... // 1、判断该 resource 实现了哪些 REST 操作接口,以此来判断其支持的 verbs 以便为其添加路由 // what verbs are supported by the storage, used to know what verbs we support per path creater, isCreater := storage.(rest.Creater) namedCreater, isNamedCreater := storage.(rest.NamedCreater) lister, isLister := storage.(rest.Lister) getter, isGetter := storage.(rest.Getter) ... // 2、为 resource 添加对应的 actions(+根据是否支持 namespace) // Get the list of actions for the given scope. switch { case !namespaceScoped: // Handle non-namespace scoped resources like nodes. resourcePath := resource resourceParams := params itemPath := resourcePath + "/{name}" nameParams := append(params, nameParam) proxyParams := append(nameParams, pathParam) ... // Handler for standard REST verbs (GET, PUT, POST and DELETE). // Add actions at the resource path: /api/apiVersion/resource actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister) actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater) ... } ... // 3、从 rest.Storage 到 restful.Route 映射 // 为每个操作添加对应的 handler for _, action := range actions { ... switch action.Verb { ... case "POST": // Create a resource. var handler restful.RouteFunction // 4、初始化 handler if isNamedCreater { handler = restfulCreateNamedResource(namedCreater, reqScope, admit) } else { handler = restfulCreateResource(creater, reqScope, admit) } handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler) ... // 5、route 与 handler 进行绑定 route := ws.POST(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix). Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...). Returns(http.StatusOK, "OK", producedObject). // TODO: in some cases, the API may return a v1.Status instead of the versioned object // but currently go-restful can't handle multiple different objects being returned. Returns(http.StatusCreated, "Created", producedObject). Returns(http.StatusAccepted, "Accepted", producedObject). Reads(defaultVersionedObject). Writes(producedObject) if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil { return nil, err } addParams(route, action.Params) // 6、添加到路由中 routes = append(routes, route) case "DELETE": // Delete a resource. ... default: return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb) } for _, route := range routes { route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{ Group: reqScope.Kind.Group, Version: reqScope.Kind.Version, Kind: reqScope.Kind.Kind, }) route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb)) ws.Route(route) } // Note: update GetAuthorizerAttributes() when adding a custom handler. } ...}
kubeAPIServer代码结构整理如下:
1. apiserver整体启动逻辑 k8s.io/kubernetes/cmd/kube-apiserver2. apiserver bootstrap-controller创建&运行逻辑 k8s.io/kubernetes/pkg/master3. API Resource对应后端RESTStorage(based on genericregistry.Store)创建k8s.io/kubernetes/pkg/registry4. aggregated-apiserver创建&处理逻辑 k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator5. extensions-apiserver创建&处理逻辑 k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver6. apiserver创建&运行 k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server7. 注册API Resource资源处理handler(InstallREST&Install®isterResourceHandlers) k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints8. 创建存储后端(etcdv3) k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage9. genericregistry.Store.CompleteWithOptions初始化 k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry
调用链整理如下:
更多代码原理详情,参考 kubernetes-reading-notes 。
aggregatorServer
aggregatorServer主要用于处理扩展Kubernetes API Resources的第二种方式Aggregated APIServer(AA),将CR请求代理给AA:
这里结合Kubernetes官方给出的aggregated apiserver例子sample-apiserver,总结原理如下:
aggregatorServer通过APIServices对象关联到某个Service来进行请求的转发,其关联的Service类型进一步决定了请求转发的形式。aggregatorServer包括一个
GenericAPIServer
和维护自身状态的Controller
。其中GenericAPIServer
主要处理apiregistration.k8s.io
组下的APIService资源请求,而Controller包括:apiserviceRegistrationController
:负责根据APIService定义的aggregated server service构建代理,将CR的请求转发给后端的aggregated serveravailableConditionController
:维护 APIServices 的可用状态,包括其引用 Service 是否可用等;autoRegistrationController
:用于保持 API 中存在的一组特定的 APIServices;crdRegistrationController
:负责将 CRD GroupVersions 自动注册到 APIServices 中;openAPIAggregationController
:将 APIServices 资源的变化同步至提供的 OpenAPI 文档;
apiserviceRegistrationController负责根据APIService定义的aggregated server service构建代理,将CR的请求转发给后端的aggregated server。apiService有两种类型:Local(Service为空)以及Service(Service非空)。apiserviceRegistrationController负责对这两种类型apiService设置代理:Local类型会直接路由给kube-apiserver进行处理;而Service类型则会设置代理并将请求转化为对aggregated Service的请求(proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version),而请求的负载均衡策略则是优先本地访问kube-apiserver(如果service为kubernetes default apiserver service:443)=>通过service ClusterIP:Port访问(默认) 或者 通过随机选择service endpoint backend进行访问:
func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {... proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version // v1. is a special case for the legacy API. It proxies to a wider set of endpoints. if apiService.Name == legacyAPIServiceName { proxyPath = "/api" } // register the proxy handler proxyHandler := &proxyHandler{ localDelegate: s.delegateHandler, proxyClientCert: s.proxyClientCert, proxyClientKey: s.proxyClientKey, proxyTransport: s.proxyTransport, serviceResolver: s.serviceResolver, egressSelector: s.egressSelector, }... s.proxyHandlers[apiService.Name] = proxyHandler s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)... // it's time to register the group aggregation endpoint groupPath := "/apis/" + apiService.Spec.Group groupDiscoveryHandler := &apiGroupHandler{ codecs: aggregatorscheme.Codecs, groupName: apiService.Spec.Group, lister: s.lister, delegate: s.delegateHandler, } // aggregation is protected s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler) s.handledGroups.Insert(apiService.Spec.Group) return nil}// k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go:109func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // 加载roxyHandlingInfo处理请求 value := r.handlingInfo.Load() if value == nil { r.localDelegate.ServeHTTP(w, req) return } handlingInfo := value.(proxyHandlingInfo)... // 判断APIService服务是否正常 if !handlingInfo.serviceAvailable { proxyError(w, req, "service unavailable", http.StatusServiceUnavailable) return } // 将原始请求转化为对APIService的请求 // write a new location based on the existing request pointed at the target service location := &url.URL{} location.Scheme = "https" rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort) if err != nil { klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err) proxyError(w, req, "service unavailable", http.StatusServiceUnavailable) return } location.Host = rloc.Host location.Path = req.URL.Path location.RawQuery = req.URL.Query().Encode() newReq, cancelFn := newRequestForProxy(location, req) defer cancelFn() ... proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper) handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w}) handler.ServeHTTP(w, newReq)}
$ kubectl get APIService NAME SERVICE AVAILABLE AGE...v1.apps Local True 50d...v1beta1.metrics.k8s.io kube-system/metrics-server True 50d...
# default APIServices$ kubectl get -o yaml APIService/v1.appsapiVersion: apiregistration.k8s.io/v1kind: APIServicemetadata:labels: kube-aggregator.kubernetes.io/automanaged: onstartname: v1.appsselfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1.appsspec:group: appsgroupPriorityMinimum: 17800version: v1versionPriority: 15status:conditions:- lastTransitionTime: "2020-10-20T10:39:48Z" message: Local APIServices are always available reason: Local status: "True" type: Available# aggregated server $ kubectl get -o yaml APIService/v1beta1.metrics.k8s.ioapiVersion: apiregistration.k8s.io/v1kind: APIServicemetadata:labels: addonmanager.kubernetes.io/mode: Reconcile kubernetes.io/cluster-service: "true"name: v1beta1.metrics.k8s.ioselfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1beta1.metrics.k8s.iospec:group: metrics.k8s.iogroupPriorityMinimum: 100insecureSkipTLSVerify: trueservice: name: metrics-server namespace: kube-system port: 443version: v1beta1versionPriority: 100status:conditions:- lastTransitionTime: "2020-12-05T00:50:48Z" message: all checks passed reason: Passed status: "True" type: Available# CRD$ kubectl get -o yaml APIService/v1.duyanghao.example.comapiVersion: apiregistration.k8s.io/v1kind: APIServicemetadata:labels: kube-aggregator.kubernetes.io/automanaged: "true"name: v1.duyanghao.example.comselfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1.duyanghao.example.comspec:group: duyanghao.example.comgroupPriorityMinimum: 1000version: v1versionPriority: 100status:conditions:- lastTransitionTime: "2020-12-11T08:45:37Z" message: Local APIServices are always available reason: Local status: "True" type: Available
aggregatorServer创建过程中会根据所有kube-apiserver定义的API资源创建默认的APIService列表,名称即是
$VERSION.$GROUP
,这些APIService都会有标签kube-aggregator.kubernetes.io/automanaged: onstart
,例如:v1.apps apiService。autoRegistrationController创建并维护这些列表中的APIService,也即我们看到的Local apiService;对于自定义的APIService(aggregated server),则不会对其进行处理aggregated server实现CR(自定义API资源) 的CRUD API接口,并可以灵活选择后端存储,可以与core kube-apiserver一起公用etcd,也可自己独立部署etcd数据库或者其它数据库。aggregated server实现的CR API路径为:/apis/$GROUP/$VERSION,具体到sample apiserver为:/apis/wardle.example.com/v1alpha1,下面的资源类型有:flunders以及fischers
aggregated server通过部署APIService类型资源,service fields指向对应的aggregated server service实现与core kube-apiserver的集成与交互
sample-apiserver目录结构如下,可参考编写自己的aggregated server:
staging/src/k8s.io/sample-apiserver├── artifacts│ ├── example│ │ ├── apiservice.yaml ...├── hack├── main.go└── pkg├── admission├── apis├── apiserver├── cmd├── generated│ ├── clientset│ │ └── versioned ...│ │ └── typed│ │ └── wardle│ │ ├── v1alpha1│ │ └── v1beta1│ ├── informers│ │ └── externalversions│ │ └── wardle│ │ ├── v1alpha1│ │ └── v1beta1│ ├── listers│ │ └── wardle│ │ ├── v1alpha1│ │ └── v1beta1└── registry
- 其中,artifacts用于部署yaml示例
- hack目录存放自动脚本(eg: update-codegen)
- main.go是aggregated server启动入口;pkg/cmd负责启动aggregated server具体逻辑;pkg/apiserver用于aggregated server初始化以及路由注册
- pkg/apis负责相关CR的结构体定义,自动生成(update-codegen)
- pkg/admission负责准入的相关代码
- pkg/generated负责生成访问CR的clientset,informers,以及listers
- pkg/registry目录负责CR相关的RESTStorage实现
更多代码原理详情,参考 kubernetes-reading-notes 。
apiExtensionsServer
apiExtensionsServer主要负责CustomResourceDefinition(CRD)apiResources以及apiVersions的注册,同时处理CRD以及相应CustomResource(CR)的REST请求(如果对应CR不能被处理的话则会返回404),也是apiserver Delegation的最后一环
原理总结如下:
Custom Resource,简称CR,是Kubernetes自定义资源类型,与之相对应的就是Kubernetes内置的各种资源类型,例如Pod、Service等。利用CR我们可以定义任何想要的资源类型
CRD通过yaml文件的形式向Kubernetes注册CR实现自定义api-resources,属于第二种扩展Kubernetes API资源的方式,也是普遍使用的一种
APIExtensionServer负责CustomResourceDefinition(CRD)apiResources以及apiVersions的注册,同时处理CRD以及相应CustomResource(CR)的REST请求(如果对应CR不能被处理的话则会返回404),也是apiserver Delegation的最后一环
crdRegistrationController
负责将CRD GroupVersions自动注册到APIServices中。具体逻辑为:枚举所有CRDs,然后根据CRD定义的crd.Spec.Group以及crd.Spec.Versions字段构建APIService,并添加到autoRegisterController.apiServicesToSync中,由autoRegisterController进行创建以及维护操作。这也是为什么创建完CRD后会产生对应的APIService对象APIExtensionServer包含的controller以及功能如下所示:
openapiController
:将 crd 资源的变化同步至提供的 OpenAPI 文档,可通过访问/openapi/v2
进行查看;crdController
:负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者的信息可通过kubectl api-versions
和kubectl api-resources
查看;kubectl api-versions
命令返回所有Kubernetes集群资源的版本信息(实际发出了两个请求,分别是https://127.0.0.1:6443/api
以及https://127.0.0.1:6443/apis
,并在最后将两个请求的返回结果进行了合并)
$ kubectl -v=8 api-versions I1211 11:44:50.276446 22493 loader.go:375] Config loaded from file: /root/.kube/configI1211 11:44:50.277005 22493 round_trippers.go:420] GET https://127.0.0.1:6443/api?timeout=32s...I1211 11:44:50.290265 22493 request.go:1068] Response Body: {"kind":"APIVersions","versions":["v1"],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0","serverAddress":"x.x.x.x:6443"}]}I1211 11:44:50.293673 22493 round_trippers.go:420] GET https://127.0.0.1:6443/apis?timeout=32s...I1211 11:44:50.298360 22493 request.go:1068] Response Body: {"kind":"APIGroupList","apiVersion":"v1","groups":[{"name":"apiregistration.k8s.io","versions":[{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"},{"groupVersion":"apiregistration.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"}},{"name":"extensions","versions":[{"groupVersion":"extensions/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"extensions/v1beta1","version":"v1beta1"}},{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}],"preferredVersion":{"groupVersion":"apps/v1","version":"v1"}},{"name":"events.k8s.io","versions":[{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}},{"name":"authentication.k8s.io","versions":[{"groupVersion":"authentication.k8s.io/v1","version":"v1"},{"groupVersion":"authentication.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"authentication.k8s.io/v1"," [truncated 4985 chars]apiextensions.k8s.io/v1apiextensions.k8s.io/v1beta1apiregistration.k8s.io/v1apiregistration.k8s.io/v1beta1apps/v1authentication.k8s.io/v1beta1...storage.k8s.io/v1storage.k8s.io/v1beta1v1
kubectl api-resources
命令就是先获取所有API版本信息,然后对每一个API版本调用接口获取该版本下的所有API资源类型$ kubectl -v=8 api-resources5077 loader.go:375] Config loaded from file: /root/.kube/configI1211 15:19:47.593450 15077 round_trippers.go:420] GET https://127.0.0.1:6443/api?timeout=32sI1211 15:19:47.602273 15077 request.go:1068] Response Body: {"kind":"APIVersions","versions":["v1"],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0","serverAddress":"x.x.x.x:6443"}]}I1211 15:19:47.606279 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis?timeout=32sI1211 15:19:47.610333 15077 request.go:1068] Response Body: {"kind":"APIGroupList","apiVersion":"v1","groups":[{"name":"apiregistration.k8s.io","versions":[{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"},{"groupVersion":"apiregistration.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"}},{"name":"extensions","versions":[{"groupVersion":"extensions/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"extensions/v1beta1","version":"v1beta1"}},{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}],"preferredVersion":{"groupVersion":"apps/v1","version":"v1"}},{"name":"events.k8s.io","versions":[{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}},{"name":"authentication.k8s.io","versions":[{"groupVersion":"authentication.k8s.io/v1","version":"v1"},{"groupVersion":"authentication.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"authentication.k8s.io/v1"," [truncated 4985 chars]I1211 15:19:47.614700 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/batch/v1?timeout=32sI1211 15:19:47.614804 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/authentication.k8s.io/v1?timeout=32sI1211 15:19:47.615687 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/auth.tkestack.io/v1?timeout=32shttps://127.0.0.1:6443/apis/authentication.k8s.io/v1beta1?timeout=32sI1211 15:19:47.616794 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/coordination.k8s.io/v1?timeout=32sI1211 15:19:47.616863 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/apps/v1?timeout=32s...NAME SHORTNAMES APIGROUP NAMESPACED KINDbindings true Bindingendpoints ep true Endpointsevents ev true Eventlimitranges limits true LimitRangenamespaces ns false Namespacenodes no false Node...
namingController
:检查 crd obj 中是否有命名冲突,可在 crd.status.conditions
中查看;establishingController
:检查 crd 是否处于正常状态,可在 crd.status.conditions
中查看;nonStructuralSchemaController
:检查 crd obj 结构是否正常,可在 crd.status.conditions
中查看;apiApprovalController
:检查 crd 是否遵循 Kubernetes API 声明策略,可在 crd.status.conditions
中查看;finalizingController
:类似于 finalizes 的功能,与 CRs 的删除有关;
总结CR CRUD APIServer处理逻辑如下:
- createAPIExtensionsServer=>NewCustomResourceDefinitionHandler=>crdHandler=>注册CR CRUD API接口:
// New returns a new instance of CustomResourceDefinitions from the given config.func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) { ... crdHandler, err := NewCustomResourceDefinitionHandler( versionDiscoveryHandler, groupDiscoveryHandler, s.Informers.Apiextensions().V1().CustomResourceDefinitions(), delegateHandler, c.ExtraConfig.CRDRESTOptionsGetter, c.GenericConfig.AdmissionControl, establishingController, c.ExtraConfig.ServiceResolver, c.ExtraConfig.AuthResolverWrapper, c.ExtraConfig.MasterCount, s.GenericAPIServer.Authorizer, c.GenericConfig.RequestTimeout, time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second, apiGroupInfo.StaticOpenAPISpec, c.GenericConfig.MaxRequestBodyBytes, ) if err != nil { return nil, err } s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler) ... return s, nil}
crdHandler处理逻辑如下:
解析req(GET /apis/duyanghao.example.com/v1/namespaces/default/students),根据请求路径中的group(duyanghao.example.com),version(v1),以及resource字段(students)获取对应CRD内容(crd, err := r.crdLister.Get(crdName))
通过crd.UID以及crd.Name获取crdInfo,若不存在则创建对应的crdInfo(crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name))。crdInfo中包含了CRD定义以及该CRD对应Custom Resource的customresource.REST storage
customresource.REST storage由CR对应的Group(duyanghao.example.com),Version(v1),Kind(Student),Resource(students)等创建完成,由于CR在Kubernetes代码中并没有具体结构体定义,所以这里会先初始化一个范型结构体Unstructured(用于保存所有类型的Custom Resource),并对该结构体进行SetGroupVersionKind操作(设置具体Custom Resource Type)
从customresource.REST storage获取Unstructured结构体后会对其进行相应转换然后返回
// k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go:223func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {ctx := req.Context()requestInfo, ok := apirequest.RequestInfoFrom(ctx)...crdName := requestInfo.Resource + "." + requestInfo.APIGroupcrd, err := r.crdLister.Get(crdName)...crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)verb := strings.ToUpper(requestInfo.Verb)resource := requestInfo.Resourcesubresource := requestInfo.Subresourcescope := metrics.CleanScope(requestInfo)...switch {case subresource == "status" && subresources != nil && subresources.Status != nil: handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)case subresource == "scale" && subresources != nil && subresources.Scale != nil: handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)case len(subresource) == 0: handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)default: responsewriters.ErrorNegotiated( apierrors.NewNotFound(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Name), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req, )}if handlerFunc != nil { handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handlerFunc) handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup) handler.ServeHTTP(w, req) return}}
更多代码原理详情,参考 kubernetes-reading-notes 。
Conclusion
本文从源码层面对Kubernetes apiserver进行了一个概览性总结,包括:aggregatorServer,kubeAPIServer,apiExtensionsServer以及bootstrap-controller等。通过阅读本文可以对apiserver内部原理有一个大致的理解,另外也有助于后续深入研究
Refs
- kubernetes-reading-notes
每一份赞赏源于懂得
赞赏
0人进行了赞赏支持
更多相关文章
- pandas读取表格后的常用数据处理操作
- Java常用的时间工具类DateTimeUtils.java对常用的时间操作方法总
- 肝了一个月的编程导航诞生!轻松发现优质编程资源
- Python骚操作:动态定义函数
- 假期玩得开心也不忘充电,学习Python操作JSON,网络数据交换不用愁
- LeetCode 题解:一顿操作猛如虎,一看击败百分五
- 如何设计一个支持增量操作的栈
- 对列和行的操作
- 从一道简单算法题理解快速排序的 partition 操作