Introduction to scheduler
The scheduler is a component of the k8s master and is pluggable within the k8s ecosystem.
Ways to customize the scheduler
- Add the feature to the scheduler source code and recompile
- Implement your own scheduler (multiple schedulers)
- Have the scheduler call an extender to make the final scheduling decision (Kubernetes scheduler extender)
Adding a scheduling feature
Introduction to scheduling algorithms in k8s
Implement your own scheduler (configure multiple schedulers)
Schedulers are pluggable. Several schedulers can run in the same cluster, and each pod can name the scheduler it wants explicitly.
Configure a pod to use your own scheduler
The following pod explicitly requests the scheduler named my-scheduler:
apiVersion: v1
kind: Pod
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  schedulerName: my-scheduler
  containers:
  - name: nginx
    image: nginx:1.10
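To make the selection rule concrete, here is a minimal Go sketch of how a custom scheduler might decide which pods it is responsible for. The `Pod` struct and the `needsScheduling` function are hypothetical, simplified stand-ins rather than the real Kubernetes types. The sketch assumes a scheduler only picks up pods whose `spec.schedulerName` matches its own name and that have not yet been assigned a node.

```go
package main

import "fmt"

// Pod is a hypothetical, simplified stand-in for the pod fields
// that matter when deciding whether a scheduler should handle a pod.
type Pod struct {
	Name          string
	SchedulerName string // corresponds to spec.schedulerName
	NodeName      string // corresponds to spec.nodeName; empty while unscheduled
}

// needsScheduling reports whether the scheduler called schedulerName
// should handle pod: the names must match and the pod must be unbound.
func needsScheduling(pod Pod, schedulerName string) bool {
	return pod.SchedulerName == schedulerName && pod.NodeName == ""
}

func main() {
	pods := []Pod{
		{Name: "nginx", SchedulerName: "my-scheduler"},
		{Name: "redis", SchedulerName: "default-scheduler"},
		{Name: "web", SchedulerName: "my-scheduler", NodeName: "node-1"},
	}
	for _, p := range pods {
		fmt.Printf("%s handled by my-scheduler: %v\n", p.Name, needsScheduling(p, "my-scheduler"))
	}
}
```

Only the `nginx` pod qualifies here: `redis` names a different scheduler and `web` is already bound to a node.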
Official example: a scheduler written as a shell script
#!/bin/bash
SERVER='localhost:8001'
while true;
do
    for PODNAME in $(kubectl --server $SERVER get pods -o json | jq '.items[] | select(.spec.schedulerName == "my-scheduler") | select(.spec.nodeName == null) | .metadata.name' | tr -d '"')
    do
        NODES=($(kubectl --server $SERVER get nodes -o json | jq '.items[].metadata.name' | tr -d '"'))
        NUMNODES=${#NODES[@]}
        CHOSEN=${NODES[$[ $RANDOM % $NUMNODES ]]}
        curl --header "Content-Type:application/json" --request POST --data '{"apiVersion":"v1", "kind": "Binding", "metadata": {"name": "'$PODNAME'"}, "target": {"apiVersion": "v1", "kind" : "Node", "name": "'$CHOSEN'"}}' http://$SERVER/api/v1/namespaces/default/pods/$PODNAME/binding/
        echo "Assigned $PODNAME to $CHOSEN"
    done
    sleep 1
done
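The core of the shell script is two steps: pick a node at random, then POST a Binding object for the pod. The Go sketch below reproduces just those two steps with the standard library. The `binding` and `bindingTarget` structs and the `chooseNode` and `bindingPayload` functions are hypothetical names introduced for illustration; they mirror the JSON that the script sends with curl, and sending the request is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math/rand"
)

// bindingTarget mirrors the "target" object in the script's JSON payload.
type bindingTarget struct {
	APIVersion string `json:"apiVersion"`
	Kind       string `json:"kind"`
	Name       string `json:"name"`
}

// binding mirrors the Binding payload the script POSTs for each pod.
type binding struct {
	APIVersion string            `json:"apiVersion"`
	Kind       string            `json:"kind"`
	Metadata   map[string]string `json:"metadata"`
	Target     bindingTarget     `json:"target"`
}

// chooseNode picks a random node, like $RANDOM % $NUMNODES in the script.
// It returns false when there are no nodes to choose from.
func chooseNode(nodes []string) (string, bool) {
	if len(nodes) == 0 {
		return "", false
	}
	return nodes[rand.Intn(len(nodes))], true
}

// bindingPayload builds the JSON body that binds podName to nodeName.
func bindingPayload(podName, nodeName string) ([]byte, error) {
	return json.Marshal(binding{
		APIVersion: "v1",
		Kind:       "Binding",
		Metadata:   map[string]string{"name": podName},
		Target:     bindingTarget{APIVersion: "v1", Kind: "Node", Name: nodeName},
	})
}

func main() {
	node, ok := chooseNode([]string{"node-1", "node-2", "node-3"})
	if !ok {
		fmt.Println("no nodes available")
		return
	}
	payload, err := bindingPayload("nginx", node)
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(payload))
}
```

Unlike the script, the sketch guards against an empty node list, where the modulo operation would otherwise fail.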
Factors affecting pod scheduling
https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/
Predicates (filtering)
Filter out the nodes that do not meet the pod's requirements.
Priorities (scoring)
Score the remaining nodes.
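The two phases can be sketched in a few lines of Go. This is a toy model under stated assumptions, not the scheduler's real code: the `Node` struct, the single CPU-based predicate `fitsCPU`, and the scoring function `scoreNode` are all hypothetical. Nodes that fail the predicate are dropped, and the highest-scoring survivor wins.

```go
package main

import "fmt"

// Node is a hypothetical, simplified node with one resource dimension.
type Node struct {
	Name    string
	FreeCPU int // free CPU in millicores
}

// fitsCPU is the filtering step: can the node hold the request at all?
func fitsCPU(requestCPU int, n Node) bool {
	return n.FreeCPU >= requestCPU
}

// scoreNode is the scoring step: prefer the node with the most CPU left over.
func scoreNode(requestCPU int, n Node) int {
	return n.FreeCPU - requestCPU
}

// pickNode filters the nodes, scores the survivors, and returns the best one.
// It returns false when no node passes the filter.
func pickNode(requestCPU int, nodes []Node) (string, bool) {
	best, bestScore, found := "", 0, false
	for _, n := range nodes {
		if !fitsCPU(requestCPU, n) {
			continue // filtered out
		}
		if s := scoreNode(requestCPU, n); !found || s > bestScore {
			best, bestScore, found = n.Name, s, true
		}
	}
	return best, found
}

func main() {
	nodes := []Node{
		{Name: "node-a", FreeCPU: 100},
		{Name: "node-b", FreeCPU: 500},
		{Name: "node-c", FreeCPU: 300},
	}
	if name, ok := pickNode(200, nodes); ok {
		fmt.Println("chosen:", name)
	} else {
		fmt.Println("no node fits")
	}
}
```

With a 200-millicore request, `node-a` is filtered out and `node-b` scores highest among the remaining two.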
Preemption
Pod priority can be specified in Kubernetes 1.8 and later. Priority indicates the importance of a pod relative to other pods.
When a pod cannot be scheduled, the scheduler tries to preempt (evict) lower-priority pods so that the pending pod can be scheduled.
In future Kubernetes releases, priority will also affect the order of out-of-resource eviction on nodes.
Kubernetes 1.9 and later take PodDisruptionBudgets (PDBs) into account during preemption and try to respect them. However, if no other pods can be preempted, pods protected by a PDB will still be preempted.
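The rules above can be illustrated with a small Go sketch. It is a simplified model, not the scheduler's actual preemption algorithm: the `Pod` struct, its `PDBProtected` flag, and the `victims` function are hypothetical. The sketch assumes that only pods with strictly lower priority are candidates, and that candidates without PDB protection are considered before protected ones.

```go
package main

import (
	"fmt"
	"sort"
)

// Pod is a hypothetical, simplified running pod.
type Pod struct {
	Name         string
	Priority     int
	PDBProtected bool // true if a PodDisruptionBudget covers the pod
}

// victims returns the running pods that a pending pod with the given
// priority could preempt. Unprotected pods come first; within each group
// the lowest priority comes first. Protected pods stay in the list because
// they may still be preempted when nothing else is available.
func victims(pendingPriority int, running []Pod) []Pod {
	var out []Pod
	for _, p := range running {
		if p.Priority < pendingPriority {
			out = append(out, p)
		}
	}
	sort.SliceStable(out, func(i, j int) bool {
		if out[i].PDBProtected != out[j].PDBProtected {
			return !out[i].PDBProtected
		}
		return out[i].Priority < out[j].Priority
	})
	return out
}

func main() {
	running := []Pod{
		{Name: "a", Priority: 50, PDBProtected: true},
		{Name: "b", Priority: 10},
		{Name: "c", Priority: 200},
		{Name: "d", Priority: 70},
	}
	for _, p := range victims(100, running) {
		fmt.Printf("%s (priority %d, PDB %v)\n", p.Name, p.Priority, p.PDBProtected)
	}
}
```

Pod `c` is never a candidate because its priority is higher than that of the pending pod.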
Kubernetes scheduler extender
Scheduler policy configuration
{
  "kind": "Policy",
  "apiVersion": "v1",
  "predicates": [
    {"name": "PodFitsHostPorts"},
    {"name": "PodFitsResources"},
    {"name": "NoDiskConflict"},
    {"name": "MatchNodeSelector"},
    {"name": "HostName"}
  ],
  "priorities": [
    {"name": "LeastRequestedPriority", "weight": 1},
    {"name": "BalancedResourceAllocation", "weight": 1},
    {"name": "ServiceSpreadingPriority", "weight": 1},
    {"name": "EqualPriority", "weight": 1}
  ],
  "extenders": [
    {
      "urlPrefix": "http://localhost/scheduler",
      "apiVersion": "v1beta1",
      "filterVerb": "predicates/always_true",
      "bindVerb": "",
      "prioritizeVerb": "priorities/zero_score",
      "weight": 1,
      "enableHttps": false,
      "nodeCacheCapable": false,
      "httpTimeout": 10000000000
    }
  ],
  "hardPodAffinitySymmetricWeight": 10
}
Extender configuration
// ExtenderConfig holds the parameters used to communicate with the extender.
// If a verb is unspecified or empty, the extender is assumed not to provide that extension.
type ExtenderConfig struct {
	// URLPrefix is the URL prefix at which the extender is available
	URLPrefix string `json:"urlPrefix"`
	// Verb for the filter call, empty if not supported.
	// The verb is appended to URLPrefix when the filter call is issued to the extender.
	FilterVerb string `json:"filterVerb,omitempty"`
	// Verb for the prioritize call, empty if not supported.
	// The verb is appended to URLPrefix when the prioritize call is issued to the extender.
	PrioritizeVerb string `json:"prioritizeVerb,omitempty"`
	// Numeric multiplier for the node scores that the prioritize call generates.
	// The weight should be a positive integer.
	Weight int `json:"weight,omitempty"`
	// Verb for the bind call, empty if not supported.
	// The verb is appended to URLPrefix when the bind call is issued to the extender.
	// If the extender implements this method, it is the extender's responsibility to
	// bind the pod through the apiserver. Only one extender can implement this function.
	BindVerb string
	// EnableHTTPS specifies whether https should be used to communicate with the extender
	EnableHTTPS bool `json:"enableHttps,omitempty"`
	// TLSConfig specifies the transport layer security configuration
	TLSConfig *restclient.TLSClientConfig `json:"tlsConfig,omitempty"`
	// HTTPTimeout specifies the timeout for a call to the extender.
	// A filter timeout fails the scheduling of the pod. A prioritize timeout is ignored,
	// and the priorities from k8s and the other extenders are used to select the node.
	HTTPTimeout time.Duration `json:"httpTimeout,omitempty"`
	// NodeCacheCapable specifies that the extender is able to cache node information,
	// so the scheduler should send only minimal information about the eligible nodes,
	// assuming the extender has already cached the full details of all nodes in the cluster.
	NodeCacheCapable bool `json:"nodeCacheCapable,omitempty"`
	// ManagedResources is a list of extended resources managed by this extender.
	// A pod is sent to the extender in the Filter, Prioritize and Bind (if the extender
	// is the binder) phases only if it requests at least one extended resource in this list.
	// If empty or unspecified, all pods are sent to this extender.
	ManagedResources []ExtenderManagedResource `json:"managedResources,omitempty"`
}
The arguments passed to the FilterVerb endpoint on the extender are the pod and the set of nodes that passed the k8s predicates.
The arguments passed to the PrioritizeVerb endpoint on the extender are the pod and the set of nodes that passed both the k8s predicates and the extender predicates.
// ExtenderArgs represents the arguments an extender needs to filter or prioritize nodes for a pod
type ExtenderArgs struct {
	// Pod being scheduled
	Pod api.Pod `json:"pod"`
	// List of candidate nodes where the pod can be scheduled
	Nodes api.NodeList `json:"nodes"`
}
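The Weight field of ExtenderConfig determines how much the scores from an extender's prioritize call count in the final decision. The following Go sketch shows one plausible way to combine them, assuming each extender score is multiplied by the weight and added to the score k8s computed for the same host. The `HostPriority` struct here is a local stand-in and `combineScores` is a hypothetical helper, not a function from the scheduler.

```go
package main

import "fmt"

// HostPriority is a local stand-in for one entry of an extender's
// prioritize response: a host name and the score given to it.
type HostPriority struct {
	Host  string
	Score int
}

// combineScores adds weight * extender score to the k8s score of each host.
// Hosts unknown to k8s are ignored, since they were never candidates.
func combineScores(k8sScores map[string]int, extender []HostPriority, weight int) map[string]int {
	total := make(map[string]int, len(k8sScores))
	for host, score := range k8sScores {
		total[host] = score
	}
	for _, hp := range extender {
		if _, ok := total[hp.Host]; ok {
			total[hp.Host] += hp.Score * weight
		}
	}
	return total
}

func main() {
	k8sScores := map[string]int{"node-a": 10, "node-b": 12}
	extender := []HostPriority{
		{Host: "node-a", Score: 3},
		{Host: "node-b", Score: 1},
	}
	total := combineScores(k8sScores, extender, 2)
	fmt.Println("node-a:", total["node-a"])
	fmt.Println("node-b:", total["node-b"])
}
```

In this example the extender tips the decision: node-b leads on the k8s scores alone, but node-a wins once the weighted extender scores are added.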
A "filter" call returns the list of nodes (schedulerapi.ExtenderFilterResult),
and a "prioritize" call returns a priority for each node (schedulerapi.HostPriorityList).
"filter" can prune the node list according to its own predicates. The scores returned by "prioritize" are added to the k8s scores (computed by its priority functions), and the totals are used for the final host selection.
The "bind" call delegates the binding of a pod to a node to the extender. Implementing it is optional. When an extender implements it,
the extender is responsible for issuing the bind call to the apiserver. The pod name, namespace, UID, and node name are passed to the extender.
ExtenderBindingArgs holds the arguments passed to an extender for binding a pod to a node.
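type ExtenderBindingArgs struct {
	// PodName is the name of the pod being bound
	PodName string
	// PodNamespace is the namespace of the pod being bound
	PodNamespace string
	// PodUID is the UID of the pod being bound
	PodUID types.UID
	// Node is the node selected by the scheduler
	Node string
}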
Implementation
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"log"
	"net/http"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api/v1"
)

// kubeconfig is the path to the kubeconfig file used to reach the apiserver.
var kubeconfig = "xxx"

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		w.Write([]byte("hello world"))
	})
	http.HandleFunc("/predicates/test", testPredicateHandler)
	http.HandleFunc("/prioritize/test", testPrioritizeHandler)
	http.HandleFunc("/bind/test", BindHandler)
	log.Fatal(http.ListenAndServe(":8880", nil))
}

// writeJSON marshals v and writes it as the response body.
func writeJSON(w http.ResponseWriter, v interface{}) {
	resultBody, err := json.Marshal(v)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	w.Write(resultBody)
}

// testPredicateHandler serves the filter call.
func testPredicateHandler(w http.ResponseWriter, r *http.Request) {
	var buf bytes.Buffer
	body := io.TeeReader(r.Body, &buf)
	var extenderArgs schedulerapi.ExtenderArgs
	var extenderFilterResult *schedulerapi.ExtenderFilterResult
	if err := json.NewDecoder(body).Decode(&extenderArgs); err != nil {
		extenderFilterResult = &schedulerapi.ExtenderFilterResult{Error: err.Error()}
	} else {
		extenderFilterResult = predicateFunc(extenderArgs)
	}
	// Log the raw request only after decoding: the TeeReader fills buf as the body is read.
	log.Println(buf.String())
	writeJSON(w, extenderFilterResult)
}

// testPrioritizeHandler serves the prioritize call.
func testPrioritizeHandler(w http.ResponseWriter, r *http.Request) {
	var extenderArgs schedulerapi.ExtenderArgs
	if err := json.NewDecoder(r.Body).Decode(&extenderArgs); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	hostPriorityList, err := prioritizeFunc(extenderArgs)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	writeJSON(w, hostPriorityList)
}

// podFitsNode is the custom predicate. This placeholder accepts every node.
func podFitsNode(pod v1.Pod, node v1.Node) (bool, error) {
	return true, nil
}

// predicateFunc splits the candidate nodes into schedulable and failed nodes.
func predicateFunc(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
	pod := args.Pod
	canSchedule := make([]v1.Node, 0, len(args.Nodes.Items))
	canNotSchedule := make(map[string]string)
	for _, node := range args.Nodes.Items {
		result, err := podFitsNode(pod, node)
		if err != nil {
			canNotSchedule[node.Name] = err.Error()
		} else if result {
			canSchedule = append(canSchedule, node)
		}
	}
	return &schedulerapi.ExtenderFilterResult{
		Nodes:       &v1.NodeList{Items: canSchedule},
		FailedNodes: canNotSchedule,
		Error:       "",
	}
}

// prioritizeFunc scores every node. This placeholder gives each node a score of 0.
func prioritizeFunc(args schedulerapi.ExtenderArgs) (*schedulerapi.HostPriorityList, error) {
	nodes := args.Nodes.Items
	priorityList := make(schedulerapi.HostPriorityList, len(nodes))
	for i, node := range nodes {
		priorityList[i] = schedulerapi.HostPriority{
			Host:  node.Name,
			Score: 0,
		}
	}
	return &priorityList, nil
}

// BindHandler serves the bind call and reports the outcome to the scheduler.
func BindHandler(w http.ResponseWriter, r *http.Request) {
	var extenderBindingArgs schedulerapi.ExtenderBindingArgs
	if err := json.NewDecoder(r.Body).Decode(&extenderBindingArgs); err != nil {
		writeJSON(w, &schedulerapi.ExtenderBindingResult{Error: err.Error()})
		return
	}
	b := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: extenderBindingArgs.PodNamespace,
			Name:      extenderBindingArgs.PodName,
			UID:       extenderBindingArgs.PodUID,
		},
		Target: v1.ObjectReference{
			Kind: "Node",
			Name: extenderBindingArgs.Node,
		},
	}
	result := &schedulerapi.ExtenderBindingResult{}
	if err := bind(b); err != nil {
		result.Error = err.Error()
	}
	writeJSON(w, result)
}

// bind issues the binding to the apiserver.
func bind(b *v1.Binding) error {
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return err
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}
	return clientset.CoreV1().Pods(b.Namespace).Bind(b)
}
References:
https://github.com/kubernetes/community/blob/master/contributors/devel/scheduler.md
https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/
https://github.com/kubernetes/kubernetes-docs-cn/blob/master/docs/concepts/overview/extending.md