Ray Source Code Reading (Part 2): Resource Abstraction and Scheduling Policies

The previous post covered how tasks awaiting scheduling are organized. This post continues with a more conceptual topic: how node resources are abstracted and which scheduling policies operate on them.

Introduction

Because Ray lets tasks declare explicit resource constraints, it must abstract the resources of all nodes in a hardware-agnostic way and manage them uniformly, so that resources can be added and removed logically. When a node joins, its total resource capacity must be registered; when a task is scheduled, a node satisfying the constraints must be found; once the task has been placed, the node's remaining available resources must be known; and so on.

In addition to standard resources like CPU and GPU, Ray also supports scheduling of user-defined labeled resources. When starting a node (ray start --resources <resources>), users specify the total amount of some category of resource that the node has (e.g., memory, bandwidth, a certain model of GPU, etc.). When defining a remote function, users specify how much of that category the task requires. Ray’s scheduler will then dispatch the task to a specific machine according to the user’s custom resource requirements. This is an interesting design for interaction between user code and the scheduler.

As for scheduling policy: because Ray schedules in a decentralized way, schedulers can easily act on inconsistent views of the cluster. In practice, the simplest approach seems to work best statistically: for each task, find the nodes that satisfy its resource constraints and pick one of them at random.

Author: Muniao’s Notes https://www.qtmuniao.com, please indicate the source when reposting

Scheduling Resource Abstraction (SchedulingResources)

The four most basic classes are FractionalResourceQuantity, ResourceSet, ResourceIds, and ResourceIdSet. A brief overview of each class:

  • FractionalResourceQuantity defines the quantity of a certain resource.
  • ResourceSet is a set of different kinds of resources and their quantities.
  • ResourceIds breaks a resource quantity into unit shares and assigns each share an index: 0, 1, …, quantity-1.
  • ResourceIdSet is a set of indexed resources.

The first two describe resources at the "how much" level, while the latter two break resources down at the index level.

The latter two refine the former two. Within each pair, one class describes a single kind of resource and the other describes a set made up of different kinds of resources.

An important point reflected in the name FractionalResourceQuantity is that Ray supports fractional quantities, but only pure fractions (values below 1); a quantity of 1 or more must be a whole number. Why this design? Take the simplest example: GPUs are expensive, so we might want multiple tasks to share a single GPU to improve utilization. Each task can then specify a fractional GPU requirement when declaring its resource needs. In this scenario, mixed values like 1.x or 2.x make little sense: a task either occupies whole devices exclusively or shares a single one with others.

Resource Quantity (FractionalResourceQuantity)

FractionalResourceQuantity is a wrapper around double, representing an amount of a resource in Ray. To avoid precision loss in calculations, however, its internal representation is a 64-bit integer: the actual value multiplied by kResourceConversionFactor = 10000 and then truncated. The purpose is clear:

  1. For Ray's resource usage scenarios, precision to four decimal places (1/10000) is sufficient.
  2. It provides exact arithmetic within this precision.

On this basis, it overloads the basic operations of a measurable quantity: addition, subtraction, and comparison (boolean) operations. Ray only needs node joining (adding resources), schedulability checks (comparing resources), and task scheduling (subtracting resources), so multiplication and division are not required.

Of course, it can also be understood from another angle, perhaps more intuitively: its internal representation effectively changes the dimension/unit from a logical 1 to a smaller 1/kResourceConversionFactor.

class FractionalResourceQuantity {
 public:
  // Default constructor: resource_quantity_ = 0
  FractionalResourceQuantity();

  // Basic constructor: specify resource quantity
  FractionalResourceQuantity(double resource_quantity);

  /// Addition and subtraction operations
  const FractionalResourceQuantity operator+(const FractionalResourceQuantity &rhs) const;
  const FractionalResourceQuantity operator-(const FractionalResourceQuantity &rhs) const;
  void operator+=(const FractionalResourceQuantity &rhs);
  void operator-=(const FractionalResourceQuantity &rhs);

  /// Boolean operations
  bool operator==(const FractionalResourceQuantity &rhs) const;
  bool operator!=(const FractionalResourceQuantity &rhs) const;
  bool operator<(const FractionalResourceQuantity &rhs) const;
  bool operator>(const FractionalResourceQuantity &rhs) const;
  bool operator<=(const FractionalResourceQuantity &rhs) const;
  bool operator>=(const FractionalResourceQuantity &rhs) const;

  /// Actual value as a floating-point number
  double ToDouble() const;

 private:
  // Resource quantity in units of 1/kResourceConversionFactor
  int64_t resource_quantity_;
};
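
To make the fixed-point idea concrete, here is a minimal sketch rather than Ray's actual implementation. The names FixedQuantity, kConversionFactor, FromUnits, and Units are hypothetical stand-ins; only the factor of 10000 and the truncation come from the description above.

```cpp
#include <cassert>
#include <cstdint>

// Assumption: mirrors kResourceConversionFactor = 10000 from the text.
constexpr int64_t kConversionFactor = 10000;

// Hypothetical stand-in for FractionalResourceQuantity.
class FixedQuantity {
 public:
  FixedQuantity() : units_(0) {}
  // Scale by the conversion factor and truncate to an integer.
  explicit FixedQuantity(double quantity)
      : units_(static_cast<int64_t>(quantity * kConversionFactor)) {}

  // Integer arithmetic: exact within 1/kConversionFactor.
  FixedQuantity operator+(const FixedQuantity &rhs) const {
    return FromUnits(units_ + rhs.units_);
  }
  FixedQuantity operator-(const FixedQuantity &rhs) const {
    return FromUnits(units_ - rhs.units_);
  }
  bool operator==(const FixedQuantity &rhs) const { return units_ == rhs.units_; }
  bool operator<(const FixedQuantity &rhs) const { return units_ < rhs.units_; }

  // Convert back to the logical unit.
  double ToDouble() const { return static_cast<double>(units_) / kConversionFactor; }
  // Raw value in units of 1/kConversionFactor.
  int64_t Units() const { return units_; }

 private:
  static FixedQuantity FromUnits(int64_t units) {
    FixedQuantity quantity;
    quantity.units_ = units;
    return quantity;
  }
  int64_t units_;
};
```

In this sketch, FixedQuantity(0.5) + FixedQuantity(0.25) is stored as exactly 7500 units, and every later comparison is an integer comparison, so repeated acquire and release cycles cannot accumulate rounding error.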

A basic principle when implementing such operations is to reuse as much as possible, that is, to define the minimum number of orthogonal operations and then use these to implement other operations. This has two benefits:

  1. The code is concise because of reuse.
  2. Changes are easier—if the implementation needs to change in the future, only the most basic operations need to be modified.

For the comparison operators in this example, operator== and operator< are defined as the basic operations, and the remaining operators are implemented in terms of them. A similar idea appears later in ResourceSet:

// Two basic operations
bool FractionalResourceQuantity::operator==(const FractionalResourceQuantity &rhs) const {
  return resource_quantity_ == rhs.resource_quantity_;
}

bool FractionalResourceQuantity::operator<(const FractionalResourceQuantity &rhs) const {
  return resource_quantity_ < rhs.resource_quantity_;
}

// The following call basic operations to complete the definition
bool FractionalResourceQuantity::operator!=(const FractionalResourceQuantity &rhs) const {
  return !(*this == rhs);
}

bool FractionalResourceQuantity::operator>(const FractionalResourceQuantity &rhs) const {
  return rhs < *this;
}

bool FractionalResourceQuantity::operator<=(const FractionalResourceQuantity &rhs) const {
  return !(*this > rhs);
}

bool FractionalResourceQuantity::operator>=(const FractionalResourceQuantity &rhs) const {
  return !(*this < rhs);
}

Resource Set (ResourceSet)

ResourceSet is a collection of different kinds of resources and their quantities. Implementation-wise, it is a wrapper around a dictionary (unordered_map). In practice, it is used to represent a node's total resources, used resources, remaining available resources, and so on.

Basic operations include adding or deleting a single resource, as well as operations between resource sets. See the inline comments for details.

class ResourceSet {
 public:
  ResourceSet();

  // Three constructors:
  // Build a dictionary of label(string)->amount(FractionalResourceQuantity) from a dictionary or a list of key-value pairs.
  ResourceSet(
      const std::unordered_map<std::string, FractionalResourceQuantity> &resource_map);
  ResourceSet(const std::unordered_map<std::string, double> &resource_map);
  ResourceSet(const std::vector<std::string> &resource_labels,
              const std::vector<double> resource_capacity);

  // Destructor
  ~ResourceSet();

  // Determine whether resource set A is a subset of B (all labels' amounts in A are not greater than the corresponding labels' amounts in B).
  // Use this operation as a basic operation to implement the next three operations.
  bool IsSubset(const ResourceSet &other) const;

  // In the following functions, the first two have exactly the same implementation.
  bool operator==(const ResourceSet &rhs) const;
  bool IsEqual(const ResourceSet &other) const;
  bool IsSuperset(const ResourceSet &other) const;

  // Dictionary-like add, delete, update, and query operations, i.e., add, delete, update, and query the quantity of a certain kind of resource.
  void AddOrUpdateResource(const std::string &resource_name,
                           const FractionalResourceQuantity &capacity);
  bool DeleteResource(const std::string &resource_name);
  FractionalResourceQuantity GetResource(const std::string &resource_name) const;
  bool IsEmpty() const;

  // Addition and subtraction operations between two sets. Note that when adding, sometimes you cannot exceed an upper bound: for example, a node's total resource capacity, hence
  // AddResourcesCapacityConstrained; when subtracting, the resource quantity cannot become negative, hence
  // SubtractResourcesStrict. These are two upper/lower bound protection functions.
  void AddResourcesCapacityConstrained(const ResourceSet &other,
                                       const ResourceSet &total_resources);
  void AddResources(const ResourceSet &other);
  void SubtractResources(const ResourceSet &other);
  void SubtractResourcesStrict(const ResourceSet &other);

  // Since all nodes must have CPU, and all task scheduling also requires CPU resources, it is singled out as a separate function.
  // Its corresponding label name is: kCPU_ResourceLabel = "CPU"
  const ResourceSet GetNumCpus() const;

  // Return the list of resources and corresponding quantities organized in dictionary form; one uses double representation, the other uses FractionalResourceQuantity representation.
  const std::unordered_map<std::string, double> GetResourceMap() const;
  const std::unordered_map<std::string, FractionalResourceQuantity>
      &GetResourceAmountMap() const;

  const std::string ToString() const;

 private:
  // Internal dictionary of resources and their quantities
  std::unordered_map<std::string, FractionalResourceQuantity> resource_capacity_;
};
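
The comment on IsSubset says the other comparisons are built on top of it. A minimal sketch of that idea follows, assuming a simplified ResourceMap alias with plain integer units instead of FractionalResourceQuantity; the alias and the free functions are hypothetical and are not Ray's code.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical simplification: label -> amount in integer units.
using ResourceMap = std::unordered_map<std::string, int64_t>;

// True if every label in `a` has an amount no greater than the same label in
// `b`. Assumption: a label missing from `b` counts as zero.
bool IsSubset(const ResourceMap &a, const ResourceMap &b) {
  for (const auto &entry : a) {
    const auto it = b.find(entry.first);
    const int64_t amount_in_b = (it == b.end()) ? 0 : it->second;
    if (entry.second > amount_in_b) {
      return false;
    }
  }
  return true;
}

// The remaining comparisons reuse IsSubset instead of re-walking the maps.
bool IsSuperset(const ResourceMap &a, const ResourceMap &b) { return IsSubset(b, a); }

bool IsEqual(const ResourceMap &a, const ResourceMap &b) {
  return IsSubset(a, b) && IsSubset(b, a);
}
```

A task's demand fitting on a node is then simply IsSubset(task_demand, node_resources), which is the check the scheduling policy relies on later.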

For addition and subtraction between sets, there are two extra variants with upper- and lower-bound checks. My guess is that they guard against the consequences of imprecise floating-point arithmetic.
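
One way to picture the two guarded variants is the sketch below. It rests on several assumptions: quantities are plain integer units, a label missing from the total is skipped on addition, and the strict subtraction reports failure through a return value (the real code may treat that case as a fatal error instead). All names are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical simplification: label -> amount in integer units.
using ResourceMap = std::unordered_map<std::string, int64_t>;

// Add `delta` to `target`, never letting a label exceed its amount in `total`.
void AddCapacityConstrained(ResourceMap &target, const ResourceMap &delta,
                            const ResourceMap &total) {
  for (const auto &entry : delta) {
    const auto total_it = total.find(entry.first);
    if (total_it == total.end()) {
      continue;  // Assumption: labels unknown to the node are ignored.
    }
    const int64_t sum = target[entry.first] + entry.second;
    target[entry.first] = std::min(sum, total_it->second);
  }
}

// Subtract `delta` from `target` only if no label would become negative.
// Returns false and leaves `target` untouched otherwise.
bool SubtractStrict(ResourceMap &target, const ResourceMap &delta) {
  for (const auto &entry : delta) {
    const auto it = target.find(entry.first);
    const int64_t have = (it == target.end()) ? 0 : it->second;
    if (have < entry.second) {
      return false;
    }
  }
  for (const auto &entry : delta) {
    target[entry.first] -= entry.second;
  }
  return true;
}
```

The upper bound matters when resources are returned to a node whose total has shrunk in the meantime; the lower bound catches an attempt to hand out more than is actually free.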

Resource IDs (ResourceIds)

ResourceIds solves the problem of assigning IDs to resources and splitting resources in some way.

As for resource IDs, it assigns a logical ID (0 to n-1) to every resource in the system, for example GPU 0, GPU 1, and so on. This lets user code locate resources and request that certain code use a specific resource.

For resource splitting, Ray requires the API (ray.remote(label=amount)) to use resources in only two forms:

  1. amount >= 1 and must be an integer.
  2. amount < 1, i.e., a pure fraction.

In physical terms, a task either exclusively occupies one or more whole resources, or shares a single resource with others. A classic example of the former is CPU; a classic example of the latter is GPU.
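
The two accepted forms can be expressed as a small validity check. This is a sketch only: IsValidResourceAmount is a hypothetical helper, not a Ray function, and treating zero as valid and negative or non-finite values as invalid is an assumption.

```cpp
#include <cassert>
#include <cmath>

// Hypothetical helper: accepts whole amounts (1, 2, ...) and pure fractions
// below 1; rejects mixed values such as 1.5.
// Assumptions: 0 is accepted, negative and non-finite values are rejected.
bool IsValidResourceAmount(double amount) {
  if (!std::isfinite(amount) || amount < 0) {
    return false;
  }
  if (amount < 1) {
    return true;  // Pure fraction: share a single resource with others.
  }
  return std::floor(amount) == amount;  // Whole number: exclusive use.
}
```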

Internally, ResourceIds maintains two lists. One is a list of integers (vector<int64_t> whole_ids_) holding the IDs of all whole resources. The other is a list of key-value pairs (vector<pair<int64_t, FractionalResourceQuantity>> fractional_ids_) holding the IDs of all fractional resources and the amount remaining for each.

For a node, all resources should initially be whole (unless there is some special purpose, such as not wanting the cluster to use up all of the node's resources). As tasks requiring fractional resources are scheduled, some resources get split: one resource is taken from the whole list, a portion is given to the task, and the remainder is put into the fractional list. The same ID therefore never appears in both lists, because each ID corresponds to at most one whole resource.

When a task completes and releases a fractional resource whose ID is already in the fractional list, the returned amount is merged into that entry; if the merged amount reaches 1, the ID is moved back to the whole list.

Allocation follows a few simple rules. For example, a request for a fractional resource is first served from the fractional list; only if no entry there satisfies it is a resource split off from the whole list.

Another consequence of splitting is that fractions belonging to different IDs cannot be combined. For example, if GPU 0 and GPU 1 each have 0.5 remaining, the two halves cannot be put together to serve a task requiring 0.75 GPU.
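
The split-and-merge rules can be sketched for fractional requests as follows. IdPool, AcquireFraction, ReleaseFraction, and kWhole are hypothetical names; quantities are integers in units of 1/10000, and whole-number requests and the backlog mechanism are left out for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// One whole resource, in units of 1/10000 (assumption, matching the text).
constexpr int64_t kWhole = 10000;

// Hypothetical, simplified model of ResourceIds for one resource type.
struct IdPool {
  std::vector<int64_t> whole_ids;                           // fully free IDs
  std::vector<std::pair<int64_t, int64_t>> fractional_ids;  // (ID, free units)

  // Acquire `units` (0 < units < kWhole) from a single ID.
  // Returns the ID used, or -1 if no single ID can satisfy the request.
  int64_t AcquireFraction(int64_t units) {
    // 1. Prefer a resource that has already been split.
    for (std::size_t i = 0; i < fractional_ids.size(); ++i) {
      if (fractional_ids[i].second >= units) {
        const int64_t id = fractional_ids[i].first;
        fractional_ids[i].second -= units;
        if (fractional_ids[i].second == 0) {
          fractional_ids.erase(fractional_ids.begin() +
                               static_cast<std::ptrdiff_t>(i));
        }
        return id;
      }
    }
    // 2. Otherwise split a whole resource and keep the remainder.
    if (!whole_ids.empty()) {
      const int64_t id = whole_ids.back();
      whole_ids.pop_back();
      fractional_ids.emplace_back(id, kWhole - units);
      return id;
    }
    return -1;  // Fractions of different IDs are never combined.
  }

  // Return `units` of resource `id`; merge with the existing entry and
  // promote the ID back to the whole list once it is complete again.
  void ReleaseFraction(int64_t id, int64_t units) {
    for (std::size_t i = 0; i < fractional_ids.size(); ++i) {
      if (fractional_ids[i].first == id) {
        fractional_ids[i].second += units;
        if (fractional_ids[i].second == kWhole) {
          fractional_ids.erase(fractional_ids.begin() +
                               static_cast<std::ptrdiff_t>(i));
          whole_ids.push_back(id);
        }
        return;
      }
    }
    fractional_ids.emplace_back(id, units);
  }
};
```

With two whole GPUs, acquiring 0.5 and then 0.75 splits both of them; a further request for 0.75 fails even though 0.75 is free in total, because the free amount is spread over two IDs.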

There is also a member variable, decrement_backlog_, used to record overcommitted resource requests. When resources are later returned through Release, these requests are satisfied first.

class ResourceIds {
 public:
  ResourceIds();

  // Construct ResourceIds with a given quantity of resources. resource_quantity is either an integer or a pure fraction.
  explicit ResourceIds(double resource_quantity);

  // In internal implementation, the whole resource list uses vector<int64_t> to represent all corresponding IDs, each having one whole resource.
  // For fractional resources, a vector<pair> is used to represent an ID and its corresponding resource quantity.
  // Therefore, there are the following three constructors.
  explicit ResourceIds(const std::vector<int64_t> &whole_ids);
  explicit ResourceIds(
      const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &fractional_ids);
  ResourceIds(
      const std::vector<int64_t> &whole_ids,
      const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &fractional_ids);

  // Check whether there are enough resources for the requested resource_quantity. If resource_quantity is an integer, just check the
  // whole resource list. Note that if resource_quantity is a fraction, there must be a single ID with a resource quantity greater than
  // resource_quantity (either a whole resource of at least one, or a fraction greater than resource_quantity).
  // You cannot combine two fractions to exceed resource_quantity, because quantities belonging to two different resource IDs cannot be combined.
  bool Contains(const FractionalResourceQuantity &resource_quantity) const;

  // Carve out a chunk of resources or reclaim a chunk of resources according to the above principles.
  // Split appropriately when allocating resources.
  // Merge appropriately when reclaiming resources.
  ResourceIds Acquire(const FractionalResourceQuantity &resource_quantity);
  void Release(const ResourceIds &resource_ids);

  // Although the semantics are different, the implementation is the same as Release: add two resource_ids together.
  ResourceIds Plus(const ResourceIds &resource_ids) const;

  // Get the whole/fractional resource ID lists.
  const std::vector<int64_t> &WholeIds() const;
  const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &FractionalIds()
      const;

  // Check whether there are no resources in this ID set.
  bool TotalQuantityIsZero() const;

  // Return the sum of all resources as a FractionalResourceQuantity.
  FractionalResourceQuantity TotalQuantity() const;

  std::string ToString() const;

  // Update to the specified resource quantity via IncreaseCapacity and DecreaseCapacity; this is done to support users'
  // dynamic adjustment of custom resources.
  void UpdateCapacity(int64_t new_capacity);

 private:
  // Determine whether resource_quantity is a whole number.
  bool IsWhole(double resource_quantity) const;

  void IncreaseCapacity(int64_t increment_quantity);
  void DecreaseCapacity(int64_t decrement_quantity);

  // Two lists
  std::vector<int64_t> whole_ids_;
  std::vector<std::pair<int64_t, FractionalResourceQuantity>> fractional_ids_;
  // Track total capacity, which is represented by FractionalResourceQuantity, also indicating that this class is a refinement of FractionalResourceQuantity.
  FractionalResourceQuantity total_capacity_;

  // Temporarily record overcommitted resource requests.
  int64_t decrement_backlog_;
};

Resource ID Set (ResourceIdSet)

ResourceIdSet represents a set of available resources with IDs. Internally, it uses a dictionary, unordered_map<string, ResourceIds> available_resources_, to map each resource type to its indexed quantity, and it defines interfaces similar to those of ResourceIds.

class ResourceIdSet {
 public:
  // Various constructors, essentially building an unordered_map.
  ResourceIdSet();
  ResourceIdSet(const ResourceSet &resource_set);
  ResourceIdSet(const std::unordered_map<std::string, ResourceIds> &available_resources);

  // Contains, acquire, and release. The semantics are the same as the corresponding operations in ResourceIds, except it goes from a single resource type to a set of total resources.
  bool Contains(const ResourceSet &resource_set) const;
  ResourceIdSet Acquire(const ResourceSet &resource_set);
  void Release(const ResourceIdSet &resource_id_set);
  void ReleaseConstrained(const ResourceIdSet &resource_id_set,
                          const ResourceSet &resources_total);
  // Same implementation as Release.
  ResourceIdSet Plus(const ResourceIdSet &resource_id_set) const;

  /// Add, delete, and query the quantity of a certain resource type.
  // Add a specified quantity to a certain resource type.
  void AddOrUpdateResource(const std::string &resource_name, int64_t capacity);
  // Delete a certain resource type.
  void DeleteResource(const std::string &resource_name);
  // Clear all resources.
  void Clear();
  // Get all available resources, i.e., return the internal hash table.
  const std::unordered_map<std::string, ResourceIds> &AvailableResources() const;

  // CPU is required by all tasks, so it is singled out.
  ResourceIdSet GetCpuResources() const;

  // Convert a set of resources with IDs to a set of resources described only by quantities.
  ResourceSet ToResourceSet() const;

  // Printing and serialization.
  std::string ToString() const;
  std::vector<rpc::ResourceIdSetInfo> ToProtobuf() const;

 private:
  // Mapping from resource type to indexed resource set.
  std::unordered_map<std::string, ResourceIds> available_resources_;
};

Scheduling Resources Class (SchedulingResources)

This is the class ultimately exposed to callers. For a given node, it records all resources that can be scheduled or are in use (resources_total_), the resources about to be used (resources_load_), and the resources still available (resources_available_). All three fields are of type ResourceSet.

The relationships among the three are:

  1. resources_total_ = resources_used_by_running_tasks + resources_available_
  2. resources_load_ is part of resources_available_

The second relationship may look strange; it will be explained in detail later.

class SchedulingResources {
 public:
  // Default constructor and constructor specifying total capacity.
  SchedulingResources();
  SchedulingResources(const ResourceSet &total);

  ~SchedulingResources();

  // Getters for total, load, and available quantities.
  const ResourceSet &GetAvailableResources() const;
  const ResourceSet &GetLoadResources() const;
  const ResourceSet &GetTotalResources() const;

  // Setters for load and available quantities; total capacity is determined at construction time.
  void SetLoadResources(ResourceSet &&newset);
  void SetAvailableResources(ResourceSet &&newset);

  // Acquire or release a set of resources: add or delete on available resources.
  void Release(const ResourceSet &resources);
  void Acquire(const ResourceSet &resources);

  // Dynamically adjust a node's total resources: update the total of a certain resource type or delete a certain resource type.
  void UpdateResource(const std::string &resource_name, int64_t capacity);
  void DeleteResource(const std::string &resource_name);

  std::string DebugString() const;

 private:
  ResourceSet resources_total_;
  ResourceSet resources_available_;
  ResourceSet resources_load_;
};

Interestingly, judging from this source code alone, Release and Acquire operate only on resources_available_, while resources_load_ has only a whole-set getter and setter. To change it, a caller copies the set returned by GetLoadResources, adds to or subtracts from the copy, and writes the result back with SetLoadResources, which is what Schedule does.

After reading the related source code, it seems that Ray uses resources_load_ to describe the total demand of all tasks in SchedulingQueue::ready_queue_, rather than the demand of currently running tasks. The demand of currently running tasks is resources_total_ - resources_available_. In other words, resources_load_ is the part of resources_available_ that describes the total resource demand of all ready tasks.
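
A small worked example of these relationships, reduced to a single resource type with whole-number units. NodeView and both helper functions are hypothetical names, and clamping at zero when the queued demand exceeds what is available is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical single-resource view of a node.
struct NodeView {
  int64_t total;      // corresponds to resources_total_
  int64_t available;  // corresponds to resources_available_
  int64_t load;       // corresponds to resources_load_ (demand of ready tasks)
};

// Resources held by tasks that are currently running.
int64_t UsedByRunningTasks(const NodeView &node) {
  return node.total - node.available;
}

// What remains once the ready tasks are accounted for.
// Assumption: clamped at zero when queued demand exceeds availability.
int64_t FreeAfterReadyTasks(const NodeView &node) {
  return std::max<int64_t>(node.available - node.load, 0);
}
```

For a node with 8 CPUs of which 5 are available and 2 are claimed by ready tasks, running tasks hold 3 CPUs and 3 CPUs remain free for new placements.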

As a typical implementation example, here is the code implementation of UpdateResource, which updates the total capacity of a certain resource type:

void SchedulingResources::UpdateResource(const std::string &resource_name,
                                         int64_t capacity) {
  const FractionalResourceQuantity new_capacity = FractionalResourceQuantity(capacity);
  const FractionalResourceQuantity &current_capacity =
      resources_total_.GetResource(resource_name);
  if (current_capacity > 0) {
    // If this resource type exists, update its total capacity and available quantity accordingly.
    const FractionalResourceQuantity capacity_difference =
        new_capacity - current_capacity;
    const FractionalResourceQuantity &current_available_capacity =
        resources_available_.GetResource(resource_name);
    FractionalResourceQuantity new_available_capacity =
        current_available_capacity + capacity_difference;
    if (new_available_capacity < 0) {
      new_available_capacity = 0;
    }
    resources_total_.AddOrUpdateResource(resource_name, new_capacity);
    resources_available_.AddOrUpdateResource(resource_name, new_available_capacity);
  } else {
    // If it does not exist, add it directly.
    resources_total_.AddOrUpdateResource(resource_name, new_capacity);
    resources_available_.AddOrUpdateResource(resource_name, new_capacity);
  }
}

Scheduling Policy (SchedulingPolicy)

As mentioned earlier, Ray schedules in a decentralized way: each node independently schedules the tasks it sees. SchedulingPolicy describes the scheduling policy of a single node. Through its constructor it receives a reference to the SchedulingQueue described in the previous article, which gives it access to all tasks on this node. It also receives an overview of the resources of a set of nodes, represented as unordered_map<ClientID, SchedulingResources> &cluster_resources; the local node's entry is loaded from configuration, while entries for other nodes are pulled from GCS when the local node learns that they have joined the cluster. It then makes scheduling decisions by matching task resource requirements against node resource availability.

In addition to Schedule, there is a SpillOver method. Schedule decides, across a set of nodes, where to place every task in the TaskState::PLACEABLE state; this is scheduling proper. SpillOver tries to move tasks in the TaskState::INFEASIBLE and TaskState::READY states onto a single newly joined node; this is spilling over. Later, once local resources could also be adjusted dynamically, SpillOver came to be used after local resource adjustments as well.

class SchedulingPolicy {
 public:
  // Constructor: obtain a reference to all tasks on this node.
  SchedulingPolicy(const SchedulingQueue &scheduling_queue);

  // Make scheduling decisions based on cluster resource distribution and task resource requirements, returning a set of tasks and the nodes they are scheduled to.
  std::unordered_map<TaskID, ClientID> Schedule(
      std::unordered_map<ClientID, SchedulingResources> &cluster_resources,
      const ClientID &local_client_id);

  // After sensing a new node joining or local resource dynamic adjustment, attempt previously unplaceable tasks and offload
  // at most one READY task to the new node.
  std::vector<TaskID> SpillOver(SchedulingResources &remote_scheduling_resources) const;

  virtual ~SchedulingPolicy();

 private:
  // Reference to the task queue.
  const SchedulingQueue &scheduling_queue_;
  /// Random number generator used when picking a node.
  std::mt19937_64 gen_;
};

Schedule

For the Schedule function, the approximate pseudocode is as follows:

for task in placeable_tasks_list:
    # First pass: nodes whose available resources, minus load, satisfy the task.
    clients = find_all_available_resources_satisfied_clients()
    if clients.is_not_empty():
        decision[task] = random_select_one(clients)
    else:
        # Second pass: nodes whose total resources satisfy the task.
        clients = find_all_total_resources_satisfied_clients()
        if clients.is_not_empty():
            decision[task] = random_select_one(clients)
return decision

There are two points worth noting:

  1. For each task, all nodes are filtered twice in sequence. The first pass checks each node's truly available resources (resources_available_ - resources_load_); the second checks the node's total resources (resources_total_).
  2. The comments in the code say TODO: select nodes by weight, but that comment is over a year old, and the code still picks a node at random from those satisfying the resource requirements. My guess is that, with decentralized decisions, consistency is hard to guarantee and random selection actually performs better. For example, if nodes were weighted by their available resources, then when a new node joins, all the other nodes might rush to dispatch tasks to it, quickly overloading it. The new node would then dispatch its excess tasks back out, and tasks would bounce back and forth between nodes.
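
The two-pass random selection can be sketched for a single resource type as below. Node, Cluster, and PickNode are hypothetical names, integer units stand in for ResourceSet, and reserving load in the fallback pass as well is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical single-resource view of a node.
struct Node {
  int64_t total;
  int64_t available;
  int64_t load;
};
using Cluster = std::unordered_map<std::string, Node>;

// Pick a node for a task demanding `demand` units.
// Returns an empty string if no node could ever fit the task.
std::string PickNode(Cluster &cluster, int64_t demand, std::mt19937_64 &gen) {
  std::vector<std::string> candidates;
  // Pass 1: nodes whose free capacity (available - load) covers the demand.
  for (const auto &entry : cluster) {
    const Node &node = entry.second;
    if (demand <= node.available - node.load) {
      candidates.push_back(entry.first);
    }
  }
  // Pass 2: fall back to nodes whose total capacity covers the demand.
  if (candidates.empty()) {
    for (const auto &entry : cluster) {
      if (demand <= entry.second.total) {
        candidates.push_back(entry.first);
      }
    }
  }
  if (candidates.empty()) {
    return "";
  }
  // Uniform random choice among the candidates.
  std::uniform_int_distribution<std::size_t> pick(0, candidates.size() - 1);
  const std::string chosen = candidates[pick(gen)];
  // Reserve the demand so that later decisions see the added load.
  cluster[chosen].load += demand;
  return chosen;
}
```

Because the chosen node's load is increased immediately, a second task scheduled in the same round no longer sees that capacity as free, even before the remote node reports its new state.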

The subtraction and addition of resources in the first pass are slightly more involved, so that part of the code is quoted here:

for (const auto &t : scheduling_queue_.GetTasks(TaskState::PLACEABLE)) {
  const auto &spec = t.GetTaskSpecification();
  const auto &resource_demand = spec.GetRequiredPlacementResources();
  const TaskID &task_id = spec.TaskId();

  std::vector<ClientID> client_keys;
  for (const auto &client_resource_pair : cluster_resources) {
    ClientID node_client_id = client_resource_pair.first;
    const auto &node_resources = client_resource_pair.second;
    ResourceSet available_node_resources = ResourceSet(node_resources.GetAvailableResources());
    // 1. Get the node's truly available resources (resources_available_ - resources_load_).
    available_node_resources.SubtractResources(node_resources.GetLoadResources());

    // Check resource constraints.
    if (resource_demand.IsSubset(available_node_resources)) {
      client_keys.push_back(node_client_id);
    }
  }

  if (!client_keys.empty()) {
    // Randomly select an index.
    std::uniform_int_distribution<int> distribution(0, client_keys.size() - 1);
    int client_key_index = distribution(gen_);
    const ClientID &dst_client_id = client_keys[client_key_index];
    decision[task_id] = dst_client_id;
    // 2. Update the corresponding node's load resources.
    ResourceSet new_load(cluster_resources[dst_client_id].GetLoadResources());
    new_load.AddResources(resource_demand);
    cluster_resources[dst_client_id].SetLoadResources(std::move(new_load));
  }
  ...

SpillOver

This function is relatively simple. The pseudocode is as follows:

def spill_over(remote_scheduling_resources):
    decision = []
    new_load = ResourceSet()

    # Attempt previously unplaceable tasks.
    for task in infeasible_task_list:
        if task.required_res.is_subset_of(remote_scheduling_resources):
            decision.append(task.id)
            new_load.add(task.required_res)

    # Offload at most one ready task.
    for task in ready_task_list:
        if task.required_res.is_subset_of(remote_scheduling_resources):
            decision.append(task.id)
            new_load.add(task.required_res)
            break

    # Set the node's resource load.
    remote_scheduling_resources.set_load(new_load)
    return decision

The scenario this function originally addressed is a new node coming online: the local node checks whether some of its tasks can be dispatched there. These include unplaceable tasks (before the new node came online, no node satisfied their resource requirements) and at most one ready task. My guess is that this compensates for the shortcomings of random scheduling: when a new node comes online, every other node offloads one task to it (an interesting policy in itself), so load shifts from the existing nodes to the new one relatively slowly.

Later, with version iterations, node static resources became dynamic resources. If a node’s total resources are loaded through configuration at startup and remain unchanged thereafter, it is static; if the total resources can still be set at runtime, it is dynamic. Under this design, if the local node’s total resources are reset, this function may also be called to retry unplaceable tasks. As for the task offloading operation, in this scenario, it doesn’t really make much sense.

Finally, don't forget that the node receiving the tasks needs to have its resource load set. This "reserves" the resources, so that other scheduling decisions can promptly see the load change that this decision causes on the node.
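
The same logic as a compilable sketch for a single resource type. Task and this free-standing SpillOver are hypothetical simplifications, and comparing each demand against the remote node's total capacity is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical task record: an ID and a demand in whole units.
struct Task {
  std::string id;
  int64_t demand;
};

// Returns the IDs of the tasks to forward to the remote node and stores the
// demand reserved for them in `remote_load`.
std::vector<std::string> SpillOver(const std::vector<Task> &infeasible,
                                   const std::vector<Task> &ready,
                                   int64_t remote_total, int64_t &remote_load) {
  std::vector<std::string> decision;
  int64_t new_load = 0;

  // Forward every previously unplaceable task that now fits.
  for (const Task &task : infeasible) {
    if (task.demand <= remote_total) {
      decision.push_back(task.id);
      new_load += task.demand;
    }
  }

  // Offload at most one ready task.
  for (const Task &task : ready) {
    if (task.demand <= remote_total) {
      decision.push_back(task.id);
      new_load += task.demand;
      break;
    }
  }

  // Reserve the resources on the remote node.
  remote_load = new_load;
  return decision;
}
```

The break after the first matching ready task is what keeps each existing node from flooding a freshly joined node.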

Glossary

  1. Logic and implementation: Logic refers to a class’s external abstraction; implementation refers to its actual internal organization.
  2. resource_label/resource_name: The resource name, a label identifying a certain kind of resource, such as GPU, CPU, Memory, etc.
  3. ResourceId: Resource ID, assigning IDs to all resources according to 0, 1, … , n-1, to index a certain resource. Typical examples are GPU0, GPU1, etc.
  4. Static and dynamic resources: This refers to a node’s total resources. If a node’s total resources are loaded through configuration at startup and remain unchanged thereafter, it is static; if the total resources can still be set at runtime, it is dynamic.
