水泥砂浆一立方多少吨

四川建材 2021-06-01 阅读:234

本文主要介绍k8s.io/kubernetes/pkg/scheduler/framework的调度流程。

调度框架为kube-scheduler提供一组插件式api,这些插件编译到kube-scheduler中,也可以以插件的姿势实现更多的调度特性,这样可以保证核心组件简单、易维护。

Scheduling Cycle& Binding Cycle

每一个需要调度的pod都会经历两个流程Scheduling Cycle和Binding Cycle. Scheduling Cycle给pod挑选一个node,Binding Cycle则将该决策应用到集群中,Scheduling Cycle可能是顺序执行,Binding Cycles阶段可能是并发运行。

如果确定一个pod无法调度或者出现内部错误,Scheduling Cycle和Binding Cycle将会终止,pod将会重新回到队列等待重试;如果在Binding Cycle被终止,将会调用Reserve插件Unreserve方法。

扩展点

如图展示一个pod的调度流程和调度框架公开的扩展点;Filter也就是谓词过滤器(predicate filter),Scoring等同于优先算法(Priority function),注册的插件将会在对应的扩展点被调用。

1. queue sort

queue sort插件用于排序在调度队列中的pod,该插件只能启动一个。

pkg/scheduler/framework/v1alpha1/interface.go

type QueueSortPlugin interface {

Plugin

Less(*QueuedPodInfo, *QueuedPodInfo) bool

}

默认实现了一个优先调度插件即获取pod中的spec.Priority字段两个pod进行比较,每一个进入队列的pod都需要经过该方法对其进行排序,Priority字段最大的将会在队列的最前面。

pkg/scheduler/framework/plugins/queuesort/priority_sort.go

1.func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {

2. // 获取pod的 spec.Priority 字段

3. p1 :=pod.GetPodPriority(pInfo1.Pod)

4. p2 :=pod.GetPodPriority(pInfo2.Pod)

5. return (p1 > p2) || (p1==p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))

6.}

2. PreFilter

这些插件用于预处理pod的信息,并去检测集群或者pod需要达成的某种特定条件。

preFilter插件必须实现 PreFilter方法,并按照顺序调用每一个插件,如果其中一个插件preFilter返回error整个Scheduler Cycle将会中止pod被送入不可调度队列。

1.type PreFilterPlugin interface {

2. Plugin

3. PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status

4. PreFilterExtensions PreFilterExtensions

5.}

看一个默认实现的PreFilter的插件NodePort plugin:

pkg/scheduler/framework/plugins/nodeports/node_ports.go

func getContainerPorts(pods ...*v1.Pod) *v1.ContainerPort {

ports :=*v1.ContainerPort{}

for _, pod :=range pods {

for j :=range pod.Spec.Containers {

container :=&pod.Spec.Containers[j]

for k :=range container.Ports {

ports=append(ports, &container.Ports[k])

}

}

}

return ports

}

// PreFilter invoked at the prefilter extension point.

func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {

s :=getContainerPorts(pod)

cycleState.Write(preFilterStateKey, preFilterState(s))

return nil

}

遍历pod的spec.containers.ports字段搜集所有需要开放的port,并写到一个Map中。

另外Pre-filter插件还可以实现一个可选项 PreFilterExtensions接口,该接口提供两个方法AddPod和RemovePod去修改预处理信息。调度框架保证这些函数在preFilter后被调用,而且有可能被调用多次。

3. Filter

该步骤会过滤掉那些不能运行的pod的node,每一个node都会按照配置的顺序调用每一个插件,如果任何一个插件标记该节点不合适,剩下的插件将不会被调用,该过程可能是并行的,某些时候可能会调用多次。

1.type FilterPlugin interface {

2. Plugin

3. Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status

4.}

同样看Node ports插件的Filter方法,这里会查看对应的node上是否有pod所需的端口,若端口冲突直接过滤掉这个node。

1.func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {

2. // 获取pre-filter 获取的字段

3. wantPorts, err :=getPreFilterState(cycleState)

4. if err !=nil {

5. return framework.NewStatus(framework.Error, err.Error)

6. }

7. // 判断node 的port 和pod 的port 是否有冲突

8. fits :=fitsPorts(wantPorts, nodeInfo)

9. if !fits {

10. return framework.NewStatus(framework.Unschedulable, ErrReason)

11. }

12.

13. return nil

14.}

15.

16http://www.yidianzixun.com/article//水泥砂浆一立方多少吨 Fits checks if the pod fits the node.

17.func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {

18. return fitsPorts(getContainerPorts(pod), nodeInfo)

19.}

20.

21.func fitsPorts(wantPorts *v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {

22. // try to see whether existingPorts and wantPorts will conflict or not

23. existingPorts :=nodeInfo.UsedPorts

24. for _, cp :=range wantPorts {

25. if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {

26. return false

27. }

28. }

29. return true

30.}

4. PostFilter

这些插件将会在Filter后且发现无Node可选时被调用,一个典型的例子时抢占式,如果任何一个插件标记某个节点可被调度,那么剩下的插件将不会被调用。

pkg/scheduler/core/generic_scheduler.go

5. PreScore

这里也是采集一些需要打分的指标,采集后放到一个Map中用于后续Score阶段评分,当任意一个插件运行失败,剩下的插件同样无法继续执行。

pkg/scheduler/framework/v1alpha1/interface.go

1.type PreScorePlugin interface {

2. Plugin

3. PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes *v1.Node) *Status

4.}

例如ResourceLimits plugin对每个容器的Limit资源进行相加,并获取到Init容器所需要的最大资源。

pkg/scheduler/framework/plugins/noderesources/resource_limits.go

1.func (rl *ResourceLimits) PreScore(

2. pCtx context.Context,

3. cycleState *framework.CycleState,

4. pod *v1.Pod,

5. nodes *v1.Node,

6.) *framework.Status {

7. if len(nodes)==0 {

8. // No nodes to score.

9. return nil

10. }

11.

12. if rl.handle.SnapshotSharedLister==nil {

13. return framework.NewStatus(framework.Error, fmt.Sprintf("empty shared lister"))

14. }

15. s :=&preScoreState{

16. // 将每个Container 需要的资源相加 并获取InitContainer所需最大的资源

17. podResourceRequest: getResourceLimits(pod),

18. }

19. cycleState.Write(preScoreStateKey, s)

20. return nil

21.}

6. Score

这个阶段将会调用所有实现了Score的plugin对通过过滤器的所有Node进行打分,NormalizeScore阶段后将会将每个插件返回分值按照他们配置的比重进行合并。

如下是ResourceList plugin的Score阶段代码:

1.如果node没有发布可分配的资源,对节点评分为0。

2.如果pod未指定CPU或内存限制,对节点评分为0。

3.如果CPU和内存都符合,则这个节点分数为1。

1.func (rl *ResourceLimits) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {

2. nodeInfo, err :=rl.handle.SnapshotSharedLister.NodeInfos.Get(nodeName)

3. if err !=nil || nodeInfo.Node==nil {

4. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node==nil))

5. }

6.

7. podLimits, err :=getPodResource(state)

8. if err !=nil {

9. return 0, framework.NewStatus(framework.Error, err.Error)

10. }

11.

12. cpuScore :=computeScore(podLimits.MilliCPU, nodeInfo.Allocatable.MilliCPU)

13. memScore :=computeScore(podLimits.Memory, nodeInfo.Allocatable.Memory)

14.

15. score :=int64(0)

16. if cpuScore==1 || memScore==1 {

17. score=1

18. }

19. return score, nil

20.}

7. NormalizeScore

在Scheduler给Node最终得分之前修改分数,该步骤主要用于修正分数到合理数值(0~100),例如Node affinity的Score阶段,会将配置的wight全部相加,该字段为用户自定义配置,所以这里就需要对其进行修正。

pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go

1.func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {

2. nodeInfo, err :=pl.handle.SnapshotSharedLister.NodeInfos.Get(nodeName)

3. if err !=nil {

4. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))

5. }

6.

7. node :=nodeInfo.Node

8. if node==nil {

9. return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))

10. }

11.

12. affinity :=pod.Spec.Affinity

13.

14. var count int64

15. // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.

16. // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an

17. // empty PreferredSchedulingTerm matches all objects.

18. if affinity !=nil && affinity.NodeAffinity !=nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution !=nil {

19. // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.

20. for i :=range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {

21. preferredSchedulingTerm :=&affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]

22. if preferredSchedulingTerm.Weight==0 {

23. continue

24. }

25.

26. // TODO: Avoid computing it for all nodes if this becomes a performance problem.

27. nodeSelector, err :=v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)

28. if err !=nil {

29. return 0, framework.NewStatus(framework.Error, err.Error)

30. }

31.

32. if nodeSelector.Matches(labels.Set(node.Labels)) {

33. count +=int64(preferredSchedulingTerm.Weight)

34. }

35. }

36. }

37.

38. return count, nil

39.}

40.

41http://www.yidianzixun.com/article// NormalizeScore invoked after scoring all nodes.

42.func (pl *NodeAffinity) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {

43. return pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)

44.}

DefaultNormalizeScore默认的NormalizeScore函数,作用是让评分处于[0, maxPriority ] 之间,可以对分数进行反转(当前分数=最大分数 - 当前分数)。

pkg/scheduler/framework/plugins/helper/normalize_score.go

1.func DefaultNormalizeScore(maxPriority int64, reverse bool, scores framework.NodeScoreList) *framework.Status {

2. var maxCount int64

3. for i :=range scores {

4. if scores[i].Score > maxCount {

5. maxCount=scores[i].Score

6. }

7. }

8.

9. if maxCount==0 {

10. if reverse {

11. for i :=range scores {

12. scores[i].Score=maxPriority

13. }

14. }

15. return nil

16. }

17.

18. for i :=range scores {

19. score :=scores[i].Score

20.

21. score=maxPriority * score / maxCount

22. if reverse {

23. score=maxPriority - score

24. }

25.

26. scores[i].Score=score

27. }

28. return nil

29.}

7. Reserve

要扩展Reserve需要实现两个方法Reserve and Unreserve。

这两个信息调度阶段分别称为Reserve和Unreserve,维护有状态插件时当一个节点上的资源被保留(Reserve) 和未被给定pod保留(Unreserve),应该使用该阶段去通知调度程序,Reserve发生在Scheduler真正绑定pod到node之前。

每个Reserve插件的Reserve方法可能成功也可能失败,如果一个Reserve方法调用失败,后续的插件就不会执行,并且认为Reserve阶段已经失败。

如果所有插件的Reserve方法成功,则认为Reserve阶段成功,并执行Scheduling Cycle 和Binding Cycle的其余部分,当Reserve阶段或后续阶段失败时,会触发Unreserve,当这种情况发生时,所有的Reserve插件的Unreserve方法将以与Reserve方法调用相反的顺序执行,此阶段的存在是为了清除与保留Pod相关联的状态。

VolumeBinding plugin Reserve:

pkg/scheduler/framework/plugins/volumebinding/volume_binding.go

func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {

// AssumePodVolumes将:

// 1. 取未绑定PVC的PV匹配项并更新PV缓存,假设PV已预先绑定到PVC。

// 2. 获取需要配置的PVC,并使用相关的注释集更新PVC缓存。

//如果所有卷都已绑定,则返回true

allBound, err :=pl.Binder.AssumePodVolumes(pod, nodeName)

if err !=nil {

return framework.NewStatus(framework.Error, err.Error)

}

cs.Write(allBoundStateKey, stateData{allBound: allBound})

return nil

}

Unreserve:

func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {

pl.Binder.DeletePodBindings(pod)

return

}

8. permit

该阶段是Scheduling cycle的最后一个阶段,主要用于阻止或者延期pod绑定到node上。

type PermitPlugin interface {

Plugin

Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)

}

这些插件做三件事:

Approve:一旦所以插件都准许,那么pod将会到binding Cycle。

deny:如果任何一个插件拒绝,该pod将会重新回到调度队列,并触发前面的Unreserve操作。

wait(with a timeout):如果一个插件返回wait,那么该pod就会进入一个等待队列中,pod开始进入到binding Cycle但是是阻塞的知道允许;如果timeout发生,那么将会从timeout状态变成deny状态。

9. PreBind

执行pod的绑定前操作。

func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {

state, err :=cs.Read(allBoundStateKey)

if err !=nil {

}

s, ok :=state.(stateData)

if !ok {

return framework.NewStatus(framework.Error, "unable to convert state into stateData")

}

if s.allBound {

// no need to bind volumes

return nil

}

klog.V(5).Infof("Trying to bind volumes for pod "%v/%v"", pod.Namespace, pod.Name)

`// 绑定pod volume`

err=pl.Binder.BindPodVolumes(pod)

if err !=nil {

klog.V(1).Infof("Failed to bind volumes for pod "%v/%v": %v", pod.Namespace, pod.Name, err)

}

klog.V(5).Infof("Success binding volumes for pod "%v/%v"", pod.Namespace, pod.Name)

return nil

}

10. bind

将pod绑定到Node, 只有所以preBind操作完成后,才会执行该步骤,每个bind plugin将会配置的顺序执行,bind plugin可以选择性的对pod进行操作,如果其中一个插件处理过了一个pod,那么剩下的将会被跳过。

pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {

klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)

binding :=&v1.Binding{

ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},

Target: v1.ObjectReference{Kind: "Node", Name: nodeName},

}

err :=b.handle.ClientSet.CoreV1.Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})

if err !=nil {

}

return nil

}

11. postBind

这是一个信息扩展点,绑定后插件在Pod成功绑定后被调用,这是一个绑定循环的结束,可以用来清理相关的资源。

func (pl *VolumeBinding) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {

return

}


四川保温砂浆


这是四川改性挤塑板成都挤塑板,四川保温砂浆(2021-06-01 18:09:44)

评论(0)