The previous post covered how tasks awaiting scheduling are organized. This post continues with the softer topic: node resource abstraction and scheduling policies.
Introduction
Since Ray supports explicit resource constraints for tasks, it needs to abstract the resources of all nodes in a hardware-agnostic way, managing all resources uniformly to enable logical addition and removal of resources. When a node joins, its total resource capacity must be sensed; when a task is scheduled, a node satisfying the constraints must be found; when a task is successfully scheduled, the remaining available resources can be obtained, and so on.
In addition to standard resources like CPU and GPU, Ray also supports scheduling of user-defined labeled resources. When starting a node (ray start --resources <resources>), users specify the total amount of some category of resource that the node has (e.g., memory, bandwidth, a certain model of GPU, etc.). When defining a remote function, users specify how much of that category the task requires. Ray’s scheduler will then dispatch the task to a specific machine according to the user’s custom resource requirements. This is an interesting design for interaction between user code and the scheduler.
For scheduling policies, because Ray schedules in a decentralized way, inconsistent state can easily arise. The simplest approach turns out to work best statistically: for each task, find the nodes that satisfy its resource constraints and randomly select one to dispatch the task to.
Author: Muniao’s Notes https://www.qtmuniao.com, please indicate the source when reposting
Scheduling Resource Abstraction (SchedulingResources)
The four most basic classes are FractionalResourceQuantity, ResourceSet, ResourceIds, and ResourceIdSet. A brief overview of each class:
- FractionalResourceQuantity defines the quantity of a certain resource.
- ResourceSet is a set of different kinds of resources and their quantities.
- ResourceIds assigns indices (0, 1, …, quantity-1) to units of a resource, splitting quantities down to fractions.
- ResourceIdSet is a set of indexed resources.
The first two describe resources at the "how much" level, while the latter two deconstruct resources down to the level of individual indexed units.
The latter two build on the former two. Each pair follows the same shape: one class for a single resource's quantity, and one class for a set composed of different kinds of resources.
Furthermore, an important point reflected in the name FractionalResourceQuantity is that Ray supports fractional quantities, but only pure fractions (values strictly less than 1). Why this design? Take the simplest example: GPUs are expensive, so we may want multiple tasks to share a single GPU to improve utilization. Each task can then specify a fractional GPU requirement when defining its resource needs. In this scenario, mixed values like 1.x or 2.x make little sense: you either occupy whole resources exclusively, or share a single one with others.
Resource Quantity (FractionalResourceQuantity)
FractionalResourceQuantity is a wrapper around double, representing a resource quantity in Ray. However, to avoid precision loss during arithmetic, its internal implementation actually stores a 64-bit integer: the real value multiplied by kResourceConversionFactor = 10000 and truncated. The purpose is clear:
- For Ray’s resource usage scenarios, precision to about four or five decimal places is sufficient.
- It provides exact arithmetic within this precision.
On this basis, it overloads some basic operations for measurable quantities—addition, subtraction, and boolean operations. In Ray’s scenarios, only node joining (adding resources), checking schedulability (comparing resources), and task scheduling (decreasing resources) are needed, so multiplication and division are not required.
Of course, it can also be understood from another angle, perhaps more intuitively: its internal representation effectively changes the dimension/unit from a logical 1 to a smaller 1/kResourceConversionFactor.
[Code listing: class FractionalResourceQuantity { … ]
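The fixed-point idea above can be modeled in a few lines of Python. This is an illustrative sketch, not Ray's actual C++ class; the constant name mirrors the post, everything else is mine:

```python
# Sketch of the fixed-point representation described above: a quantity is
# stored as an integer equal to the real value times the conversion
# factor (multiplied and truncated), so addition, subtraction, and
# comparison are exact within that precision.
K_RESOURCE_CONVERSION_FACTOR = 10000

class FractionalResourceQuantity:
    def __init__(self, value: float):
        # Multiply by the factor and truncate, as the post describes.
        self.units = int(value * K_RESOURCE_CONVERSION_FACTOR)

    def __add__(self, other):
        result = FractionalResourceQuantity(0)
        result.units = self.units + other.units
        return result

    def __sub__(self, other):
        result = FractionalResourceQuantity(0)
        result.units = self.units - other.units
        return result

    # Only the two basic comparisons are hand-written; see below for how
    # the remaining operators can be derived from these.
    def __eq__(self, other):
        return self.units == other.units

    def __lt__(self, other):
        return self.units < other.units

# Ten tasks each take 0.1 of a resource; the remainder is exactly zero,
# which summing raw doubles would not guarantee (0.1 has no exact binary
# representation).
remaining = FractionalResourceQuantity(1.0)
for _ in range(10):
    remaining = remaining - FractionalResourceQuantity(0.1)
```

Only addition, subtraction, and comparison are needed, matching the three scheduler scenarios (node joining, schedulability checks, task scheduling).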
A basic principle when implementing such operations is to reuse as much as possible, that is, to define the minimum number of orthogonal operations and then use these to implement other operations. This has two benefits:
- The code is concise because of reuse.
- Changes are easier—if the implementation needs to change in the future, only the most basic operations need to be modified.
Specifically for the boolean operation set in this example, first define equals and less than as the basic operation set, then implement the other operators based on these. A similar idea appears later in ResourceSet:
[Code listing: // Two basic operations … ]
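The reuse principle can be shown generically: hand-write only equality and less-than, then derive the other four comparisons from them. In Python this pattern even has library support (a sketch; Ray's C++ achieves the same effect with operator overloads built on the two basic ones):

```python
import functools

# "Minimal orthogonal operations" in practice: only __eq__ and __lt__
# are written by hand; functools.total_ordering derives <=, >, and >=
# from them. If the internal representation ever changes, only the two
# basic operations need to be touched.
@functools.total_ordering
class Quantity:
    def __init__(self, units: int):
        self.units = units

    def __eq__(self, other):
        return self.units == other.units

    def __lt__(self, other):
        return self.units < other.units
```

Hand-rolled versions follow the same derivation: a <= b is (a < b) or (a == b), a > b is not (a <= b), and so on.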
Resource Set (ResourceSet)
ResourceSet is a collection of different kinds of resources and their quantities. In terms of implementation, it is a wrapper around a dictionary (unordered_map). In physical terms, it is generally used to represent a node’s total resources, already used resources, remaining available resources, etc.
Basic operations include adding or deleting a single resource, as well as operations between resource sets. See the inline comments for details.
[Code listing: class ResourceSet { … ]
When performing addition and subtraction between sets, there are two additional functions with upper and lower bound checks. Probably to avoid consequences caused by imprecise floating-point operations?
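The dictionary-wrapper design, including a bounds check on subtraction, can be modeled as follows. This is a Python sketch of the idea; the method names are illustrative, not Ray's actual API:

```python
# Model of a ResourceSet: a map from resource name to quantity, with a
# "does this node satisfy the request" test and checked set arithmetic.
class ResourceSet:
    def __init__(self, amounts=None):
        self.amounts = dict(amounts or {})

    def is_superset_of(self, request: "ResourceSet") -> bool:
        # A node can host a task iff every requested quantity is covered.
        return all(self.amounts.get(name, 0) >= qty
                   for name, qty in request.amounts.items())

    def add(self, other: "ResourceSet") -> None:
        for name, qty in other.amounts.items():
            self.amounts[name] = self.amounts.get(name, 0) + qty

    def subtract_with_check(self, other: "ResourceSet") -> None:
        # Lower-bound check on the way down: never let a quantity go
        # negative, guarding against accounting bugs.
        for name, qty in other.amounts.items():
            remaining = self.amounts.get(name, 0) - qty
            assert remaining >= 0, f"over-subtracting {name}"
            self.amounts[name] = remaining

node = ResourceSet({"CPU": 4, "GPU": 1})
task = ResourceSet({"CPU": 2})
node.subtract_with_check(task)   # schedule the task onto the node
```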
Resource IDs (ResourceIds)
ResourceIds solves the problem of assigning IDs to resources and splitting resources in some way.
For resource IDs, it assigns a logical ID (0 to n-1) to each unit of a resource in the system, e.g., GPU 0, GPU 1, and so on. This allows user code to locate resources and pin certain code to a specific resource.
For resource splitting, Ray requires the API (ray.remote(label=amount)) to use resources in only two forms:
- amount >= 1 and must be an integer.
- amount < 1, i.e., a pure fraction.
Corresponding to the physical meaning: either exclusively occupy one or more whole resources, or share a single resource with others. A classic example of the former is CPU; a classic example of the latter is GPU.
In its internal implementation, ResourceIds maintains two lists. One is a whole-unit list (vector<int64_t> whole_ids_) holding the IDs of all whole resources. The other is a key-value list (vector<pair<int64_t, FractionalResourceQuantity>>) holding the IDs of all fractional resources and their remaining amounts. It is worth noting that on a node, all resources should initially be whole (unless there is some special purpose, such as not wanting the cluster to use up all of the node's resources). As tasks requiring fractional resources are scheduled, some resources get split: the implementation takes one resource from the whole list, splits it, gives a portion to the task, and puts the remainder into the fractional list. Consequently, the same ID never appears in both lists, since each ID corresponds to at most one whole resource. When fractional resources are released on task completion, entries with the same ID in the fractional list are merged on return; if the merged amount equals 1, the resource is moved back to the whole list.
There are some small principles in resource allocation. For example, when requesting a fractional resource, we first look in the fractional list for one that satisfies the requirement; if not found, we then split one from the whole list.
Another limitation of splitting is that fractional remainders belonging to different IDs cannot be combined to serve one larger request. For example, if GPU 0 and GPU 1 each have 0.5 remaining, they cannot be combined and assigned to a task requiring 0.75 of a GPU.
There is also a variable decrement_backlog_ that records all overcommitted resource requests. When other tasks release resources, these backlogged requests are satisfied first.
[Code listing: class ResourceIds { … ]
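The two-list bookkeeping can be sketched as follows. This is an illustrative Python model of the C++ logic described above, not Ray's code; for simplicity it uses plain floats restricted to exactly representable fractions (0.25, 0.5), whereas Ray uses fixed-point quantities to avoid float drift:

```python
# Sketch of ResourceIds: whole units live in whole_ids; partially used
# units live in fractional as id -> remaining-amount entries. A
# fractional request prefers an already-split unit; otherwise it splits
# a whole one. A release merges the amount back, and a unit whose
# remaining amount returns to 1 is promoted back to the whole list, so
# no ID ever appears in both places.
class ResourceIds:
    def __init__(self, num_whole_units: int):
        self.whole_ids = list(range(num_whole_units))
        self.fractional = {}          # id -> remaining amount, 0 < r < 1

    def acquire_fraction(self, amount: float) -> int:
        assert 0 < amount < 1
        # First look for an already-split unit with enough left.
        for rid, remaining in self.fractional.items():
            if remaining >= amount:
                self.fractional[rid] = remaining - amount
                return rid
        # Otherwise split a whole unit.
        rid = self.whole_ids.pop()
        self.fractional[rid] = 1.0 - amount
        return rid

    def release_fraction(self, rid: int, amount: float) -> None:
        self.fractional[rid] += amount
        if self.fractional[rid] == 1.0:
            # Fully released: promote back to the whole list.
            del self.fractional[rid]
            self.whole_ids.append(rid)

gpus = ResourceIds(2)                 # two whole GPUs, IDs 0 and 1
rid = gpus.acquire_fraction(0.5)      # splits one whole GPU
```

Note how a second fractional request is served from the already-split GPU rather than splitting the other whole one, matching the allocation principle above.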
Resource ID Set (ResourceIdSet)
ResourceIdSet represents a set of available resources with IDs. In implementation, it uses a dictionary unordered_map<string, ResourceIds> available_resources_ to map resource types to their quantities (indexed), and defines interfaces similar to those of ResourceIds.
[Code listing: class ResourceIdSet { … ]
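The extra layer is just routing by resource name. A minimal self-contained sketch (illustrative names; the per-name value is simplified to a list of free whole-unit IDs rather than a full ResourceIds):

```python
# Sketch of ResourceIdSet: a dictionary from resource name to that
# resource's indexed units, with acquire/release delegating to the
# per-name structure.
class ResourceIdSet:
    def __init__(self, totals):
        # totals like {"CPU": 4, "GPU": 2}: each name gets IDs 0..n-1.
        self.available = {name: list(range(n)) for name, n in totals.items()}

    def acquire(self, name: str, count: int):
        return [self.available[name].pop() for _ in range(count)]

    def release(self, name: str, ids) -> None:
        self.available[name].extend(ids)

node = ResourceIdSet({"CPU": 4, "GPU": 2})
got = node.acquire("GPU", 1)
```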
Scheduling Resource Class (SchedulingResource)
This class is the one ultimately exposed externally, recording for a given node the total resources available for scheduling or in use (resources_total_), the pending resource load (resources_load_), and the remaining available resources (resources_available_). All three fields are of type ResourceSet.
The relationships among the three are:
- resources_total_ = resources_used_by_running_tasks + resources_available_
- resources_load_ is part of resources_available_
The second relationship may look strange; it will be explained in detail later.
[Code listing: class SchedulingResources { … ]
Interestingly, judging from this source file alone, Acquire and Release only operate on resources_available_, while resources_load_ only has whole-set set and get operations. Of course, you can also obtain a reference to it via GetLoadResources and add to or subtract from it directly.
After reading other parts of the source, it seems Ray uses resources_load_ to describe the total demand of all tasks in SchedulingQueue::ready_queue_, rather than the demand of currently running tasks; the latter is resources_total_ - resources_available_. That is, resources_load_ is the portion of resources_available_ already spoken for by ready tasks.
As a typical implementation example, here is the code implementation of UpdateResource, which updates the total capacity of a certain resource type:
[Code listing: void SchedulingResources::UpdateResource(const std::string &resource_name, … ]
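Since the listing is truncated here, a sketch of what such an update plausibly does, reconstructed from the surrounding description rather than copied from Ray: the delta between the new and the current capacity is applied to both the total and the available amount, so resources currently in use are unaffected.

```python
# Sketch of updating one resource's total capacity on a node.
# total / available are simplified to plain dicts standing in for the
# ResourceSet fields; function and variable names are illustrative.
def update_resource(total, available, resource_name, new_capacity):
    delta = new_capacity - total.get(resource_name, 0)
    total[resource_name] = total.get(resource_name, 0) + delta
    available[resource_name] = available.get(resource_name, 0) + delta

total = {"GPU": 2}
available = {"GPU": 1}        # one GPU is currently in use
update_resource(total, available, "GPU", 4)
```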
Scheduling Policy (SchedulingPolicy)
As mentioned earlier, Ray uses a decentralized scheduling policy: each node independently schedules the tasks it sees. SchedulingPolicy describes the scheduling policy of a single node. Its constructor takes a reference to the SchedulingQueue covered in the previous article, giving it access to all tasks on this node. It also obtains an overview of the cluster's node resources through GCS (the local node's resources are loaded from configuration; other nodes' resources are pulled from GCS when it senses them joining the cluster), represented as unordered_map<ClientID, SchedulingResources> &cluster_resources. It then makes scheduling decisions by matching task resource requirements against node resource availability.
In addition, there is a SpillOver method. The Schedule method makes decisions for all tasks in the TaskState::PLACEABLE state across a set of nodes (so-called scheduling). The SpillOver method tries to place tasks in the TaskState::INFEASIBLE and TaskState::READY states onto a single newly joined node (so-called spilling over). Later, as local resources became dynamically adjustable, this policy was also invoked after local resource adjustments.
[Code listing: class SchedulingPolicy { … ]
Schedule
For the Schedule function, the approximate pseudocode is as follows:
[Pseudocode listing: for task in placeable_tasks_list: … ]
There are two points worth noting:
- For each task, all nodes are filtered twice in sequence: the first time against each node's truly available resources (resources_available_ - resources_load_), and the second time against the node's total resources (resources_total_).
- The comments contain "TODO: select nodes by weight", but that comment is over a year old, and the code still randomly selects one node from the set of nodes satisfying the resource requirements and dispatches the task there. One possible reason: under decentralized scheduling decisions, consistency is hard to guarantee, and random selection actually performs better. If nodes were instead weighted by available resources, then when a new node joined, the remaining nodes would rush to dispatch tasks to it, quickly overloading it; that node would then dispatch the excess tasks back out, resulting in back-and-forth, windmill-like scheduling.
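Putting the two filtering passes and the random choice together, the policy reads roughly like this. A Python sketch with illustrative names, not Ray's code; in particular, treating the second pass as a fallback when the first finds no node is my reading, used here to keep the sketch simple:

```python
import random

# Sketch of the decentralized Schedule policy: pass 1 filters nodes by
# truly available resources (available minus load); if none qualify,
# pass 2 falls back to nodes whose total capacity could ever satisfy
# the task (feasible, just currently busy). A node is then picked
# uniformly at random from the candidates.
def schedule(placeable_tasks, cluster_resources):
    decisions = {}
    for task_id, demand in placeable_tasks:
        fits_now = [
            node for node, res in cluster_resources.items()
            if all(res["available"].get(k, 0) - res["load"].get(k, 0) >= v
                   for k, v in demand.items())
        ]
        candidates = fits_now or [
            node for node, res in cluster_resources.items()
            if all(res["total"].get(k, 0) >= v for k, v in demand.items())
        ]
        if candidates:
            decisions[task_id] = random.choice(candidates)
        # else: no node can ever run this task; it stays infeasible here
    return decisions

cluster = {
    "node_a": {"total": {"CPU": 8}, "available": {"CPU": 4}, "load": {"CPU": 4}},
    "node_b": {"total": {"CPU": 8}, "available": {"CPU": 8}, "load": {"CPU": 0}},
}
out = schedule([("t1", {"CPU": 2}), ("t2", {"CPU": 16})], cluster)
```

Here t1 lands on node_b (node_a's available capacity is fully reserved by load), while t2 matches no node's total capacity and receives no decision.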
The operations for adding and deleting resources are slightly more complex. They are posted here:
[Code listing: for (const auto &t : scheduling_queue_.GetTasks(TaskState::PLACEABLE)) { … ]
SpillOver
This function is relatively simple. The pseudocode is as follows:
[Pseudocode listing: def spill_over(remote_scheduling_resources): … ]
The scenario this function initially addressed is that when a new node comes online, it checks whether some tasks on the local machine can be dispatched there. This includes unplaceable tasks (before the node came online, there was no node satisfying the task’s resource requirements) and at most one ready task. I guess this is done to compensate for the shortcomings of random scheduling—when a new node comes online, all other nodes each offload one task to it (this policy is also quite interesting), so that load is transferred relatively slowly from other nodes to the newly joined node.
Later, with version iterations, node resources went from static to dynamic: if a node's total resources are loaded from configuration at startup and never change afterward, they are static; if they can still be set at runtime, they are dynamic. Under this design, when the local node's total resources are reset, this function may also be called to retry unplaceable tasks. The ready-task offloading step doesn't make much sense in that scenario, though.
Finally, don’t forget that the dispatched node needs to have its resource load set, to “reserve” resources so that other scheduling decisions can promptly sense the load changes on the node caused by this scheduling decision.
Glossary
- Logic and implementation: Logic refers to a class’s external abstraction; implementation refers to its actual internal organization.
- resource_label/resource_name: the resource name, a label identifying a certain kind of resource, such as GPU, CPU, memory, etc.
- ResourceId: Resource ID, assigning IDs to all resources according to 0, 1, … , n-1, to index a certain resource. Typical examples are GPU0, GPU1, etc.
- Static and dynamic resources: This refers to a node’s total resources. If a node’s total resources are loaded through configuration at startup and remain unchanged thereafter, it is static; if the total resources can still be set at runtime, it is dynamic.
