上一篇讲了待调度任务的组织形式,这一篇来继续挑软骨头啃:节点资源抽象和调度策略。
引子 由于 Ray 支持对任务进行显式的资源约束 ,因此需要对所有节点的资源进行硬件无关的抽象,将所有资源归一化管理,以在逻辑层面对资源进行增删。当有节点加入,需要感知其资源总量大小;当有任务调度,需要寻找满足约束节点;当任务调度成功,可以获取剩余可用资源等等。
Ray 除了对标准资源如 CPU,GPU 的支持,还支持对用户自定义 label 的资源的调度。用户在启动节点(ray start --resources <resources>
)指定该节点具有某种类别的资源 (比如说 memory,bandwidth,某种型号的 GPU 等等)的总量,在定义 remote 函数时指定任务使用多少该类别的资源,Ray 的调度器在调度该任务时,就会按照用户自定义的资源需求将其调度到特定的机器上去。这是一种用户代码和调度器交互的一种有趣设计 。
对于调度策略,由于 Ray 是去中心化的调度,很容易存在不一致状态。最简单的在实践中反而是统计最优的——对于每个任务找到符合资源约束的节点,随机选择一个,将任务调度过去。
作者:木鸟杂记 https://www.qtmuniao.com , 转载请注明出处
调度资源抽象(SchedulingResources) 最基本的四个类是 FractionalResourceQuantity
、 ResourceSet
、 ResourceIds
和 ResourceIdSet
。各个类的特点概述一下:
FractionalResourceQuantity
定义了某种资源的量值
ResourceSet
是一组不同种类资源及其量值的集合
ResourceIds
对资源量按分数进行了标号——0, 1 … quantity-1 。
ResourceIdSet
是一组标号后的资源的集合。
前两者是在多少 层面上对资源进行描述,后两者是在索引 层面对资源进行解构。
后两者是在前两者基础上的细化 。他们都定义了单个量值 和集合不同种类量值构成的集合量 。
此外,很重要的一点是,在 FractionalResourceQuantity
名字中也有体现,Ray 支持小数量值 ,但是只支持纯小数量值。为什么会有这种设计呢?举个最简单的例子,GPU 很贵嘛,于是就想多个 Task 共用一个 GPU,以提高 GPU 的利用率。那么每个 Task 在定义 GPU 需求的时候,就可以写需要零点几个 GPU。在这种场景下,一点几个和二点几个的非纯小数值就没什么意思了,毕竟要么独占一个,要么与他人共享一个。
资源量值(FractionalResourceQuantity) FractionalResourceQuantity
是对 double 的包装,表示 Ray 中对资源度量的量 。但为了计算不损失精度,其内部实际实现为 64bit 的整型——实际值 乘以 kResourceConversionFactor = 10000
取整。其目的很明显:
对于 Ray 的资源使用场景下,四五位小数左右的精度就够了
在这个精度内提供精确的运算
在此基础上重载了可度量的量的一些基本操作——加减运算和布尔运算。在 Ray 的场景下,只有节点加入(增加资源),判断是否可调度(比较资源)、调度任务(减小资源)等操作,因此乘除操作是不需要的。
当然也可以从另外一个角度来理解,或许更好理解一点,其内部表示将 量纲/单位 从逻辑的 1
,缩小为了 1/kResourceConversionFactor
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class FractionalResourceQuantity { public : FractionalResourceQuantity (); FractionalResourceQuantity (double resource_quantity); const FractionalResourceQuantity operator +(const FractionalResourceQuantity &rhs) const ; const FractionalResourceQuantity operator -(const FractionalResourceQuantity &rhs) const ; void operator +=(const FractionalResourceQuantity &rhs); void operator -=(const FractionalResourceQuantity &rhs); bool operator ==(const FractionalResourceQuantity &rhs) const ; bool operator !=(const FractionalResourceQuantity &rhs) const ; bool operator <(const FractionalResourceQuantity &rhs) const ; bool operator >(const FractionalResourceQuantity &rhs) const ; bool operator <=(const FractionalResourceQuantity &rhs) const ; bool operator >=(const FractionalResourceQuantity &rhs) const ; double ToDouble () const ; private : int64_t resource_quantity_; };
这类运算实现的时候有个基本思想:尽量复用 ,即定义最小数量的正交操作 ,然后用这些操作来实现另外的操作。这样有两个好处:
代码简洁,因为复用了。
改动方便,将来如果要改变实现只需改变最基本的操作实现。
具体到本例子中的布尔操作集,首先定义等于和小于 操作符作为基本操作集,然后以此实现其他几个操作符。后面 ResourceSet
中也有类似的思想:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 bool FractionalResourceQuantity::operator ==(const FractionalResourceQuantity &rhs) const { return resource_quantity_ == rhs.resource_quantity_; } bool FractionalResourceQuantity::operator <(const FractionalResourceQuantity &rhs) const { return resource_quantity_ < rhs.resource_quantity_; } bool FractionalResourceQuantity::operator !=(const FractionalResourceQuantity &rhs) const { return !(*this == rhs); } bool FractionalResourceQuantity::operator >(const FractionalResourceQuantity &rhs) const { return rhs < *this ; } bool FractionalResourceQuantity::operator <=(const FractionalResourceQuantity &rhs) const { return !(*this > rhs); } bool FractionalResourceQuantity::operator >=(const FractionalResourceQuantity &rhs) const { return !(*this < rhs); }
资源集合(ResourceSet) ResourceSet
是一系列不同种类的资源及其量值的集合,实现上是对字典(unordered_map
)包装。在物理意义上,一般用来表示一个节点的总资源量、已经使用的资源量、剩余可用的资源量等等。
基本操作包括对单个资源的增删,以及资源集合间的运算;详细见代码内注释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class ResourceSet { public : ResourceSet (); ResourceSet ( const std::unordered_map<std::string, FractionalResourceQuantity> &resource_map); ResourceSet (const std::unordered_map<std::string, double > &resource_map); ResourceSet (const std::vector<std::string> &resource_labels, const std::vector<double > resource_capacity); ~ResourceSet (); bool IsSubset (const ResourceSet &other) const ; bool operator ==(const ResourceSet &rhs) const ; bool IsEqual (const ResourceSet &other) const ; bool IsSuperset (const ResourceSet &other) const ; void AddOrUpdateResource (const std::string &resource_name, const FractionalResourceQuantity &capacity) ; bool DeleteResource (const std::string &resource_name) ; FractionalResourceQuantity GetResource (const std::string &resource_name) const ; bool IsEmpty () const ; void AddResourcesCapacityConstrained (const ResourceSet &other, const ResourceSet &total_resources) ; void AddResources (const ResourceSet &other) ; void SubtractResources (const ResourceSet &other) ; void SubtractResourcesStrict (const ResourceSet &other) ; const ResourceSet GetNumCpus () const ; const std::unordered_map<std::string, double > GetResourceMap () const ; const std::unordered_map<std::string, FractionalResourceQuantity> &GetResourceAmountMap () const ; const std::string ToString () const ; private : std::unordered_map<std::string, FractionalResourceQuantity> resource_capacity_; };
集合间加减运算时,有两个额外的带上下界检查的函数。应该是为了避免小数不精确运算导致的后果?
资源标号(ResourceIds) ResourceIds
解决的问题是为某种资源打上标号,并且以某种方式拆分资源。
对于资源标号 ,即给系统所有资源打上一个逻辑 ID(0~n-1)。比如说 GPU 0, GPU 1 等等。以使用户代码能够对资源进行定位,从而要求某段代码具体使用某个资源。
对于资源拆分 ,Ray 要求 API (ray.remote(label=amount)
)只能以两种形式使用资源:
amount >= 1 且必须是整数。
amount < 1,即是纯小数。
对应到物理意义上,即要么独占 一到多个整份资源,要么和其他人共享 单份资源。前者的经典例子是 CPU,后者经典例子是 GPU。
在内部实现上,ResourceIds
维护了两个列表。一个列表是整整型列表(vector<int64_t> whole_ids_
)代表所有的整数份资源的 ID 列表。一个列表是键值对列表(vector<pair<int64_t, FractionalResourceQuantity>>
),代表所有小数份资源 ID 及其对应的剩余份数。值得一提的是,对于一个节点,初始来说应该都是整数份资源(除非有某种特殊用途,比如不想让集群用满该节点资源啦)。然后随着需要小数份资源的任务的调度,一部分资源被切分,实现上表现为从整份资源列表中拿出一个资源,切分后,分出去一块给任务,剩下的放到小数份资源列表中。因此,两个列表中不会有相同的 ID ,因为每个 ID 都最多对应一整份资源。如果由于任务完成,导致某些小数份资源释放,使得小数份资源列表中的具有同样 ID,这样的资源在还回时候会被合并,如果等于1之后,就会被拿到整数份资源列表中。
在资源分配的时候有些小原则。比如说要求小数份资源,我们优先去小数份资源列表里去找符合要求的,不能满足要求的话再去整数份资源列表中拆。
拆分的另一个问题是,我们不能将属于两个 ID 的两个小数份资源(比如说标号0 的有 0.5 份,标号 1 的有 0.5份)合到一块分配给一个要求较大的资源任务(比如说一个要求 0.75 份资源的任务)。举个例子来说,有两个 GPU 还剩一半用量,你不能将他们合起来分配给一个要求 0.75 份 GPU的任务。
还有一个变量 decrement_backlog_
用来记录所有超额资源请求。等待其他人 Release
了,会优先满足这些请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 class ResourceIds { public : ResourceIds (); explicit ResourceIds (double resource_quantity) ; explicit ResourceIds (const std::vector<int64_t > &whole_ids) ; explicit ResourceIds ( const std::vector<std::pair<int64_t , FractionalResourceQuantity>> &fractional_ids) ; ResourceIds ( const std::vector<int64_t > &whole_ids, const std::vector<std::pair<int64_t , FractionalResourceQuantity>> &fractional_ids); bool Contains (const FractionalResourceQuantity &resource_quantity) const ; ResourceIds Acquire (const FractionalResourceQuantity &resource_quantity) ; void Release (const ResourceIds &resource_ids) ; ResourceIds Plus (const ResourceIds &resource_ids) const ; const std::vector<int64_t > &WholeIds () const ; const std::vector<std::pair<int64_t , FractionalResourceQuantity>> &FractionalIds () const ; bool TotalQuantityIsZero () const ; FractionalResourceQuantity TotalQuantity () const ; std::string ToString () const ; void UpdateCapacity (int64_t new_capacity) ; private : bool IsWhole (double resource_quantity) const ; void IncreaseCapacity (int64_t increment_quantity) ; void DecreaseCapacity (int64_t decrement_quantity) ; std::vector<int64_t > whole_ids_; std::vector<std::pair<int64_t , FractionalResourceQuantity>> fractional_ids_; FractionalResourceQuantity total_capacity_; int64_t decrement_backlog_; };
资源标号集合(ResourceIdSet) ResourceIdSet
表示一组带标号的可用资源的集合。实现上用了一个字典 unordered_map<string, ResourceIds> available_resources_
,表示资源种类到其数量(标号过的)映射,并在其上定义了和 ResourceIds
差不多的接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class ResourceIdSet { public : ResourceIdSet (); ResourceIdSet (const ResourceSet &resource_set); ResourceIdSet (const std::unordered_map<std::string, ResourceIds> &available_resources); bool Contains (const ResourceSet &resource_set) const ; ResourceIdSet Acquire (const ResourceSet &resource_set) ; void Release (const ResourceIdSet &resource_id_set) ; void ReleaseConstrained (const ResourceIdSet &resource_id_set, const ResourceSet &resources_total) ; ResourceIdSet Plus (const ResourceIdSet &resource_id_set) const ; void AddOrUpdateResource (const std::string &resource_name, int64_t capacity) ; void DeleteResource (const std::string &resource_name) ; void Clear () ; const std::unordered_map<std::string, ResourceIds> &AvailableResources () const ; ResourceIdSet GetCpuResources () const ; ResourceSet ToResourceSet () const ; std::string ToString () const ; std::vector<rpc::ResourceIdSetInfo> ToProtobuf () const ; private : std::unordered_map<std::string, ResourceIds> available_resources_; };
调度资源类(SchedulingResource) 该类是最终对外负责的类,记录了某个节点上 所有可供调度或者使用中的资源信息(resources_total_
),待使用的资源信息(resources_load_
)以及剩余可用的资源(resources_available_
)。上面三个字段皆为 ResourceIdSet
类型。
三者关系为:
resources_total_ = resouces_used_by_running_tasks + resources_available_
resources_load_ is part of resources_available_
第二个关系可能看起来比较奇怪,后面会详细讲。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class SchedulingResources { public : SchedulingResources (); SchedulingResources (const ResourceSet &total); ~SchedulingResources (); const ResourceSet &GetAvailableResources () const ; const ResourceSet &GetLoadResources () const ; const ResourceSet &GetTotalResources () const ; void SetLoadResources (ResourceSet &&newset) ; void SetAvailableResources (ResourceSet &&newset) ; void Release (const ResourceSet &resources) ; void Acquire (const ResourceSet &resources) ; void UpdateResource (const std::string &resource_name, int64_t capacity) ; void DeleteResource (const std::string &resource_name) ; std::string DebugString () const ; private : ResourceSet resources_total_; ResourceSet resources_available_; ResourceSet resources_load_; };
有意思的是,单从该源码来看, Release
和 Require
只对 resources_available_
进行了操作;而 resources_load_
只有整体 set 和 get 的操作,当然也可以通过 GetLoadResources
获取其引用后,直接对其进行加减。
结合其他源码思忖了一下,Ray 似乎想用 resources_load_
描述所有 SchedulingQueue::ready_queue_
需求总量,而非正在运行的任务的需求总量。正在运行的任务需求量应为 resources_total_ - resources_available_
。也就是说 resources_load_
是 resources_available_
的一部分,用来描述所有准备好的任务的资源需求总量 。
作为一个典型的实现代表,贴一下 UpdateResource
的代码实现,该操作是对某类资源总量的更新;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 void SchedulingResources::UpdateResource (const std::string &resource_name, int64_t capacity) { const FractionalResourceQuantity new_capacity = FractionalResourceQuantity (capacity); const FractionalResourceQuantity ¤t_capacity = resources_total_.GetResource (resource_name); if (current_capacity > 0 ) { const FractionalResourceQuantity capacity_difference = new_capacity - current_capacity; const FractionalResourceQuantity ¤t_available_capacity = resources_available_.GetResource (resource_name); FractionalResourceQuantity new_available_capacity = current_available_capacity + capacity_difference; if (new_available_capacity < 0 ) { new_available_capacity = 0 ; } resources_total_.AddOrUpdateResource (resource_name, new_capacity); resources_available_.AddOrUpdateResource (resource_name, new_available_capacity); } else { resources_total_.AddOrUpdateResource (resource_name, new_capacity); resources_available_.AddOrUpdateResource (resource_name, new_capacity); } }
调度策略(SchedulingPolicy) 前面提到,Ray 使用去中心化的调度策略,即每个节点独立的对自己所看到的任务进行调度。SchedulingPolicy
就是描述单个节点的调度策略的,它通过构造函数拿到上一篇文章中提到的 SchedulingQueue
引用 ,从而拿到本节点所有的任务,然后通过 GCS 获取一组节点的资源概况(本节点的通过配置加载,对于其他节点,在感知到其加入集群的时候,从 GCS 中拉取),以 unordered_map<ClientID, SchedulingResources> &cluster_resources
表示。从而根据任务资源需求与节点资源存量的适配情况,进行调度决策。
此外,还有个 SpillOver
方法,其中 Schedule
方法是针对所有状态为 TaskState::PLACEABLE
的任务在一组节点 中进行决策,所谓调度 ;SpillOver
方法是针对所有状态为 TaskState::INFEASIBLE
和 TaskState::READY
的任务在新加入的单个节点 进行尝试,所谓挤出。只是后来随着本地资源也可以动态调整,也会在本地资源调整后使用此策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class SchedulingPolicy { public : SchedulingPolicy (const SchedulingQueue &scheduling_queue); std::unordered_map<TaskID, ClientID> Schedule ( std::unordered_map<ClientID, SchedulingResources> &cluster_resources, const ClientID &local_client_id) ; std::vector<TaskID> SpillOver (SchedulingResources &remote_scheduling_resources) const ; virtual ~SchedulingPolicy (); private : const SchedulingQueue &scheduling_queue_; std::mt19937_64 gen_; };
Schedule 对于 Schedule
函数,大概伪码如下:
1 2 3 4 5 6 7 8 9 for task in placeable_tasks_list: clients = find_all_available_resources_statisfied_clients() if cliens.is_not_empty(): decision[task] = random_select_one(clients) else : clients = find_all_total_resources_statisfied_clients() if clients.is_not_empty(): decision[task] = random_select_one(clients) return decision
其中有两个值得注意的点:
对于每个任务,会按次序对所有节点筛选两遍。第一次针对每个节点的 真正可用 (resources_available_ - resources_load_
)资源,第二次是针对节点所有资源(resources_total
)。
虽然注释里写着TODO:按权重进行节点选择 。但是注释过去一年多了,现在代码中的策略仍然是对满足资源要求的节点集合随机选择一个节点,将任务调度过去。我猜其中有个可能的原因是在去中心化的调度决策下,一致性很难保证,随机选择反而能取得更好的性能。举个例子,如果按空闲资源量作为权重 进行节点选择,如果某个节点加入了,那么剩余节点在调度的时候可能一哄而上的将任务调度到该节点上,造成新加入的很快过载,然后该节点再将过载的任务调度出去,从而来回拉风车 式调度。
其中对于资源增删的操作稍稍复杂一些,贴在这里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 for (const auto &t : scheduling_queue_.GetTasks (TaskState::PLACEABLE)) { const auto &spec = t.GetTaskSpecification (); const auto &resource_demand = spec.GetRequiredPlacementResources (); const TaskID &task_id = spec.TaskId (); std::vector<ClientID> client_keys; for (const auto &client_resource_pair : cluster_resources) { ClientID node_client_id = client_resource_pair.first; const auto &node_resources = client_resource_pair.second; ResourceSet available_node_resources = ResourceSet (node_resources.GetAvailableResources ()); available_node_resources.SubtractResources (node_resources.GetLoadResources ()); if (resource_demand.IsSubset (available_node_resources)) { client_keys.push_back (node_client_id); } } if (!client_keys.empty ()) { std::uniform_int_distribution<int > distribution (0 , client_keys.size() - 1 ) ; int client_key_index = distribution (gen_); const ClientID &dst_client_id = client_keys[client_key_index]; decision[task_id] = dst_client_id; ResourceSet new_load (cluster_resources[dst_client_id].GetLoadResources()) ; new_load.AddResources (resource_demand); cluster_resources[dst_client_id].SetLoadResources (std::move (new_load)); } ...
SpillOver 该函数比较简单,伪码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def spill_over (remote_scheduling_resources ): decision = [] new_load = ResourceSet() for task in infeasible_task_list: if task.required_res.is_subset_of(remote_scheduling_resources): decision.append(task.id ) new_load.add(task.required_res) for task in ready_task_list: if task.required_res.is_subset_of(remote_scheduling_resources): decision.append(task.id ) new_load.add(task.required_res) break remote_scheduling_resources.set_load(new_load) return decision
该函数开始时 应对的场景是,当感知到一个新节点上线时,会检测本机的某些任务能不能被调度过去。包括不可放置的任务 (该节点上线前没有满足该任务资源需求的节点)和至多一个 准备好的任务,我猜测这么干是为了弥补随机调度的不足,当一个新节点上线时,其他所有节点都将自己的任务匀给它一个 (这个策略也比较有意思哈),以使得负载相对缓慢的从其他节点转移到新加入的节点。
后来随着版本迭代,节点静态 资源变成动态 资源。如果一个节点在启动时,通过配置加载其拥有的资源总量,此后维持不变,是为静态 ;如果在运行时资源总量仍然可设置,则为动态 。在这种设计下,如果本节点资源总量被重新设置,那么也可能会调用此函数,对不可放置任务进行再尝试。至于匀任务这个操作,在此情景下,其实没什么意义。
最后,不要忘记的是,需要给被调度的节点设置资源负载,进行”占坑”,以使得其他调度决策及时感知到到本次调度所带来的节点资源负载变化。
名词解释
逻辑和实现:逻辑表示类对外的抽象;实现表示类在内部的实际组织。
resouce_label/resource_name: 或者说资源名称,标记某一种类的资源的标记,比如 GPU,CPU,Memory 等等
ResourceId: 资源标号,给所有资源按照 0, 1, … , n-1 打上标记,以对某个资源进行索引。典型的如 GPU0, GPU1 ..
静态和动态资源:这是针对节点资源总量来说的,如果一个节点在启动时通过配置加载其拥有的资源总量,此后维持不变,是为静态 ;如果在运行时资源总量仍然可设置,则为动态
我是青藤木鸟,一个喜欢摄影、专注大规模数据系统的程序员,欢迎关注我的公众号:“木鸟杂记 ”,有更多的分布式系统、存储和数据库相关的文章,欢迎关注。
关注公众号后,回复“资料 ”可以获取我总结一份分布式数据库学习资料。
回复“优惠券 ”可以获取我的大规模数据系统付费专栏《系统日知录 》的八折优惠券。
我们还有相关的分布式系统和数据库的群,可以添加我的微信号:qtmuniao,我拉你入群。加我时记得备注:“分布式系统群”。
另外,如果你不想加群,还有一个分布式系统和数据库的论坛(点这里 ),欢迎来玩耍。