本文主要介绍k8s.io/kubernetes/pkg/scheduler/framework的调度流程。
调度框架为kube-scheduler提供一组插件式api,这些插件编译到kube-scheduler中,也可以以插件的姿势实现更多的调度特性,这样可以保证核心组件简单、易维护。
Scheduling Cycle& Binding Cycle
每一个需要调度的pod都会经历两个流程Scheduling Cycle和Binding Cycle. Scheduling Cycle给pod挑选一个node,Binding Cycle则将该决策应用到集群中,Scheduling Cycle可能是顺序执行,Binding Cycles阶段可能是并发运行。
如果确定一个pod无法调度或者出现内部错误,Scheduling Cycle和Binding Cycle将会终止,pod将会重新回到队列等待重试;如果在Binding Cycle被终止,将会调用Reserve插件Unreserve方法。
扩展点
如图展示一个pod的调度流程和调度框架公开的扩展点;Filter也就是谓词过滤器(predicate filter),Scoring等同于优先算法(Priority function),注册的插件将会在对应的扩展点被调用。
1. queue sort
queue sort插件用于排序在调度队列中的pod,该插件只能启动一个。
pkg/scheduler/framework/v1alpha1/interface.go
type QueueSortPlugin interface {
Plugin
Less(*QueuedPodInfo, *QueuedPodInfo) bool
}
默认实现了一个优先调度插件即获取pod中的spec.Priority字段两个pod进行比较,每一个进入队列的pod都需要经过该方法对其进行排序,Priority字段最大的将会在队列的最前面。
pkg/scheduler/framework/plugins/queuesort/priority_sort.go
1.func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
2. // 获取pod的 spec.Priority 字段
3. p1 :=pod.GetPodPriority(pInfo1.Pod)
4. p2 :=pod.GetPodPriority(pInfo2.Pod)
5. return (p1 > p2) || (p1==p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
6.}
2. PreFilter
这些插件用于预处理pod的信息,并去检测集群或者pod需要达成的某种特定条件。
preFilter插件必须实现 PreFilter方法,并按照顺序调用每一个插件,如果其中一个插件preFilter返回error整个Scheduler Cycle将会中止pod被送入不可调度队列。
1.type PreFilterPlugin interface {
2. Plugin
3. PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status
4. PreFilterExtensions PreFilterExtensions
5.}
看一个默认实现的PreFilter的插件NodePort plugin:
pkg/scheduler/framework/plugins/nodeports/node_ports.go
func getContainerPorts(pods ...*v1.Pod) *v1.ContainerPort {
ports :=*v1.ContainerPort{}
for _, pod :=range pods {
for j :=range pod.Spec.Containers {
container :=&pod.Spec.Containers[j]
for k :=range container.Ports {
ports=append(ports, &container.Ports[k])
}
}
}
return ports
}
// PreFilter invoked at the prefilter extension point.
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
s :=getContainerPorts(pod)
cycleState.Write(preFilterStateKey, preFilterState(s))
return nil
}
遍历pod的spec.containers.ports字段搜集所有需要开放的port,并写到一个Map中。
另外Pre-filter插件还可以实现一个可选项 PreFilterExtensions接口,该接口提供两个方法AddPod和RemovePod去修改预处理信息。调度框架保证这些函数在preFilter后被调用,而且有可能被调用多次。
3. Filter
该步骤会过滤掉那些不能运行的pod的node,每一个node都会按照配置的顺序调用每一个插件,如果任何一个插件标记该节点不合适,剩下的插件将不会被调用,该过程可能是并行的,某些时候可能会调用多次。
1.type FilterPlugin interface {
2. Plugin
3. Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
4.}
同样看Node ports插件的Filter方法,这里会查看对应的node上是否有pod所需的端口,若端口冲突直接过滤掉这个node。
1.func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
2. // 获取pre-filter 获取的字段
3. wantPorts, err :=getPreFilterState(cycleState)
4. if err !=nil {
5. return framework.NewStatus(framework.Error, err.Error)
6. }
7. // 判断node 的port 和pod 的port 是否有冲突
8. fits :=fitsPorts(wantPorts, nodeInfo)
9. if !fits {
10. return framework.NewStatus(framework.Unschedulable, ErrReason)
11. }
12.
13. return nil
14.}
15.
16http://www.yidianzixun.com/article//水泥砂浆一立方多少吨 Fits checks if the pod fits the node.
17.func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
18. return fitsPorts(getContainerPorts(pod), nodeInfo)
19.}
20.
21.func fitsPorts(wantPorts *v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
22. // try to see whether existingPorts and wantPorts will conflict or not
23. existingPorts :=nodeInfo.UsedPorts
24. for _, cp :=range wantPorts {
25. if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
26. return false
27. }
28. }
29. return true
30.}
4. PostFilter
这些插件将会在Filter后且发现无Node可选时被调用,一个典型的例子时抢占式,如果任何一个插件标记某个节点可被调度,那么剩下的插件将不会被调用。
pkg/scheduler/core/generic_scheduler.go
5. PreScore
这里也是采集一些需要打分的指标,采集后放到一个Map中用于后续Score阶段评分,当任意一个插件运行失败,剩下的插件同样无法继续执行。
pkg/scheduler/framework/v1alpha1/interface.go
1.type PreScorePlugin interface {
2. Plugin
3. PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes *v1.Node) *Status
4.}
例如ResourceLimits plugin对每个容器的Limit资源进行相加,并获取到Init容器所需要的最大资源。
pkg/scheduler/framework/plugins/noderesources/resource_limits.go
1.func (rl *ResourceLimits) PreScore(
2. pCtx context.Context,
3. cycleState *framework.CycleState,
4. pod *v1.Pod,
5. nodes *v1.Node,
6.) *framework.Status {
7. if len(nodes)==0 {
8. // No nodes to score.
9. return nil
10. }
11.
12. if rl.handle.SnapshotSharedLister==nil {
13. return framework.NewStatus(framework.Error, fmt.Sprintf("empty shared lister"))
14. }
15. s :=&preScoreState{
16. // 将每个Container 需要的资源相加 并获取InitContainer所需最大的资源
17. podResourceRequest: getResourceLimits(pod),
18. }
19. cycleState.Write(preScoreStateKey, s)
20. return nil
21.}
6. Score
这个阶段将会调用所有实现了Score的plugin对通过过滤器的所有Node进行打分,NormalizeScore阶段后将会将每个插件返回分值按照他们配置的比重进行合并。
如下是ResourceList plugin的Score阶段代码:
1.如果node没有发布可分配的资源,对节点评分为0。
2.如果pod未指定CPU或内存限制,对节点评分为0。
3.如果CPU和内存都符合,则这个节点分数为1。
1.func (rl *ResourceLimits) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
2. nodeInfo, err :=rl.handle.SnapshotSharedLister.NodeInfos.Get(nodeName)
3. if err !=nil || nodeInfo.Node==nil {
4. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node==nil))
5. }
6.
7. podLimits, err :=getPodResource(state)
8. if err !=nil {
9. return 0, framework.NewStatus(framework.Error, err.Error)
10. }
11.
12. cpuScore :=computeScore(podLimits.MilliCPU, nodeInfo.Allocatable.MilliCPU)
13. memScore :=computeScore(podLimits.Memory, nodeInfo.Allocatable.Memory)
14.
15. score :=int64(0)
16. if cpuScore==1 || memScore==1 {
17. score=1
18. }
19. return score, nil
20.}
7. NormalizeScore
在Scheduler给Node最终得分之前修改分数,该步骤主要用于修正分数到合理数值(0~100),例如Node affinity的Score阶段,会将配置的wight全部相加,该字段为用户自定义配置,所以这里就需要对其进行修正。
pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go
1.func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
2. nodeInfo, err :=pl.handle.SnapshotSharedLister.NodeInfos.Get(nodeName)
3. if err !=nil {
4. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
5. }
6.
7. node :=nodeInfo.Node
8. if node==nil {
9. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
10. }
11.
12. affinity :=pod.Spec.Affinity
13.
14. var count int64
15. // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
16. // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
17. // empty PreferredSchedulingTerm matches all objects.
18. if affinity !=nil && affinity.NodeAffinity !=nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution !=nil {
19. // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
20. for i :=range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
21. preferredSchedulingTerm :=&affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
22. if preferredSchedulingTerm.Weight==0 {
23. continue
24. }
25.
26. // TODO: Avoid computing it for all nodes if this becomes a performance problem.
27. nodeSelector, err :=v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
28. if err !=nil {
29. return 0, framework.NewStatus(framework.Error, err.Error)
30. }
31.
32. if nodeSelector.Matches(labels.Set(node.Labels)) {
33. count +=int64(preferredSchedulingTerm.Weight)
34. }
35. }
36. }
37.
38. return count, nil
39.}
40.
41http://www.yidianzixun.com/article// NormalizeScore invoked after scoring all nodes.
42.func (pl *NodeAffinity) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
43. return pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)
44.}
DefaultNormalizeScore默认的NormalizeScore函数,作用是让评分处于[0, maxPriority ] 之间,可以对分数进行反转(当前分数=最大分数 - 当前分数)。
pkg/scheduler/framework/plugins/helper/normalize_score.go
1.func DefaultNormalizeScore(maxPriority int64, reverse bool, scores framework.NodeScoreList) *framework.Status {
2. var maxCount int64
3. for i :=range scores {
4. if scores[i].Score > maxCount {
5. maxCount=scores[i].Score
6. }
7. }
8.
9. if maxCount==0 {
10. if reverse {
11. for i :=range scores {
12. scores[i].Score=maxPriority
13. }
14. }
15. return nil
16. }
17.
18. for i :=range scores {
19. score :=scores[i].Score
20.
21. score=maxPriority * score / maxCount
22. if reverse {
23. score=maxPriority - score
24. }
25.
26. scores[i].Score=score
27. }
28. return nil
29.}
7. Reserve
要扩展Reserve需要实现两个方法Reserve and Unreserve。
这两个信息调度阶段分别称为Reserve和Unreserve,维护有状态插件时当一个节点上的资源被保留(Reserve) 和未被给定pod保留(Unreserve),应该使用该阶段去通知调度程序,Reserve发生在Scheduler真正绑定pod到node之前。
每个Reserve插件的Reserve方法可能成功也可能失败,如果一个Reserve方法调用失败,后续的插件就不会执行,并且认为Reserve阶段已经失败。
如果所有插件的Reserve方法成功,则认为Reserve阶段成功,并执行Scheduling Cycle 和Binding Cycle的其余部分,当Reserve阶段或后续阶段失败时,会触发Unreserve,当这种情况发生时,所有的Reserve插件的Unreserve方法将以与Reserve方法调用相反的顺序执行,此阶段的存在是为了清除与保留Pod相关联的状态。
VolumeBinding plugin Reserve:
pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
// AssumePodVolumes将:
// 1. 取未绑定PVC的PV匹配项并更新PV缓存,假设PV已预先绑定到PVC。
// 2. 获取需要配置的PVC,并使用相关的注释集更新PVC缓存。
//如果所有卷都已绑定,则返回true
allBound, err :=pl.Binder.AssumePodVolumes(pod, nodeName)
if err !=nil {
return framework.NewStatus(framework.Error, err.Error)
}
cs.Write(allBoundStateKey, stateData{allBound: allBound})
return nil
}
Unreserve:
func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
pl.Binder.DeletePodBindings(pod)
return
}
8. permit
该阶段是Scheduling cycle的最后一个阶段,主要用于阻止或者延期pod绑定到node上。
type PermitPlugin interface {
Plugin
Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}
这些插件做三件事:
Approve:一旦所以插件都准许,那么pod将会到binding Cycle。
deny:如果任何一个插件拒绝,该pod将会重新回到调度队列,并触发前面的Unreserve操作。
wait(with a timeout):如果一个插件返回wait,那么该pod就会进入一个等待队列中,pod开始进入到binding Cycle但是是阻塞的知道允许;如果timeout发生,那么将会从timeout状态变成deny状态。
9. PreBind
执行pod的绑定前操作。
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
state, err :=cs.Read(allBoundStateKey)
if err !=nil {
}
s, ok :=state.(stateData)
if !ok {
return framework.NewStatus(framework.Error, "unable to convert state into stateData")
}
if s.allBound {
// no need to bind volumes
return nil
}
klog.V(5).Infof("Trying to bind volumes for pod "%v/%v"", pod.Namespace, pod.Name)
`// 绑定pod volume`
err=pl.Binder.BindPodVolumes(pod)
if err !=nil {
klog.V(1).Infof("Failed to bind volumes for pod "%v/%v": %v", pod.Namespace, pod.Name, err)
}
klog.V(5).Infof("Success binding volumes for pod "%v/%v"", pod.Namespace, pod.Name)
return nil
}
10. bind
将pod绑定到Node, 只有所以preBind操作完成后,才会执行该步骤,每个bind plugin将会配置的顺序执行,bind plugin可以选择性的对pod进行操作,如果其中一个插件处理过了一个pod,那么剩下的将会被跳过。
pkg/scheduler/framework/plugins/defaultbinder/default_binder.go
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)
binding :=&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
Target: v1.ObjectReference{Kind: "Node", Name: nodeName},
}
err :=b.handle.ClientSet.CoreV1.Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
if err !=nil {
}
return nil
}
11. postBind
这是一个信息扩展点,绑定后插件在Pod成功绑定后被调用,这是一个绑定循环的结束,可以用来清理相关的资源。
func (pl *VolumeBinding) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
return
}
四川保温砂浆
这是四川改性挤塑板成都挤塑板,四川保温砂浆(2021-06-01 18:09:44)
评论(0)