本文旨在记录对中间件、编排组件容器化部署后,实现kubernetes扩展组件Controller的过程。

Third-Parties

kubernetes-client: javascript

client-go

kube-rs

client-go源码分析

目录结构

  • kubernetes: contains the clientset to access Kubernetes API.
  • discovery: discover APIs supported by a Kubernetes API server.
  • dynamic: contains a dynamic client that can perform generic operations on arbitrary Kubernetes API objects.
  • transport: set up auth and start a connection.
  • tools/cache: useful for writing controllers.
  • informers: informer group
  • listers: lister group

代码实例

1
2
3
4
git clone https://github.com/huweihuang/client-go.git
cd client-go
#保证本地HOME目录有配置kubernetes集群的配置文件
go run client-go.go

client-go.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	var kubeconfig *string
	if home := homeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()
	// uses the current context in kubeconfig
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}
	// creates the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	for {
		pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
		if err != nil {
			panic(err.Error())
		}
		fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
		time.Sleep(10 * time.Second)
	}
}

func homeDir() string {
	if h := os.Getenv("HOME"); h != "" {
		return h
	}
	return os.Getenv("USERPROFILE") // windows
}

output

1
2
3
4
5
6
➜ go run client-go.go
There are 9 pods in the cluster
There are 7 pods in the cluster
There are 7 pods in the cluster
There are 7 pods in the cluster
There are 7 pods in the cluster

kubeconfig

1
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")

为了获取k8s配置文件kubeconfig的绝对路径,一般路径为$HOME/.kube/config,这个文件主要用来配置本地连接的k8s集群。

config的内容大概如图所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
apiVersion: v1
clusters:
- cluster:
    server: http://<kube-master-ip>:8080
  name: k8s
contexts:
- context:
    cluster: k8s
    namespace: default
    user: ""
  name: default
current-context: default
kind: Config
preferences: {}
users: []

rest.config

通过参数和BuildConfigFromFlags方法获取rest.Config对象

1
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)

k8s.io/client-go/tools/clientcmd/client_config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// BuildConfigFromFlags is a helper function that builds configs from a master
// url or a kubeconfig filepath. These are passed in as command line flags for cluster
// components. Warnings should reflect this usage. If neither masterUrl or kubeconfigPath
// are passed in we fallback to inClusterConfig. If inClusterConfig fails, we fallback
// to the default config.
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
	if kubeconfigPath == "" && masterUrl == "" {
		klog.Warning("Neither --kubeconfig nor --master was specified.  Using the inClusterConfig.  This might not work.")
		kubeconfig, err := restclient.InClusterConfig()
		if err == nil {
			return kubeconfig, nil
		}
		klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
	}
	return NewNonInteractiveDeferredLoadingClientConfig(
		&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
		&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}

clientset

通过*rest.Config参数和NewForConfig方法来获取clientset对象,clientset是多个client集合,每个client可能包括不同版本的方法调用

1
clientset, err := kubernetes.NewForConfig(config)

NewForConfig

NewForConfig函数就是初始化clientset中的每个client。

k8s.io/client-go/kubernetes/clientset.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *rest.Config) (*Clientset, error) {
	configShallowCopy := *c
	...
	var cs Clientset
	cs.appsV1beta1, err = appsv1beta1.NewForConfig(&configShallowCopy)
	...
	cs.coreV1, err = corev1.NewForConfig(&configShallowCopy)
	...
}

clientset的结构体

k8s.io/client-go/kubernetes/clientset.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Clientset contains the clients for groups. Each group has exactly one
// version included in a Clientset.
type Clientset struct {
	*discovery.DiscoveryClient
	admissionregistrationV1      *admissionregistrationv1.AdmissionregistrationV1Client
	admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
	internalV1alpha1             *internalv1alpha1.InternalV1alpha1Client
	appsV1                       *appsv1.AppsV1Client
	appsV1beta1                  *appsv1beta1.AppsV1beta1Client
	appsV1beta2                  *appsv1beta2.AppsV1beta2Client
	authenticationV1             *authenticationv1.AuthenticationV1Client
	authenticationV1beta1        *authenticationv1beta1.AuthenticationV1beta1Client
	authorizationV1              *authorizationv1.AuthorizationV1Client
	authorizationV1beta1         *authorizationv1beta1.AuthorizationV1beta1Client
	autoscalingV1                *autoscalingv1.AutoscalingV1Client
	autoscalingV2beta1           *autoscalingv2beta1.AutoscalingV2beta1Client
	autoscalingV2beta2           *autoscalingv2beta2.AutoscalingV2beta2Client
	batchV1                      *batchv1.BatchV1Client
	batchV1beta1                 *batchv1beta1.BatchV1beta1Client
	certificatesV1               *certificatesv1.CertificatesV1Client
	certificatesV1beta1          *certificatesv1beta1.CertificatesV1beta1Client
	coordinationV1beta1          *coordinationv1beta1.CoordinationV1beta1Client
	coordinationV1               *coordinationv1.CoordinationV1Client
	coreV1                       *corev1.CoreV1Client
	discoveryV1                  *discoveryv1.DiscoveryV1Client
	discoveryV1beta1             *discoveryv1beta1.DiscoveryV1beta1Client
	eventsV1                     *eventsv1.EventsV1Client
	eventsV1beta1                *eventsv1beta1.EventsV1beta1Client
	extensionsV1beta1            *extensionsv1beta1.ExtensionsV1beta1Client
	flowcontrolV1alpha1          *flowcontrolv1alpha1.FlowcontrolV1alpha1Client
	flowcontrolV1beta1           *flowcontrolv1beta1.FlowcontrolV1beta1Client
	networkingV1                 *networkingv1.NetworkingV1Client
	networkingV1beta1            *networkingv1beta1.NetworkingV1beta1Client
	nodeV1                       *nodev1.NodeV1Client
	nodeV1alpha1                 *nodev1alpha1.NodeV1alpha1Client
	nodeV1beta1                  *nodev1beta1.NodeV1beta1Client
	policyV1                     *policyv1.PolicyV1Client
	policyV1beta1                *policyv1beta1.PolicyV1beta1Client
	rbacV1                       *rbacv1.RbacV1Client
	rbacV1beta1                  *rbacv1beta1.RbacV1beta1Client
	rbacV1alpha1                 *rbacv1alpha1.RbacV1alpha1Client
	schedulingV1alpha1           *schedulingv1alpha1.SchedulingV1alpha1Client
	schedulingV1beta1            *schedulingv1beta1.SchedulingV1beta1Client
	schedulingV1                 *schedulingv1.SchedulingV1Client
	storageV1beta1               *storagev1beta1.StorageV1beta1Client
	storageV1                    *storagev1.StorageV1Client
	storageV1alpha1              *storagev1alpha1.StorageV1alpha1Client
}

clientset.Interface

clientset实现了以下的interface,可以通过以下的方法获得具体的client,例如:

1
pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
type Interface interface {
	Discovery() discovery.DiscoveryInterface
	AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
	AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
	InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
	AppsV1() appsv1.AppsV1Interface
	AppsV1beta1() appsv1beta1.AppsV1beta1Interface
	AppsV1beta2() appsv1beta2.AppsV1beta2Interface
	AuthenticationV1() authenticationv1.AuthenticationV1Interface
	AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
	AuthorizationV1() authorizationv1.AuthorizationV1Interface
	AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
	AutoscalingV1() autoscalingv1.AutoscalingV1Interface
	AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
	AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
	BatchV1() batchv1.BatchV1Interface
	BatchV1beta1() batchv1beta1.BatchV1beta1Interface
	CertificatesV1() certificatesv1.CertificatesV1Interface
	CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
	CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
	CoordinationV1() coordinationv1.CoordinationV1Interface
	CoreV1() corev1.CoreV1Interface
	DiscoveryV1() discoveryv1.DiscoveryV1Interface
	DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
	EventsV1() eventsv1.EventsV1Interface
	EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
	ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
	FlowcontrolV1alpha1() flowcontrolv1alpha1.FlowcontrolV1alpha1Interface
	FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
	NetworkingV1() networkingv1.NetworkingV1Interface
	NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
	NodeV1() nodev1.NodeV1Interface
	NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
	NodeV1beta1() nodev1beta1.NodeV1beta1Interface
	PolicyV1() policyv1.PolicyV1Interface
	PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
	RbacV1() rbacv1.RbacV1Interface
	RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
	RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
	SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
	SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
	SchedulingV1() schedulingv1.SchedulingV1Interface
	StorageV1beta1() storagev1beta1.StorageV1beta1Interface
	StorageV1() storagev1.StorageV1Interface
	StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
}

CoreV1Client

常用的CoreV1Client为例子分析

corev1.NewForConfig

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// NewForConfig creates a new CoreV1Client for the given config.
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	client, err := rest.RESTClientFor(&config)
	if err != nil {
		return nil, err
	}
	return &CoreV1Client{client}, nil
}

通过传入配置信息rest.Config来实例化对象,其本质是调用了rest.RESTClientFor(&Config)方法创建了RESTClient的对象,即CoreV1Client的本质就是一个RESTClient对象。

CoreV1Client结构体定义

1
2
3
4
// CoreV1Client is used to interact with features provided by the  group.
type CoreV1Client struct {
	restClient rest.Interface
}

CoreV1Client实现了CoreV1Interface接口,从而对k8s的资源对象进行增删改查的操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//CoreV1Client的方法
func (c *CoreV1Client) ComponentStatuses() ComponentStatusInterface {...}
//ConfigMaps
func (c *CoreV1Client) ConfigMaps(namespace string) ConfigMapInterface {...}
//Endpoints
func (c *CoreV1Client) Endpoints(namespace string) EndpointsInterface {...}
func (c *CoreV1Client) Events(namespace string) EventInterface {...}
func (c *CoreV1Client) LimitRanges(namespace string) LimitRangeInterface {...}
//Namespaces
func (c *CoreV1Client) Namespaces() NamespaceInterface {...}
//Nodes
func (c *CoreV1Client) Nodes() NodeInterface {...}
func (c *CoreV1Client) PersistentVolumes() PersistentVolumeInterface {...}
func (c *CoreV1Client) PersistentVolumeClaims(namespace string) PersistentVolumeClaimInterface {...}
//Pods
func (c *CoreV1Client) Pods(namespace string) PodInterface {...}
func (c *CoreV1Client) PodTemplates(namespace string) PodTemplateInterface {...}
//ReplicationControllers
func (c *CoreV1Client) ReplicationControllers(namespace string) ReplicationControllerInterface {...}
func (c *CoreV1Client) ResourceQuotas(namespace string) ResourceQuotaInterface {...}
func (c *CoreV1Client) Secrets(namespace string) SecretInterface {...}
//Services
func (c *CoreV1Client) Services(namespace string) ServiceInterface {...}
func (c *CoreV1Client) ServiceAccounts(namespace string) ServiceAccountInterface {...}

CoreV1Interface

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type CoreV1Interface interface {
	RESTClient() rest.Interface
	ComponentStatusesGetter
	ConfigMapsGetter
	EndpointsGetter
	EventsGetter
	LimitRangesGetter
	NamespacesGetter
	NodesGetter
	PersistentVolumesGetter
	PersistentVolumeClaimsGetter
	PodsGetter
	PodTemplatesGetter
	ReplicationControllersGetter
	ResourceQuotasGetter
	SecretsGetter
	ServicesGetter
	ServiceAccountsGetter
}

CoreV1Interface中包含了各种k8s对象的调用接口,例如PodsGetter是对k8s中pod对象增删改查的接口。ServicesGetter是对service对象的操作的接口。

PodsGetter接口举例探索

可以以PodsGetter接口为例来研究一下CoreV1Client对pod对象的增删改查调用。

示例中的代码如下:

1
pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})

CoreV1().Pods()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// core_client.go
func (c *CoreV1Client) Pods(namespace string) PodInterface {
	return newPods(c, namespace)
}

// pod.go
// newPods returns a Pods
func newPods(c *CoreV1Client, namespace string) *pods {
	return &pods{
		client: c.RESTClient(),
		ns:     namespace,
	}
}

CoreV1().Pods()方法实际上调用了newPods()方法,创建了一个pod对象,该对象继承了rest.Interface接口,即最终的实现本质是RESTClient的HTTP调用。

1
2
3
4
5
// pods implements PodInterface
type pods struct {
	client rest.Interface
	ns     string
}

pods对象实现了PodInterface接口:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// PodInterface has methods to work with Pod resources.
type PodInterface interface {
	Create(*v1.Pod) (*v1.Pod, error)
	Update(*v1.Pod) (*v1.Pod, error)
	UpdateStatus(*v1.Pod) (*v1.Pod, error)
	Delete(name string, options *metav1.DeleteOptions) error
	DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
	Get(name string, options metav1.GetOptions) (*v1.Pod, error)
	List(opts metav1.ListOptions) (*v1.PodList, error)
	Watch(opts metav1.ListOptions) (watch.Interface, error)
	Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error)
	GetEphemeralContainers(podName string, options metav1.GetOptions) (*v1.EphemeralContainers, error)
	UpdateEphemeralContainers(podName string, ephemeralContainers *v1.EphemeralContainers) (*v1.EphemeralContainers, error)

	PodExpansion

这个interface定义了pods对象的增删改查等方法。

PodsGetter 继承了PodInterface的接口:

1
2
3
4
5
// PodsGetter has a method to return a PodInterface.
// A group's client should implement this interface.
type PodsGetter interface {
	Pods(namespace string) PodInterface
}

Pods().List()方法通过RESTClient的HTTP调用来实现对k8s的pod资源的获取:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// List takes label and field selectors, and returns the list of Pods that match those selectors.
func (c *pods) List(opts metav1.ListOptions) (result *v1.PodList, err error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	result = &v1.PodList{}
	err = c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do().
		Into(result)
	return
}

以上分析了clientset.CoreV1().Pods("").List(metav1.ListOptions{})对pod资源获取的过程,最终是调用RESTClient的方法实现。

RESTClient

待补充

总结

client-go对K8s资源对象的调用,需要先获取k8s的配置信息,也就是$HOME/.kube/config

调用顺序如下:

kubeconfig -> rest.config -> clientset -> 具体的client(CoreV1Client) -> 具体的资源对象(如pod) -> RESTClient -> http.Client -> HTTP 请求的发送和响应

常用的client有CoreV1ClientAppsV1betaClientExtensionsV1beta1Client等。

Operator SDK