CockroachDB 和 TiDB 中 SQL 的分布式执行

计算下推其实是常见的思想:将计算推到数据旁。由于在数据库中,逻辑上,计算常在存储层之上,因此将一部分算子推到存储层去做,称为计算下推。其在分布式数据库中尤为重要。

下面是 CockroachDB 和 TiDB 的解决方案,内容来自于文档和博客,因此可能和最新代码的逻辑并不一致。

CockroachDB

基本概念

CockroachDB 中相应的模块叫 DistSQL,其思想来源于Sawzall,有点类似 MapReduce。支持的算子叫做 aggregator,本质上是对 SQL 聚合算子的一种泛化。

作者:木鸟杂记 https://www.qtmuniao.com/2022/04/05/crdb-tidb-dist-sql 转载请注明出处

逻辑上,每个 aggregator 接受一个输入行流(Join 会有多个),产出一个输出行流(output stream of rows)。一(row)是由多个值(column values)构成的元组。输入输出流中会包含每个列值的类型信息,即模式(Schema)。

CockroachDB 还引入了( group )的概念,每个组是一个并行的单元。划分组的依据是组键(group key),可以看出思想有点类似于 MapReduce 中的 Reduce 阶段的 Key。组键其实是 SQL 中 group by 的泛化。两个极端情况:

  1. 所有行同属一个组。则所有的行只能在单节点执行,而不能并发。
  2. 每一行各属一个组。则可以随意切分行的集合,进行并发。

aggregators

有些 aggregator 的输入、输出或逻辑有一些特殊之处:

  1. table reader 没有输入流,会直接从本机 KV 层拿数据。
  2. final 没有输出流,提供最终结果给 query\statement。
  3. finallimit 对输入流有顺序要求(ordering requirement)。
  4. evaluator 可以通过代码自定义其行为逻辑。

逻辑到物理

执行过程有点类似于 Spark 中对 DAG 的拓扑调度和执行。

  1. 读取会被下发到每个 range ,由 range 的 raft leader 负责。
  2. 遇到非 shuffle aggregator,则在各个节点并发执行。
  3. 遇到 shuffle 的 aggregator(比如 group by),就使用某种哈希策略,将输出数据送到对应机器。
  4. 最后在 gateway 机器上执行 final aggregator。

multi-aggregator.png

单个 Processor

每个逻辑 aggregator 在物理上对应一个 Processor,都可以分为三个步骤:

  1. 接受多个输入流,进行合并。
  2. 数据处理。
  3. 对输出按 group 分发到不同机器上。

single-aggregator.png

引用

  1. cockroachdb SQL layer query execution: https://www.cockroachlabs.com/docs/stable/architecture/sql-layer.html#query-execution
  2. https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/life_of_a_query.md
  3. cockroach db rfc 20160421_distributed_sql https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20160421_distributed_sql.md

TiDB

基本概念

TiDB 中的 SQL 执行过程中主要流程为:

  1. 进行词法分析生成 AST(Parsing)
  2. 利用 AST 进行各种验证、变化,生成逻辑计划(Planing)
  3. 对逻辑计划进行基于规则的优化,生成物理计划(Optimizing)
  4. 对物理计划进行基于代价的优化,生成执行器(Executor)
  5. 运行执行器(Executing)

由于 TiDB 的数据在存储层 TiKV 中,在步骤 5 ,如果将所涉及到的所有 TiKV 数据全部放到 TiDB 层进行执行,会有以下问题:

  1. 存储层(TiKV)到计算层(TiDB)过大的网络开销。
  2. 计算层过多的数据计算对 CPU 的耗费。

为了解决这个问题,并充分利用 TiKV 层的分布式特性,PingCAP 在 TiKV 层增加了 Coprocessor ,即在 TiKV 层读取数据后进行计算的模块。在执行 SQL (主要是读取)时,将部分物理计划(即部分算子组成的 DAG)整个下推到 TiKV 层,由 Coprocessor 执行。

coprocessor.png

Executors

从 TiKV 的 protobuf 接口定义中可以看出,当期 TiKV 支持的 Coprocessor 算子(TiKV 中又称 Executor)类型有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
enum ExecType {
TypeTableScan = 0;
TypeIndexScan = 1;
TypeSelection = 2;
TypeAggregation = 3;
TypeTopN = 4;
TypeLimit = 5;
TypeStreamAgg = 6;
TypeJoin = 7;
TypeKill = 8;
TypeExchangeSender = 9;
TypeExchangeReceiver = 10;
TypeProjection = 11;
TypePartitionTableScan = 12;
TypeSort = 13;
TypeWindow = 14;
}

单个 Coprocessor

Coprocessor 接受一个由 Executor 作为节点组成的 DAGRequest,利用向量化模型

  1. 扫描指定数据
  2. 以 Chunk 为单位依次执行所有算子
  3. 将结果返回到 TiDB 层

小结

CRDB 和 TiDB 在执行 SQL 时最大的区别在于:

  1. CRDB 使用类似 MapReduce 的 MPP 模型,因此多台存储节点间需要通信互相传输数据。
  2. TiDB 中是存储计算分离,将能下推的计算以 DAG 的形式尽可能的下推,而需要多个节点合并计算只能在计算层做,因此多台存储节点间不需要通信以传输数据。

粗浅理解,有写的不对或需要补充之处,欢迎留言。



我是青藤木鸟,一个喜欢摄影、专注大规模数据系统的程序员,欢迎关注我的公众号:“木鸟杂记”,有更多的分布式系统、存储和数据库相关的文章,欢迎关注。 关注公众号后,回复“资料”可以获取我总结一份分布式数据库学习资料。 回复“优惠券”可以获取我的大规模数据系统付费专栏《系统日知录》的八折优惠券。

我们还有相关的分布式系统和数据库的群,可以添加我的微信号:qtmuniao,我拉你入群。加我时记得备注:“分布式系统群”。 另外,如果你不想加群,还有一个分布式系统和数据库的论坛(点这里),欢迎来玩耍。

wx-distributed-system-s.jpg