How to use a shell script and client-go to implement your own k8s scheduler

Introduction to the scheduler

The scheduler is part of the k8s master (control plane) and exists in the k8s ecosystem as a plug-in.

Ways to customize the scheduler

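  • Add features to the scheduler source code and recompile it
  • Implement your own scheduler and run it alongside the default one (multiple schedulers)
  • Have the default scheduler call an extender that takes part in the final scheduling decision (Kubernetes scheduler extender)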

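Adding scheduling functions

Introduction to the k8s scheduling algorithm

The default scheduler works in two phases: preselection (predicates), which filters nodes, and preferred selection (priorities), which scores them.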

Implement your own scheduler (configure multiple schedulers)

Schedulers exist as plug-ins. Multiple schedulers can run in the same cluster, and a pod can explicitly specify which one should schedule it.

Configure a pod to use your own scheduler

The following pod explicitly specifies that it should be scheduled by my-scheduler:

apiVersion: v1
kind: Pod
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  schedulerName: my-scheduler
  containers:
  - name: nginx
    image: nginx:1.10

Official example of a scheduler written in shell

#!/bin/bash
# Assumes `kubectl proxy` is serving the API server on localhost:8001
SERVER='localhost:8001'
while true;
do
    # Pods that request my-scheduler and have not been bound to a node yet
    for PODNAME in $(kubectl --server $SERVER get pods -o json | jq -r '.items[] | select(.spec.schedulerName == "my-scheduler") | select(.spec.nodeName == null) | .metadata.name');
    do
        # Pick a random node
        NODES=($(kubectl --server $SERVER get nodes -o json | jq -r '.items[].metadata.name'))
        NUMNODES=${#NODES[@]}
        CHOSEN=${NODES[$(( RANDOM % NUMNODES ))]}
        # Bind the pod by POSTing a Binding object to its binding subresource (default namespace only)
        curl --header "Content-Type:application/json" --request POST --data '{"apiVersion":"v1", "kind": "Binding", "metadata": {"name": "'$PODNAME'"}, "target": {"apiVersion": "v1", "kind": "Node", "name": "'$CHOSEN'"}}' http://$SERVER/api/v1/namespaces/default/pods/$PODNAME/binding/
        echo "Assigned $PODNAME to $CHOSEN"
    done
    sleep 1
done

Factors affecting pod scheduling

https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/

Preselection (predicates)

Filter out nodes that do not meet the pod's requirements

Preferred selection (priorities)

Score the remaining nodes
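
The two phases can be sketched as a filter pass followed by a scoring pass. This is a simplified, hypothetical model: the `node` and `podRequest` types are introduced here, only CPU and memory are considered, and the score is modeled loosely on LeastRequestedPriority (free capacity after placement on a 0-10 scale, averaged over CPU and memory) rather than copied from the scheduler source.

```go
package main

import "fmt"

// node is a hypothetical, simplified view of a node's resources.
type node struct {
	name    string
	cpuCap  int64 // allocatable CPU in millicores
	cpuUsed int64 // CPU already requested by running pods
	memCap  int64 // allocatable memory in MiB
	memUsed int64 // memory already requested by running pods
}

// podRequest is the CPU/memory the pod being scheduled asks for.
type podRequest struct{ cpu, mem int64 }

// fits is the preselection (predicate) step: drop nodes that cannot hold the pod.
func fits(n node, p podRequest) bool {
	return n.cpuUsed+p.cpu <= n.cpuCap && n.memUsed+p.mem <= n.memCap
}

// leastRequested is the preferred (priority) step: more free capacity after
// placement gives a higher score on a 0-10 scale, averaged over CPU and memory.
func leastRequested(n node, p podRequest) int64 {
	cpu := (n.cpuCap - n.cpuUsed - p.cpu) * 10 / n.cpuCap
	mem := (n.memCap - n.memUsed - p.mem) * 10 / n.memCap
	return (cpu + mem) / 2
}

// schedule filters first, then picks the highest-scoring remaining node.
func schedule(nodes []node, p podRequest) (string, bool) {
	best, bestScore, found := "", int64(-1), false
	for _, n := range nodes {
		if !fits(n, p) {
			continue
		}
		if s := leastRequested(n, p); s > bestScore {
			best, bestScore, found = n.name, s, true
		}
	}
	return best, found
}

func main() {
	nodes := []node{
		{"node-a", 4000, 3500, 8192, 1024},
		{"node-b", 4000, 1000, 8192, 4096},
		{"node-c", 2000, 200, 4096, 512},
	}
	name, ok := schedule(nodes, podRequest{cpu: 600, mem: 512})
	fmt.Println(name, ok)
}
```

With these sample nodes, node-a is filtered out because 3500m + 600m exceeds its 4000m of CPU, and node-c (score 6) wins over node-b (score 5), so the program prints `node-c true`.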

Preemption

Starting with Kubernetes 1.8, a pod's priority can be specified. Priority indicates the importance of a pod relative to other pods.
When a pod cannot be scheduled, the scheduler tries to preempt (evict) lower-priority pods so that the pending pod can be scheduled.
In future releases of Kubernetes, priority will also affect the order in which pods are evicted when a node runs out of resources.

Since 1.9, preemption takes PodDisruptionBudgets (PDBs) into account: the scheduler prefers victims whose eviction does not violate a PDB. This is best effort, however: if no other pods can be preempted, pods protected by a PDB will still be preempted.
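
Victim selection with best-effort PDB handling can be illustrated with a greedy sketch. This is a simplified model, not the scheduler's actual algorithm: the `victim` type and `selectVictims` function are hypothetical, only CPU is counted, and only pods with a lower priority than the pending pod are considered.

```go
package main

import (
	"fmt"
	"sort"
)

// victim is a hypothetical running pod that could be evicted.
type victim struct {
	name     string
	priority int32
	cpu      int64 // millicores freed if it is evicted
	hasPDB   bool  // evicting it would violate a PodDisruptionBudget
}

// selectVictims greedily picks lower-priority pods until need CPU is freed,
// trying pods whose eviction does not violate a PDB first (best effort).
func selectVictims(running []victim, preemptorPriority int32, need int64) ([]string, bool) {
	var candidates []victim
	for _, v := range running {
		if v.priority < preemptorPriority {
			candidates = append(candidates, v)
		}
	}
	sort.SliceStable(candidates, func(i, j int) bool {
		if candidates[i].hasPDB != candidates[j].hasPDB {
			return !candidates[i].hasPDB // pods without a PDB first
		}
		return candidates[i].priority < candidates[j].priority // then lowest priority
	})
	var chosen []string
	var freed int64
	for _, v := range candidates {
		if freed >= need {
			break
		}
		chosen = append(chosen, v.name)
		freed += v.cpu
	}
	return chosen, freed >= need
}

func main() {
	running := []victim{
		{"batch-1", 0, 500, false},
		{"web-1", 100, 1000, true},
		{"batch-2", 10, 500, false},
		{"critical", 1000, 2000, false},
	}
	names, ok := selectVictims(running, 500, 1200)
	fmt.Println(names, ok)
}
```

In the sample, batch-1 and batch-2 (no PDB) are taken first but free only 1000m, so web-1 is preempted despite its PDB, matching the best-effort behavior described above; critical is never considered because its priority is higher than the pending pod's.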

Kubernetes scheduler extender

Scheduler policy configuration

{
  "kind" : "Policy",
  "apiVersion" : "v1",
  "predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "PodFitsResources"},
    {"name" : "NoDiskConflict"},
    {"name" : "MatchNodeSelector"},
    {"name" : "HostName"}
    ],
  "priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "BalancedResourceAllocation", "weight" : 1},
    {"name" : "ServiceSpreadingPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
    ],
  "extenders" : [
    {
      "urlPrefix": "http://localhost/scheduler",
      "apiVersion": "v1beta1",
      "filterVerb": "predicates/always_true",
      "bindVerb": "",
      "prioritizeVerb": "priorities/zero_score",
      "weight": 1,
      "enableHttps": false,
      "nodeCacheCapable": false,
      "httpTimeout": 10000000000
    }
  ],
  "hardPodAffinitySymmetricWeight" : 10
}
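
The extender entry above only names verbs. The endpoint the scheduler calls is the verb appended to urlPrefix (as the ExtenderConfig comments describe), and an empty verb such as the empty bindVerb means that phase is not delegated to the extender. A small sketch of that mapping, assuming the two parts are joined with a single slash; `extenderURL` is a hypothetical helper:

```go
package main

import (
	"fmt"
	"strings"
)

// extenderURL joins urlPrefix and a verb (hypothetical helper). An empty verb
// means the extender does not implement that phase, so no URL is produced.
func extenderURL(urlPrefix, verb string) (string, bool) {
	if verb == "" {
		return "", false
	}
	return strings.TrimRight(urlPrefix, "/") + "/" + verb, true
}

func main() {
	prefix := "http://localhost/scheduler"
	for _, verb := range []string{"predicates/always_true", "priorities/zero_score", ""} {
		if url, ok := extenderURL(prefix, verb); ok {
			fmt.Println(url)
		} else {
			fmt.Println("(phase not delegated to the extender)")
		}
	}
}
```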

Extender configuration

// ExtenderConfig holds the parameters used to communicate with the extender. If a verb is unspecified/empty,
// it is assumed that the extender chose not to provide that extension.
type ExtenderConfig struct {
    // URLPrefix at which the extender is available
    URLPrefix string `json:"urlPrefix"`
    // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to the extender.
    FilterVerb string `json:"filterVerb,omitempty"`
    // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to the extender.
    PrioritizeVerb string `json:"prioritizeVerb,omitempty"`
    // The numeric multiplier for the node scores that the prioritize call generates. The weight should be a positive integer.
    Weight int `json:"weight,omitempty"`
    // Verb for the bind call, empty if not supported. This verb is appended to the URLPrefix when issuing the bind call to the extender.
    // If this method is implemented by the extender, it is the extender's responsibility to bind the pod through the apiserver.
    // Only one extender can implement this function.
    BindVerb string `json:"bindVerb,omitempty"`
    // EnableHTTPS specifies whether https should be used to communicate with the extender
    EnableHTTPS bool `json:"enableHttps,omitempty"`
    // TLSConfig specifies the transport layer security config
    TLSConfig *restclient.TLSClientConfig `json:"tlsConfig,omitempty"`
    // HTTPTimeout specifies the timeout for a call to the extender. A filter timeout fails the scheduling of the pod.
    // A prioritize timeout is ignored, and the priorities of k8s or other extenders are used to select the node.
    HTTPTimeout time.Duration `json:"httpTimeout,omitempty"`
    // NodeCacheCapable specifies that the extender is capable of caching node information,
    // so the scheduler should only send minimal information about the eligible nodes,
    // assuming that the extender already cached full details of all nodes in the cluster
    NodeCacheCapable bool `json:"nodeCacheCapable,omitempty"`
    // ManagedResources is a list of extended resources managed by this extender.
    // A pod is sent to the extender in the Filter, Prioritize and Bind (if the extender is the binder) phases
    // only if it requests at least one extended resource in this list. If empty or unspecified,
    // all pods are sent to this extender.
    ManagedResources []ExtenderManagedResource `json:"managedResources,omitempty"`
}

The FilterVerb endpoint on the extender receives the pod and the set of nodes that passed the k8s predicates.
The PrioritizeVerb endpoint on the extender receives the pod and the set of nodes that passed both the k8s predicates and the extender predicates.

// ExtenderArgs represents the arguments an extender needs to filter/prioritize nodes for a pod
type ExtenderArgs struct {
    // Pod being scheduled
    Pod   api.Pod      `json:"pod"`
    // List of candidate nodes where the pod can be scheduled
    Nodes api.NodeList `json:"nodes"`
}

A "filter" call returns a filtered list of nodes (schedulerapi.ExtenderFilterResult); a "prioritize" call returns a score for each node (schedulerapi.HostPriorityList).

The "filter" call may prune the node list according to the extender's own logic. The scores returned by "prioritize" are multiplied by the extender's weight and added to the final score that k8s computes with its own priority functions, and the final host is chosen from these combined scores.

The "bind" call delegates binding the pod to a node to the extender. An extender may optionally implement it; when it does, it is the extender's responsibility to issue the binding call to the apiserver. The pod name, namespace, UID, and node name are passed to the extender.

ExtenderBindingArgs represents the arguments passed to an extender for binding a pod to a node:

type ExtenderBindingArgs struct {
    // Name of the pod being bound
    PodName string
    // Namespace of the pod being bound
    PodNamespace string
    // UID of the pod being bound
    PodUID types.UID
    // Node selected by the scheduler
    Node string
}

Implementation

package main

import (
    "bytes"
    "encoding/json"
    "io"
    "log"
    "net/http"

    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    schedulerapi "k8s.io/kubernetes/pkg/scheduler/api/v1"
)

var (
    // Path to the kubeconfig used to talk to the apiserver; replace "xxx" with a real path
    kubeconfig string = "xxx"
    // clientset is created once at startup and reused for every bind call
    clientset *kubernetes.Clientset
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        log.Fatal(err)
    }
    clientset, err = kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }

    http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
        w.Write([]byte("hello world"))
    })
    // The scheduler appends each configured verb to urlPrefix, so these paths correspond to
    // filterVerb "predicates/test", prioritizeVerb "prioritize/test" and bindVerb "bind/test"
    http.HandleFunc("/predicates/test", testPredicateHandler)
    http.HandleFunc("/prioritize/test", testPrioritizeHandler)
    http.HandleFunc("/bind/test", BindHandler)
    log.Fatal(http.ListenAndServe(":8880", nil))
}

// writeJSON serializes v and writes it as a 200 application/json response
func writeJSON(w http.ResponseWriter, v interface{}) {
    resultBody, err := json.Marshal(v)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    w.Write(resultBody)
}

func testPredicateHandler(w http.ResponseWriter, r *http.Request) {
    var buf bytes.Buffer
    // Copy the request body into buf while decoding, so it can be logged afterwards
    body := io.TeeReader(r.Body, &buf)

    var extenderArgs schedulerapi.ExtenderArgs
    var extenderFilterResult *schedulerapi.ExtenderFilterResult

    if err := json.NewDecoder(body).Decode(&extenderArgs); err != nil {
        // Report the decode error to the scheduler instead of crashing
        extenderFilterResult = &schedulerapi.ExtenderFilterResult{
            Error: err.Error(),
        }
    } else {
        extenderFilterResult = predicateFunc(extenderArgs)
    }
    log.Println(buf.String())
    writeJSON(w, extenderFilterResult)
}

func testPrioritizeHandler(w http.ResponseWriter, r *http.Request) {
    var extenderArgs schedulerapi.ExtenderArgs
    if err := json.NewDecoder(r.Body).Decode(&extenderArgs); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    hostPriorityList, err := prioritizeFunc(extenderArgs)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    writeJSON(w, hostPriorityList)
}

// predicateFunc keeps the nodes that pass the check and records why the others failed.
// args.Nodes is only populated when nodeCacheCapable is false, as in the policy above.
func predicateFunc(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
    canSchedule := make([]v1.Node, 0, len(args.Nodes.Items))
    canNotSchedule := make(map[string]string)
    for _, node := range args.Nodes.Items {
        // Placeholder check: inspect args.Pod and node here; this example accepts every node
        fits, err := func(node v1.Node) (bool, error) {
            return true, nil
        }(node)
        if err != nil {
            canNotSchedule[node.Name] = err.Error()
        } else if fits {
            canSchedule = append(canSchedule, node)
        } else {
            canNotSchedule[node.Name] = "rejected by extender"
        }
    }
    return &schedulerapi.ExtenderFilterResult{
        Nodes:       &v1.NodeList{Items: canSchedule},
        FailedNodes: canNotSchedule,
    }
}

// prioritizeFunc gives every candidate node a score of 0, i.e. it does not change the ranking
func prioritizeFunc(args schedulerapi.ExtenderArgs) (*schedulerapi.HostPriorityList, error) {
    nodes := args.Nodes.Items
    priorityList := make(schedulerapi.HostPriorityList, len(nodes))
    for i, node := range nodes {
        priorityList[i] = schedulerapi.HostPriority{
            Host:  node.Name,
            Score: 0,
        }
    }
    return &priorityList, nil
}

func BindHandler(w http.ResponseWriter, r *http.Request) {
    var extenderBindingArgs schedulerapi.ExtenderBindingArgs
    if err := json.NewDecoder(r.Body).Decode(&extenderBindingArgs); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    b := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: extenderBindingArgs.PodNamespace, Name: extenderBindingArgs.PodName, UID: extenderBindingArgs.PodUID},
        Target: v1.ObjectReference{
            Kind: "Node",
            Name: extenderBindingArgs.Node,
        },
    }
    // The scheduler expects an ExtenderBindingResult; a non-empty Error marks the binding as failed
    result := schedulerapi.ExtenderBindingResult{}
    if err := bind(b); err != nil {
        result.Error = err.Error()
    }
    writeJSON(w, result)
}

// bind posts the Binding to the apiserver.
// This is the pre-1.18 client-go signature; newer versions take (ctx, b, metav1.CreateOptions{}).
func bind(b *v1.Binding) error {
    return clientset.CoreV1().Pods(b.Namespace).Bind(b)
}

References:
https://github.com/kubernetes/community/blob/master/contributors/devel/scheduler.md

https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/scheduler_extender.md

https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/

https://github.com/kubernetes/kubernetes-docs-cn/blob/master/docs/concepts/overview/extending.md


Posted by cruzbullit on Fri, 15 Apr 2022 05:07:33 +0930