掘金 人工智能 前天 11:43
HAMi vGPU 原理分析 Part3:hami-scheduler 工作流程分析
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文是 HAMi 原理分析的第三篇,重点解析 hami-scheduler 的工作流程,特别是如何实现 spread 和 binpack 等高级调度策略。文章详细阐述了 HAMi Scheduler 作为 Kubernetes 调度器扩展器的工作原理,包括 Webhook 如何将 vGPU 资源申请的 Pod 重定向到 hami-scheduler,以及 hami-scheduler 如何通过 kube-scheduler 镜像和 Extender Scheduler 插件实现自定义调度逻辑。此外,还深入探讨了 hami-scheduler 如何通过解析 Node Annotations 和监控 Pod 状态来感知节点 GPU 资源及其使用情况,为精细化调度提供基础。

💡 **Webhook 引导与 Scheduler Extender 机制**:HAMi 通过 MutatingWebhookConfiguration 捕获申请 vGPU 资源的 Pod,并将其 `SchedulerName` 修改为 `hami-scheduler`。实际的调度逻辑由一个独立的 `vgpu-scheduler-extender` 容器实现,该容器作为 Kubernetes 调度器(kube-scheduler)的 Extender,通过 HTTP 请求接收调度任务,从而实现了自定义的调度策略。

📊 **节点 GPU 资源感知与缓存**:HAMi Scheduler 通过一个后台 Goroutine 定时从 Kubernetes API Server 拉取节点信息,并解析 Node Annotations(如 `hami.io/node-nvidia-register`)来获取每个节点的 GPU 数量、ID、显存大小等详细信息。这些信息被存储在内存中的 `nodeManager` 里,供调度决策时使用,确保了对节点 GPU 资源的精细化了解。

📈 **GPU 使用情况监控与计算**:为了实现细粒度的调度,HAMi Scheduler 利用 client-go 的 Informer 机制监听 Pod 的增删改事件。当 Pod 创建或更新时,它会解析 Pod Annotations(如 `hami.io/vgpu-node` 和 `hami.io/vgpu-devices-allocated`)来确定 Pod 分配到的 GPU 及其资源消耗(显存、核心数)。通过对比节点总 GPU 资源和已分配资源,计算出每个节点的剩余可用 GPU 资源。

⚙️ **调度策略实现与资源隔离**:HAMi Scheduler 的 Extender 插件负责具体的调度逻辑,包括 `spread` 和 `binpack` 等高级调度策略。它根据节点剩余的 GPU 资源量进行打分和选择,确保 Pod 被分配到最适合的节点。`managedResources` 配置项用于指示 Scheduler Extender 管理的资源类型,只有当 Pod 申请这些资源时,才由 HAMi Scheduler 处理,从而实现对 vGPU 等资源的有效管理和隔离。

上篇我们分析了 hami-webhook,该 Webhook 将申请了 vGPU 资源的 Pod 的调度器修改为 hami-scheduler,后续使用 hami-scheduler 进行调度。

本文为 HAMi 原理分析的第三篇,分析 hami-scheduler 工作流程。

上篇主要分析了 hami-webhook,解决了:Pod 是如何使用到 hami-scheduler,创建 Pod 时我们未指定 SchedulerName 默认会使用 default-scheduler 进行调度才对 问题。

这篇开始分析 hami-scheduler,解决另一个问题:hami-scheduler 逻辑,spread & binpark 等 高级调度策略是如何实现的

写完发现内容还是很多,spread & binpark 调度策略下一篇在分析吧,这篇主要分析调度流程。

以下分析基于 HAMi v2.4.0

省流:

HAMi Webhook 、Scheduler 工作流程如下:

1. 概述

Hami-scheduler 主要是 Pod 的调度逻辑,从集群节点中为当前 Pod 选择最合适的节点。

Hami-scheduler 也是通过 Scheduler Extender 方式实现的,可以参考上一篇文章 K8s 自定义调度器 Part1:通过 Scheduler Extender 实现自定义调度逻辑 手把手实现一个自定义调度器。

但是 HAMi 并没有直接扩展 default-scheduler,而是使用默认的 kube-scheduler 镜像额外启动了一个 scheduler,但是通过配置把名称指定为了 hami-scheduler。

然后给这个 hami-scheduler 配置了 Extender,Extender 服务就是同 Pod 中的另一个 Container 启动的一个 http 服务。

ps:后续说的 hami-scheduler 一般只这部分 Extender 实现的调度插件

2. 具体部署

Deployment

Hami-scheduler 使用 Deployment 进行部署,该 Deployment 中有两个 Container,其中一个是原生的 kube-scheduler,另一个则是 HAMi 的 Scheduler 服务。

完整 yaml 如下:

apiVersion: apps/v1kind: Deploymentmetadata:  name: vgpu-hami-scheduler  namespace: kube-systemspec:  template:    spec:      containers:      - command:        - kube-scheduler        - --config=/config/config.yaml        - -v=4        - --leader-elect=true        - --leader-elect-resource-name=hami-scheduler        - --leader-elect-resource-namespace=kube-system        image: 192.168.116.54:5000/kube-scheduler:v1.23.17        imagePullPolicy: IfNotPresent        name: kube-scheduler        resources: {}        terminationMessagePath: /dev/termination-log        terminationMessagePolicy: File        volumeMounts:        - mountPath: /config          name: scheduler-config      - command:        - scheduler        - --resource-name=nvidia.com/vgpu        - --resource-mem=nvidia.com/gpumem        - --resource-cores=nvidia.com/gpucores        - --resource-mem-percentage=nvidia.com/gpumem-percentage        - --resource-priority=nvidia.com/priority        - --http_bind=0.0.0.0:443        - --cert_file=/tls/tls.crt        - --key_file=/tls/tls.key        - --scheduler-name=hami-scheduler        - --metrics-bind-address=:9395        - --default-mem=0        - --default-gpu=1        - --default-cores=0        - --iluvatar-memory=iluvatar.ai/vcuda-memory        - --iluvatar-cores=iluvatar.ai/vcuda-core        - --cambricon-mlu-name=cambricon.com/vmlu        - --cambricon-mlu-memory=cambricon.com/mlu.smlu.vmemory        - --cambricon-mlu-cores=cambricon.com/mlu.smlu.vcore        - --ascend-name=huawei.com/Ascend910        - --ascend-memory=huawei.com/Ascend910-memory        - --ascend310p-name=huawei.com/Ascend310P        - --ascend310p-memory=huawei.com/Ascend310P-memory        - --overwrite-env=false        - --node-scheduler-policy=binpack        - --gpu-scheduler-policy=spread        - --debug        - -v=4        image: projecthami/hami:v2.3.13        imagePullPolicy: IfNotPresent        name: vgpu-scheduler-extender        ports:        - containerPort: 443          name: http          protocol: TCP        volumeMounts:        - mountPath: /tls          name: tls-config      dnsPolicy: ClusterFirst      priorityClassName: system-node-critical      restartPolicy: Always      schedulerName: default-scheduler      serviceAccount: vgpu-hami-scheduler      serviceAccountName: vgpu-hami-scheduler      terminationGracePeriodSeconds: 30      volumes:      - name: tls-config        secret:          defaultMode: 420          secretName: vgpu-hami-scheduler-tls      - configMap:          defaultMode: 420          name: vgpu-hami-scheduler-newversion        name: scheduler-config

KubeSchedulerConfiguration

对应的 Scheduler 的配置文件存储在 Configmap 中,具体内容如下:

apiVersion: v1data:  config.yaml: |    apiVersion: kubescheduler.config.k8s.io/v1beta2    kind: KubeSchedulerConfiguration    leaderElection:      leaderElect: false    profiles:    - schedulerName: hami-scheduler    extenders:    - urlPrefix: "https://127.0.0.1:443"      filterVerb: filter      bindVerb: bind      nodeCacheCapable: true      weight: 1      httpTimeout: 30s      enableHTTPS: true      tlsConfig:        insecure: true      managedResources:      - name: nvidia.com/vgpu        ignoredByScheduler: true      - name: nvidia.com/gpumem        ignoredByScheduler: true      - name: nvidia.com/gpucores        ignoredByScheduler: true      - name: nvidia.com/gpumem-percentage        ignoredByScheduler: true      - name: nvidia.com/priority        ignoredByScheduler: true      - name: cambricon.com/vmlu        ignoredByScheduler: true      - name: hygon.com/dcunum        ignoredByScheduler: true      - name: hygon.com/dcumem        ignoredByScheduler: true      - name: hygon.com/dcucores        ignoredByScheduler: true      - name: iluvatar.ai/vgpu        ignoredByScheduler: true      - name: huawei.com/Ascend910-memory        ignoredByScheduler: true      - name: huawei.com/Ascend910        ignoredByScheduler: true      - name: huawei.com/Ascend310P-memory        ignoredByScheduler: true      - name: huawei.com/Ascend310P        ignoredByScheduler: truekind: ConfigMapmetadata:  name: vgpu-hami-scheduler-newversion  namespace: kube-system

SchedulerName

首先是指定了 Scheduler 名字叫做 hami-scheduler,k8s 默认的调度器叫做 default-scheduler。

profiles:- schedulerName: hami-scheduler

创建 Pod 的时候我们是没有指定 schedulerName 的,所以默认都会使用 default-scheduler,也就是默认 kube-scheduler 进行调度。

之前 hami-webhook 修改 SchedulerName 时就需要和这里配置的名称对应。

Extenders

调度器核心配置如下:

extenders:- urlPrefix: "https://127.0.0.1:443"  filterVerb: filter  bindVerb: bind

参数解释:

managedResources

managedResources 这部分指定这个扩展调度器 hami-cheduler 管理的资源,只有 Pod Resource 中申请了 managedResources 中指定的资源时,Scheduler 才会请求我们配置的 Extender,也就是 hami-scheduler。

即:只要没申请 vGPU 资源,就是指定使用 hami-scheduler 调度,也是由名为 hami-scheduler 的 kube-scheduler 进行调度,不会请求 Extender,真正的 HAMi 调度插件不会生效。

managedResources:- name: nvidia.com/vgpu  ignoredByScheduler: true- name: nvidia.com/gpumem  ignoredByScheduler: true  ...

这样配置之后,对于 nvidia.com/vgpu、nvidia.com/gpumem 等等 managedResources 中指定的资源,调度器在做节点资源匹配和资源分配时,会忽略这个资源,不会因为 Node 上没有这些虚拟资源,就直接调度失败了。

当调度器请求扩展的 hami-scheduler 进行调度时,hami-scheduler 就能够正常处理这些资源,根据 Pod 申请的 Resource 配置找到对应的节点。

接下来则分析 hami-scheduler 的具体实现,包括两个问题:

3. hami 如何感知 Nod 上的 GPU 资源情况的

分为两部分:

因为 gpucore、gpumem 这些都是虚拟资源,因此不能像 DevicePlugin 上报的标准第三方资源一样,由 K8s 直接维护,而是需要 hami 自行维护。

为什么需要自定义感知逻辑

到这里大家可能会有疑问,上一篇文章中介绍了 hami-device-plugin-nvidia,这里的 devicePlugin 不就已经感知了节点上的 GPU 并上报到 kube-apiserver 了吗,怎么还需要实现一个感知逻辑?

在节点上都可以看到了,例如下面节点上就有 20 个 GPU

root@j99cloudvm:~# k describe node j99cloudvm |grep Capa -A 7Capacity:  cpu:                64  ephemeral-storage:  2097139692Ki  hugepages-1Gi:      0  hugepages-2Mi:      0  memory:             131966216Ki  nvidia.com/gpu:     20

因为:hami-scheduler 做了细粒度的 gpucore、gpumem 切分,那么就要知道节点上的 GPU 具体数量、每张卡的显存大小等信息,不然如果把 Pod 分配到一个所有 GPU 显存都已经消耗完的 Node 上不就出问题了。

感知节点上的 GPU 资源

Hami 是如何感知节点上的 GPU 情况的呢?也就是之前 start 方法中的这个 Goroutine 在维护,核心就是在这个 RegisterFromNodeAnnotations 方法中

go sher.RegisterFromNodeAnnotations()

精简后代码如下:

调用 kube-apiserver 获取到节点列表,然后从 node 的 Annoations 中解析出 Device 信息,并保存到内存中。

func (s *Scheduler) RegisterFromNodeAnnotations() {    klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")    ticker := time.NewTicker(time.Second * 15)    for {       select {       case <-s.nodeNotify:       case <-ticker.C:       case <-s.stopCh:          return       }       //  kube-apiserver 查询节点列表       labelSelector := labels.Everything()        if config.NodeLabelSelector != nil && len(config.NodeLabelSelector) > 0 {            labelSelector = (labels.Set)(config.NodeLabelSelector).AsSelector()        }        rawNodes, err := s.nodeLister.List(labelSelector)        if err != nil {            klog.Errorln("nodes list failed", err.Error())            continue        }        var nodeNames []string        for _, val := range rawNodes {            // 从 node 信息中解析出 GPU 信息            nodedevices, err := devInstance.GetNodeDevices(*val)            for _, deviceinfo := range nodedevices {                found := false                _, ok := s.nodes[val.Name]                if ok {                   for i1, val1 := range s.nodes[val.Name].Devices {                      if strings.Compare(val1.ID, deviceinfo.ID) == 0 {                         found = true                         s.nodes[val.Name].Devices[i1].Devmem = deviceinfo.Devmem                         s.nodes[val.Name].Devices[i1].Devcore = deviceinfo.Devcore                         break                      }                   }                }                if !found {                   nodeInfo.Devices = append(nodeInfo.Devices, util.DeviceInfo{                      ID:           deviceinfo.ID,                      Index:        uint(deviceinfo.Index),                      Count:        deviceinfo.Count,                      Devmem:       deviceinfo.Devmem,                      Devcore:      deviceinfo.Devcore,                      Type:         deviceinfo.Type,                      Numa:         deviceinfo.Numa,                      Health:       deviceinfo.Health,                      DeviceVendor: devhandsk,                   })                }            }        // 最终将节点信息添加到缓存里        s.addNode(val.Name, nodeInfo)        }       // .....

会将最新的 Node 数据存到内存中,便于调度时使用,就是 nodeManager 中的 nodes 这个 map 对象

type Scheduler struct {    nodeManager    podManager    stopCh     chan struct{}    kubeClient kubernetes.Interface    podLister  listerscorev1.PodLister    nodeLister listerscorev1.NodeLister    //Node status returned by filter    cachedstatus map[string]*NodeUsage    nodeNotify   chan struct{}    //Node Overview    overviewstatus map[string]*NodeUsage    eventRecorder record.EventRecorder}type nodeManager struct {    nodes map[string]*util.NodeInfo    mutex sync.RWMutex}

重点来了:这里的数据来源是 node 的 Annoations,而这个 Annoations 就是上一篇中提到的 hami device plugin nvidia 中的一个后台 goroutine 在维护。

// returns all nodes and its device memory usage, and we filter it with nodeSelector, taints, nodeAffinity// unschedulerable and nodeName.func (s *Scheduler) getNodesUsage(nodes *[]string, task *corev1.Pod) (*map[string]*NodeUsage, map[string]string, error) {    overallnodeMap := make(map[string]*NodeUsage)    cachenodeMap := make(map[string]*NodeUsage)    failedNodes := make(map[string]string)    //for _, nodeID := range *nodes {    allNodes, err := s.ListNodes()    if err != nil {       return &overallnodeMap, failedNodes, err    }    for _, node := range allNodes {       nodeInfo := &NodeUsage{}       userGPUPolicy := config.GPUSchedulerPolicy       if task != nil && task.Annotations != nil {          if value, ok := task.Annotations[policy.GPUSchedulerPolicyAnnotationKey]; ok {             userGPUPolicy = value          }       }       nodeInfo.Devices = policy.DeviceUsageList{          Policy:      userGPUPolicy,          DeviceLists: make([]*policy.DeviceListsScore, 0),       }       for _, d := range node.Devices {          nodeInfo.Devices.DeviceLists = append(nodeInfo.Devices.DeviceLists, &policy.DeviceListsScore{             Score: 0,             Device: &util.DeviceUsage{                ID:        d.ID,                Index:     d.Index,                Used:      0,                Count:     d.Count,                Usedmem:   0,                Totalmem:  d.Devmem,                Totalcore: d.Devcore,                Usedcores: 0,                Type:      d.Type,                Numa:      d.Numa,                Health:    d.Health,             },          })       }       overallnodeMap[node.ID] = nodeInfo    }    podsInfo := s.ListPodsInfo()    for _, p := range podsInfo {       node, ok := overallnodeMap[p.NodeID]       if !ok {          continue       }       for _, podsingleds := range p.Devices {          for _, ctrdevs := range podsingleds {             for _, udevice := range ctrdevs {                for _, d := range node.Devices.DeviceLists {                   if d.Device.ID == udevice.UUID {                      d.Device.Used++                      d.Device.Usedmem += udevice.Usedmem                      d.Device.Usedcores += udevice.Usedcores                   }                }             }          }       }       klog.V(5).Infof("usage: pod %v assigned %v %v", p.Name, p.NodeID, p.Devices)    }    s.overviewstatus = overallnodeMap    for _, nodeID := range *nodes {       node, err := s.GetNode(nodeID)       if err != nil {          // The identified node does not have a gpu device, so the log here has no practical meaning,increase log priority.          klog.V(5).InfoS("node unregistered", "node", nodeID, "error", err)          failedNodes[nodeID] = "node unregistered"          continue       }       cachenodeMap[node.ID] = overallnodeMap[node.ID]    }    s.cachedstatus = cachenodeMap    return &cachenodeMap, failedNodes, nil}

大概长这样:

root@test:~# k get node test -oyamlapiVersion: v1kind: Nodemetadata:  annotations:    csi.volume.kubernetes.io/nodeid: '{"nfs.csi.k8s.io":"j99cloudvm","rbd.csi.ceph.com":"j99cloudvm"}'    hami.io/node-handshake: Requesting_2024.11.19 03:10:32    hami.io/node-handshake-dcu: Deleted_2024.09.13 06:42:44    hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA      A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA      A40,0,true:'    kubeadm.alpha.kubernetes.io/cri-socket: /run/containerd/containerd.sock

其中下面这个就是 GPU 的具体信息,包括 ID、型号、内存等信息。

hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA      A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA      A40,0,true:'

感知节点上 GPU 使用情况

除此之外,hami 还需要感知到 GPU 的使用情况,才能计算出还有多少 gpucore、gpumem 可以使用

这里使用的 client-go 中提供的 Informer 机制,Watch Pod 和 Node 的变化情况,获取 Node 上运行的 Pod,根据 Pod 申请的资源和 Node 上的总资源计算出剩余资源。

// pkg/scheduler/scheduler.go#L127func (s *Scheduler) Start() {    kubeClient, err := k8sutil.NewClient()    check(err)    s.kubeClient = kubeClient    informerFactory := informers.NewSharedInformerFactoryWithOptions(s.kubeClient, time.Hour*1)    s.podLister = informerFactory.Core().V1().Pods().Lister()    s.nodeLister = informerFactory.Core().V1().Nodes().Lister()    informer := informerFactory.Core().V1().Pods().Informer()    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{       AddFunc:    s.onAddPod,       UpdateFunc: s.onUpdatePod,       DeleteFunc: s.onDelPod,    })    informerFactory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{       AddFunc:    s.onAddNode,       UpdateFunc: s.onUpdateNode,       DeleteFunc: s.onDelNode,    })    informerFactory.Start(s.stopCh)    informerFactory.WaitForCacheSync(s.stopCh)    s.addAllEventHandlers()}

其中 Pod Create、Update 时都会 j 进入下面这个 onAddPod 逻辑:

AssignedNodeAnnotations = "hami.io/vgpu-node"// pkg/scheduler/scheduler.go#L92func (s *Scheduler) onAddPod(obj interface{}) {    pod, ok := obj.(*corev1.Pod)    if !ok {       klog.Errorf("unknown add object type")       return    }    nodeID, ok := pod.Annotations[util.AssignedNodeAnnotations]    if !ok {       return    }    if k8sutil.IsPodInTerminatedState(pod) {       s.delPod(pod)       return    }    podDev, _ := util.DecodePodDevices(util.SupportDevices, pod.Annotations)    s.addPod(pod, nodeID, podDev)}

这边限制了只会处理 hami.io/vgpu-node annoations 的 Pod,过滤掉其他 Pod,从 Pod Annoations 中解析出该 Pod 使用的 GPU UUID 以及 memory 和 core 等信息。

Pod 上的 Annoations 大概是这样的:

$ k get po gpu-pod -oyamlapiVersion: v1kind: Podmetadata:  annotations:    hami.io/bind-phase: success    hami.io/bind-time: "1727251686"    hami.io/vgpu-devices-allocated: GPU-03f69c50-207a-2038-9b45-23cac89cb67d,NVIDIA,3000,30:;    hami.io/vgpu-devices-to-allocate: ;    hami.io/vgpu-node: test    hami.io/vgpu-time: "1727251686"

其中hami.io/vgpu-devices-allocated 对应的 value 就是 GPU 信息,格式化后如下

GPU-03f69c50-207a-2038-9b45-23cac89cb67d,NVIDIA,3000,30:;

至于 Pod 删除则是直接删除内存中缓存的 Pod 信息即可。

Node 的变化则比较简单,就是往 nodeNotify channel 发送通知

func (s *Scheduler) onUpdateNode(_, newObj interface{}) {    s.nodeNotify <- struct{}{}}func (s *Scheduler) onDelNode(obj interface{}) {    s.nodeNotify <- struct{}{}}func (s *Scheduler) onAddNode(obj interface{}) {    s.nodeNotify <- struct{}{}}

这个消息最终就会立即触发一次上面的 RegisterFromNodeAnnotations 逻辑

默认是定时的,每 15s 触发一次,增加 notify 可以在节点变化时更快感知到

func (s *Scheduler) RegisterFromNodeAnnotations() {    klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")    ticker := time.NewTicker(time.Second * 15)    for {       select {       case <-s.nodeNotify:       case <-ticker.C:       case <-s.stopCh:          return       }       // .....

通过这个 Informer HAMi 可以知道以下信息:

通过这些信息,就可以完成后续的 Scheduler 逻辑了。

4. 调度实现

上一步拿到集群中的 GPU 信息之后,就可以开始调度了,具体实现分为两个接口:

Filter 接口

看下看 Filter 接口是怎么进行节点过滤的

// pkg/scheduler/scheduler.go#L444func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {    klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)    nums := k8sutil.Resourcereqs(args.Pod)    total := 0    for _, n := range nums {       for _, k := range n {          total += int(k.Nums)       }    }    if total == 0 {       klog.V(1).Infof("pod %v not find resource", args.Pod.Name)       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))       return &extenderv1.ExtenderFilterResult{          NodeNames:   args.NodeNames,          FailedNodes: nil,          Error:       "",       }, nil    }    annos := args.Pod.Annotations    s.delPod(args.Pod)    nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)    if err != nil {       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)       return nil, err    }    if len(failedNodes) != 0 {       klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)    }    nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)    if err != nil {       err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)       return nil, err    }    if len((*nodeScores).NodeList) == 0 {       klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))       return &extenderv1.ExtenderFilterResult{          FailedNodes: failedNodes,       }, nil    }    klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))    sort.Sort(nodeScores)    m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]    klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)    annotations := make(map[string]string)    annotations[util.AssignedNodeAnnotations] = m.NodeID    annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)    for _, val := range device.GetDevices() {       val.PatchAnnotations(&annotations, m.Devices)    }    //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)    //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)    //maps.Copy(annotations, InRequestDevices)    //maps.Copy(annotations, supportDevices)    s.addPod(args.Pod, m.NodeID, m.Devices)    err = util.PatchPodAnnotations(args.Pod, annotations)    if err != nil {       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)       s.delPod(args.Pod)       return nil, err    }    s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)    res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}    return &res, nil}

对于没有申请特殊资源的 Pod 直接返回全部 Node 都可以调度,不做处理

nums := k8sutil.Resourcereqs(args.Pod)total := 0for _, n := range nums {    for _, k := range n {       total += int(k.Nums)    }}if total == 0 {    klog.V(1).Infof("pod %v not find resource", args.Pod.Name)    s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))    return &extenderv1.ExtenderFilterResult{       NodeNames:   args.NodeNames,       FailedNodes: nil,       Error:       "",    }, nil}

否则就根据上一步中获取到的 Node 信息进行打分,具体打分逻辑则是根据每个节点上的已经使用的 GPU Core、GPU Memory 资源和总的 GPU Core、GPU Memory 的比值,根据权重归一化处理后得到最终的得分。

总的来说就是:节点上 GPU Core 和 GPU Memory 资源剩余越少,得分越高

// pkg/scheduler/policy/node_policy.go#L52func (ns *NodeScore) ComputeScore(devices DeviceUsageList) {    // current user having request resource    used, usedCore, usedMem := int32(0), int32(0), int32(0)    for _, device := range devices.DeviceLists {       used += device.Device.Used       usedCore += device.Device.Usedcores       usedMem += device.Device.Usedmem    }    klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,", ns.NodeID, used, usedCore, usedMem)    total, totalCore, totalMem := int32(0), int32(0), int32(0)    for _, deviceLists := range devices.DeviceLists {       total += deviceLists.Device.Count       totalCore += deviceLists.Device.Totalcore       totalMem += deviceLists.Device.Totalmem    }    useScore := float32(used) / float32(total)    coreScore := float32(usedCore) / float32(totalCore)    memScore := float32(usedMem) / float32(totalMem)    ns.Score = float32(Weight) * (useScore + coreScore + memScore)    klog.V(2).Infof("node %s computer score is %f", ns.NodeID, ns.Score)}

除了计算得分还要判断节点剩余资源是否能满足当前 Pod,如果不满足则直接忽略掉

// pkg/scheduler/score.go#L185func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {    userNodePolicy := config.NodeSchedulerPolicy    if annos != nil {       if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok {          userNodePolicy = value       }    }    res := policy.NodeScoreList{       Policy:   userNodePolicy,       NodeList: make([]*policy.NodeScore, 0),    }    //func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) {    // res := make(NodeScoreList, 0, len(*nodes))    for nodeID, node := range *nodes {       viewStatus(*node)       score := policy.NodeScore{NodeID: nodeID, Devices: make(util.PodDevices), Score: 0}       score.ComputeScore(node.Devices)       //This loop is for different container request       ctrfit := false       for ctrid, n := range nums {          sums := 0          for _, k := range n {             sums += int(k.Nums)          }          if sums == 0 {             for idx := range score.Devices {                if len(score.Devices[idx]) <= ctrid {                   score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{})                }                score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{})                continue             }          }          klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID)          fit, _ := fitInDevices(node, n, annos, task, &score.Devices)          ctrfit = fit          if !fit {             klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID)             break          }       }       if ctrfit {          res.NodeList = append(res.NodeList, &score)       }    }    return &res, nil}

计算完成之后从中选了一个节点进行调度。

//计算得分,拿到所有满足条件的节点nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)// 排序sort.Sort(nodeScores)// 直接选择最后一个节点m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]// 返回结果res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}return &res, nil

到这里,我们已经拿到了最终要调度的 Node 了,调度逻辑就结束了。

这里大家可能会有疑问:为什么 Filter 方法就只返回了一个节点,甚至还融合了打分的逻辑在里面。

这个问题在上一篇K8s 自定义调度器 Part1:通过 Scheduler Extender 实现自定义调度逻辑 中已经有解释了,如果按照正常逻辑实现 Filter、Score 等方法,最终 Scheduler 会汇总多个插件的打分,然后根据最终结果选择一个节点,但是 HAMi 这边是要完全控制调度结果的,因此直接将 Filter、Score 逻辑融合到一起,最终就只返回一个目标节点,这样最后肯定会调度到该节点。

Bind 接口

很简单,直接根据 Filter 的返回结果,将 Pod 和 Node 绑定即可完成调度。

func (s *Scheduler) Bind(args extenderv1.ExtenderBindingArgs) (*extenderv1.ExtenderBindingResult, error) {    klog.InfoS("Bind", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)    var err error    var res *extenderv1.ExtenderBindingResult    binding := &corev1.Binding{       ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},       Target:     corev1.ObjectReference{Kind: "Node", Name: args.Node},    }    current, err := s.kubeClient.CoreV1().Pods(args.PodNamespace).Get(context.Background(), args.PodName, metav1.GetOptions{})    if err != nil {       klog.ErrorS(err, "Get pod failed")    }    node, err := s.kubeClient.CoreV1().Nodes().Get(context.Background(), args.Node, metav1.GetOptions{})    if err != nil {       klog.ErrorS(err, "Failed to get node", "node", args.Node)       s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, fmt.Errorf("failed to get node %v", args.Node))       res = &extenderv1.ExtenderBindingResult{          Error: err.Error(),       }       return res, nil    }    tmppatch := make(map[string]string)    for _, val := range device.GetDevices() {       err = val.LockNode(node, current)       if err != nil {          goto ReleaseNodeLocks       }    }    /*       err = nodelock.LockNode(args.Node)       if err != nil {          klog.ErrorS(err, "Failed to lock node", "node", args.Node)          res = &extenderv1.ExtenderBindingResult{             Error: err.Error(),          }          return res, nil       }*/    //defer util.ReleaseNodeLock(args.Node)    tmppatch[util.DeviceBindPhase] = "allocating"    tmppatch[util.BindTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)    err = util.PatchPodAnnotations(current, tmppatch)    if err != nil {       klog.ErrorS(err, "patch pod annotation failed")    }    if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil {       klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)    }    if err == nil {       s.recordScheduleBindingResultEvent(current, EventReasonBindingSucceed, []string{args.Node}, nil)       res = &extenderv1.ExtenderBindingResult{          Error: "",       }       klog.Infoln("After Binding Process")       return res, nil    }ReleaseNodeLocks:    klog.InfoS("bind failed", "err", err.Error())    for _, val := range device.GetDevices() {       val.ReleaseNodeLock(node, current)    }    s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, err)    return &extenderv1.ExtenderBindingResult{       Error: err.Error(),    }, nil}

核心部分:

binding := &corev1.Binding{   ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},   Target:     corev1.ObjectReference{Kind: "Node", Name: args.Node},}if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil {   klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)}

调用 api 创建了一个 binding 对象,将 Pod 调度到指定节点即可。

至此,Scheduler 的逻辑我们就分析完了,调度完成 Kubelet 就开始启动 Pod 了,然后 hami 的 device plugin 也要开始发挥作用了。

小结

这里 HAMi 是使用默认的 kube-scheduler 镜像额外启动了一个 scheduler,但是通过配置把名称指定为了 hami-scheduler。

然后给这个 hami-scheduler 配置了 Extender,Extender 服务就是同 Pod 中的另一个 Container 启动的一个 http 服务。

ps:我们说的 hami-scheduler 一般只这部分 Extender 实现的调度插件

然后在调度可以分为两部分:


**【Kubernetes 系列】**持续更新中,搜索公众号【探索云原生】订阅,阅读更多文章。


5. 小结

本文主要分析了 hami-scheduler 的实现原理,其中包含两个组件:

HAMi Webhook 、Scheduler 工作流程如下:

至此,HAMi Webhook、Scheduler 就分析完了,spread & binpark 等 高级调度策略是如何实现的留着下篇分析~。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

HAMi kube-scheduler vGPU 调度器 Kubernetes
相关文章