概述
Facebook Velox 是一个针对 SQL 运行时的 C++ 库,旨在统一 Facebook 各种计算流,包括 Spark 和 Presto,使用推的模式、支持向量计算。
Velox 接受一棵优化过的 PlanNode
Tree,然后将其切成一个个的线性的 Pipeline
,Task
负责这个转变过程,每个 Task 针对一个 PlanTree Segment。大多数算子是一对一翻译的,但是有一些特殊的算子,通常出现在多个 Pipeline 的切口处,通常来说,这些切口对应计划树的分叉处,如 HashJoinNode
,CrossJoinNode
, MergeJoinNode
,通常会翻译成 XXProbe 和 XXBuild。但也有一些例外,比如 LocalPartitionNode
和 LocalMergeNode
。
作者:木鸟杂记 https://www.qtmuniao.com/2023/03/22/velox-task-analysis 转载请注明出处
为了提高执行的并行度,Velox 引入了 LocalPartitionNode
节点,可以将一个 Pipeline
进行多线程(每个线程一个实例)并行运行,并且互斥的消费数据。其中每个实例称为 Driver
。该算子在输入计划树里并没有分叉(即没有多个 source),但在翻译成物理算子时,会在此节点处进行切开,并在切口前后改变执行的并行度,对应的物理算子是LocalPartition
和 LocalExchange
。
还有一个特殊节点,称为 LocalMergeNode
,该对输入有要求:必须有序,然后会进行单线程的归并排序,从而使输出全局有序。也因此,由其而切开的消费 Pipeline 一定是单 Driver 的。翻译成算子,对应两个 CallbackSink
和 LocalMerge
。
总结一下,上述五个 PlanNode,HashJoinNode
,CrossJoinNode
, MergeJoinNode
,LocalPartitionNode
,LocalMergeNode
在翻译时会造成切口,即将逻辑 PlanTree 切成多个物理 Pipeline,因此在切口处会将一个逻辑算子翻译成多个物理算子,分到不同 Pipeline 上。每个 Pipeline 会有一个从 0 开始的编号:Pipeline ID,是全局粒度的。
并且,可以由 LocalPartitionNode
来按需改变每个 Pipeline 并行度,其中 Pipeline 的每个线程由一个 Driver 来执行。每个 Driver 也有一个从 0 开始的编号:Driver ID,是 Pipeline 粒度的。
其他 PlanNode 到算子的翻译基本都是一对一的,感兴趣的可以看官方文档的这个页面:Plan Nodes and Operators。
下面展开一些细节。
Splits
Velox 允许应用层(即 Velox 的使用方)以 Splits (每个算子的输入片段称为 Split)的方式给 Pipeline 喂数据,可以流式的喂,因此有两个 API:
Task::addSplit(planNodeId, split)
:喂一份数据给 VeloxTask::noMoreSplits()
:通知 Velox 我喂完了。
Velox 会使用一个队列在缓存这些 Splits 数据。在数据喂完之前的任意一个时刻,Pipeline 的叶子算子(对的,外部喂数据只能发生在叶子节点,如 TableScan,Exchange 和 MergeExchange)都可以从队列中取数据,对应 API 是 Task::getSplitOrFuture(planNodeId)
,返回值有两种:
- 如果队列中有数据,则返回一个 Split
- 如果队列中无数据,但还没有收到喂完的信号,则返回一个 Future (类似于一个欠条,之后有数据之后,会凭该欠条兑付)。
Join Bridges and Barriers
Join (HashJoinNode 和 CrossJoinNode)会翻译成 XXProbe 和 XXBuild 两个算子,并且通过一个共享的 Bridge 来沟通数据,两侧 Pipeline 都可以通过 Task::getHashJoinBridge()
函数来根据 PlanNodeId 获取该共享的 Bridge。
为了提高 build 速度,build 侧 Pipeline 通常使用多个 Driver 并发执行。但由于只有一个 Bridge,每个 Driver 在结束时可以调用 Task::allPeersFinished()
(内部是使用一个 BarrierState
的结构来实现的)来判断自己是否为最后一个 Driver,如果是,则将所有 Driver 的输出进行合并后送到 Bridge。
当然,在 RIGHT and FULL OUTER join 情况下,Probe 侧也需要将没有 match 上的数据喂给 Bridge,此时也需要由最后一个 Driver 来负责这件事,于是同样需要调用 Task::allPeersFinished()
函数。
下面来详细看下 Join 类算子的切分细节。以 HashJoin 为例,Task 在切分 PlanTree 时,会将逻辑上的一个 HashJoin 算子,转化成物理上的一对算子:HashProbe 和 HashJoin,并且使用异步机制进行通知:在 HashJoin 完成后,通知 HashProbe 所在 Pipeline 继续执行,在此之前,后者是阻塞等待的。
如上图,每个 Pipeline 在实例化(逻辑 PlanNode 转物理 Operator)的时候,可以生成多份,进行并行执行,互斥的消费数据。并且,每个 Pipeline 的并行粒度可以不一样,如上图 Probe Pipeline 实例化了两份,而 Build Pipeline 实例化了三份。并且,Build Pipeline 组中最后一个运行完的 Pipeline 负责将数据通过 Bridge 发送给 Probe Pipeline。
Exchange Clients
Velox 使用 Exchange Clients 来获取远程 worker 的数据。分两个步骤:
第一步,Pipeline 中第一个 Driver (driverId == 0) 的 Exchange 算子从 Task 中获取一个 Split,并且初始化一个共享 Exchange Client。
第二步,Exchange Client 会为上游每个 Task 构造一个 Exchange Source,并行的拉取每个上游 Task 同一个 Partition (图中是 Partition-15)数据,然后将其放在 Client 的队列 Queue 中。Exchange 的每个 Driver 都会去队列中拉取这些数据。
如何从上游 Task 拉取数据的逻辑,需要由用户自定义实现 ExchangeSource
和 ExchangeSource::Factory
。每个 ExchangeSource
接受一个上游 Task 的字符串 ID、Partition 编号和一个队列作为参数。然后会从上游 Task 中拉取该 Partition 的数据,并且放到队列中。
文章剩下还有一部分,欢迎订阅我的专栏看全文:
小报童-Facebook velox Task 解析
这是在小报童上面的一个付费订阅专栏,这是专栏地址:https://xiaobot.net/p/system-thinking
小报童是一个借鉴国外 newsletter 专注写作的付费订阅平台,只不过将邮箱换成了微信。 现在初步打算围绕“系统”开几个系列: 图数据库101系列 每天学点数据库系列 系统好文推荐系列 读书笔记系列 数据密集型论文导读系列 系统,既是数据库系统中的系统,也是分布式系统中的系统,也是人类组织系统中的系统,也可是一切有迹可循、有规律可考的系统。学习系统的架构,借鉴系统的组织,使我们的认知也系统起来。 会保证每周不低于两篇更新,现价一季度 32 块钱,作为给先订阅同学的福利,如果有同学通过你的分享订阅本专栏,你可以拿到该同学订阅费用的百分之三十的抽成~ 分享方式见专栏介绍。 如果有任何建议以及想看的系统文章,欢迎留言~
参考
https://facebookincubator.github.io/velox/develop/task.html