在云上进行大规模数据处理的一些实践

随着云基础设施的不断成熟,新兴的公司为了快速实现业务目标,一般都会让基础设施上云。而在云上进行开发与传统上直接使用物理机开发其实有很大不同。云上更强调共享弹性,此外,规模变大又会带来隔离性。这些改变也倒逼我们在进行开发时做出一些改变。在云上进行大规模数据处理,我主要有一些 spark 和 ray 的经验,使用的语言主要是 python;从这些技术栈出发,谈谈一些还算行之有效开发实践。

使用 ray 在云上进行大规模数据处理,一个基本的思路是:构建最小可并行单元,进行功能测试和性能测试,然后再利用 ray.data (比如 mapmap_batches )进行 scale。使用 spark 时,会稍有不同;相比 ray,spark 虽然灵活性稍差一些,但抽象封装更好,可以从数据集整体的角度来考虑数据处理,spark 会通过你设置的分区数和并行度,自动地扩展和容错。

作者:木鸟杂记 https://www.qtmuniao.com/2025/06/14/data-processing-on-cloud/ 转载请注明出处

共享性

共享性可以大致分为两方面:一方面是多个开发机的环境共享,另一方面是开发环境和生产环境的共享。那为什么会有多个开发机共享的需求呢?一来,我们总会因为各种原因,有开发机迁移的需求;二来,可能会有 CPU 机器和 GPU 机器的区分;三来,同事间、开发机和生产机偶尔也想通过文件系统来共享一些东西。

如果换机器进行开发,大家多少都会遇到过一些痛点:辛辛苦苦的在一台机器配好的环境,换个环境又要再来一次。当然,有的同学可能会把这个过程固化成一个脚本,每次启动时执行一遍。但即使这个过程能够被标准化和自动化,但每次装机也是需要花一些时间的,不能做到开箱即用。

就像 Java 的 slogan 所谓“一次编译、到处运行”一样,我们在配置开发机环境是,能不能达到“一次配置,随便换机”的效果呢?下面分享一种很粗糙的实践。

为了实现多机共享,可以向云厂商申请一块很大的支持 POSIX 语义、多点读写的共享云盘。然后将所有用户目录放到该云盘上,等有新的开发机时直接挂载该盘,然后将你的用户目录软连到本机用户根目录(/home )下即可。这样解决了数据共享问题。那么进一步,账号系统能不能也共享呢?如果多机的账号不通,则意味着需要给同样的数据目录给多机的多个账号进行不同授权。授权其实还好,但 owner 只能有一个,给了这个机器的,就不能给另外机器的账号。当然,这个问题的本质在于不同机器的账号 id (uid 和 gid)可能会产生冲突。于是为了真正丝滑的进行多机用户目录共享,我们还得配套地进行账号系统共享。

研究下 Linux 下用户系统可以发现,所有用户信息的最小集落在两个文件中:

  1. 用户基本信息:/etc/passwd
  2. 用户组信息:/etc/group

只要让所有机器共享这两个文件,即可让所有机器共享账号系统。但问题又来了:这个信息在哪里生成?如果有用户恶意改动其内容怎么办?

第一个问题,参考分布式系统中的常见操作,可以让一个开发机充当 “master”,所有用户都由该机器来创建,其他机器称为 “follower”,来挂载这两个文件,替换系统原用户信息文件。由唯一的 master 生成的 uid/gid 从而避免账号体系的冲突。

第二个问题,由组内管理员来控制账号的统一创建。但又不能不给其他用户 sudo 权限(不然在本机安装东西会很麻烦),所以随意修改这两个文件权限还是控不住,这个就只能大家口头约定了。

这样,就初步达成了我们“一次配置,随意换机”的目标。

但在这个体系下,还有一些实际问题需要解决,比如尽量使用类似 conda 的工具,将所有的环境和依赖安装到自己的用户目录下,而非每台机器的系统盘。这个我们在隔离性的一小节再详细展开。

弹性

这也是云最大的卖点之一:按需弹量、按量付费,但现实总没有这么美好。

成本

从成本上说,弹性资源的总会比包年包月的贵很多。因此作为云厂商用户的我们,通常会买一些包年包月的机器来满足大部分场景需求,然后在偶尔不够用时临时使用弹性资源。但如果你的负载不收敛,又会面临两难:贴近需求上限来保持常备池子,会有很多闲置浪费;贴近需求下限来购置常备池子,则需要经常使用弹性资源,也会比较贵。如果你的资源用量比较稳定,就可以稳定的省钱;但是如果比较多变,对不起,云上的弹性只解决有无问题,不会解决成本问题。常态化使用时,为了降本增效,你还是得精确计算和管控你的资源用量。

如果机器数量比较大,通常我们会用容器技术来池化所有机器资源,并使用 k8s 来编排调度任务。得益于 k8s 的开放性,现在大部分的计算框架都可以通过定制 Operator,一键部署在 k8s 上,比如 kuberayspark on k8s 。这种方式可以更好的对资源进行池化和分配。于是就引出了另外一个经典问题——调度。在调度的范畴里,我们可以按紧急程度用量多少来考虑每个待调度任务的属性。下面说几个使用场景:

  1. 插队:假设现在资源已经耗尽,有很多任务在排队,但新来了一个比较紧急的任务,要想快速获得资源,就得使用较高的优先级,插到别人前面去,等有资源可用立即抢上。甚至进一步,如果需求再紧急一点,可以使用高优先级+抢占式调度,直接从低优先级任务那里抢资源。当然更好的方式是手工介入,主动杀死一些(不用全杀)不着急任务的 pod 以临时出让资源。因此,在分布式系统中,对子任务换机器重试是刚需。
  2. 死锁:调度也有死锁?对的,考虑如下场景:现在集群中还剩 100 cpu,有两个 ray 任务,任务 A 需要 80 核,任务 B 需要 60 核。任何单一任务调度到集群中,都能起来,但是如果任务 A 抢到了 70 核,任务 B 抢到了 30 核,则谁都不能起来。这时就需要引入组调度(gang scheduling,比如 volcano),即只有一个任务所需资源都被满足后才一次性地给其分配资源,就跟锁一样。有了这种保证,要么 A 被调度上去、要么 B 被调度上去,而不会发生上面类似死锁的状况。
  3. 弹性:使用 k8s 跑计算框架时,通常会在任务级别进行弹性,如果池子中资源多,就多用点;资源少,就并发度低的跑慢点,spark 在这方面做得很好。而kuberay 的弹性调度,现在就是个笑话,且看以后把。但 spark 弹性做的好带来一个问题,就是如果使用者为一个大任务设定了很多资源,且没有优先级时,会很容易把整个池子吃满。

因此,在云上进行大任务调度,一个支持 gang scheduling、优先级调度,且调度效率较高(比如 spark 一下拉起了上万个 pod 也不能拉胯)的任务调度器必不可少。

代码

从代码角度来看,对程序进行大规模扩展也不是没有代价的。当然,这个代价在变的越来越低,上古时代,有 Hadoop 的 MapReduce;后面要求低延迟和易用性,又有 Spark、Flink;在机器学习时代,Ray 则大行其道。Ray 在理念上,很像一个大型的分布式计算机,采用经典的 master-worker 架构,将所有内存收集起来提供 object 级别的 kv 抽象,称为 object store;将所有 GPU/CPU 收集起来提供并发执行的基础,支持小数级别的逻辑分配。

ray 支持非常细粒度的并行,灵活性拉满,因此满足了大模型时代复杂多变的数据处理需求。但代价就是,其他分布式系统中一些常见的高阶抽象,到 ray 中就得自己做了:

  1. 逻辑调度。ray 使用基于可量化的 label 进行逻辑调度(感兴趣可以参考这篇博客)。逻辑和物理间的 gap,就需要你自己调优去填平。举例来说,如果你的 actor/worker 在逻辑上声明了要使用 10G 内存,但物理上的需求远超 10G,那么多个这样的 worker 调度到一台机器上,就很容易把该机器内存打爆。cpu/gpu 超用虽然不至于会把机器打爆,但逻辑和物理不统一,要么造成浪费,要么争抢严重。
  2. 上下游协同。ray 为了更好的支持数据处理,在 ray core 之上,包出了 ray.data 库,但如果上下算子的生产-消费速度控不好,就很可能造成:或者上游堆积,逐渐把机器内存撑爆;或者下游闲置,造成资源浪费。
  3. 数据条目非标。在进行大规模数据处理的时候,总会在处理了很久之后,发现一两条异常数据将任务干死。如果不做合适的错误处理或者断点续做,你将得到惨痛的代价。当然,ray 提供了静默错误的配置,但代价是守恒的——这个屎你不吃,下游同学就要吃。

调试

在大规模并行系统中,出错是非常常见的;但复现错误却通常很难——数据量过于巨大、环境过于复杂、复现过于费时等等。为了在出错时快速定位,你需要构建一套可观测系统:

  1. 日志收集。对所有关键路径上的步骤进行日志输出,将整个处理代码通过日志切分成一个个“格子”,从而在出错时能快速定位到相关代码段
  2. 指标统计。数据处理通常分为多个环节,出现 OOM 时,很可能是某个环节数据堆积,把机器打爆了。如果能对一些关键处理指标进行统计,就可以在出现性能问题时迅速找到原因。当然,指标统计可分为应用层面和系统层面,上面说的是用户代码层的应用层面,对于系统层面来说,往下探一层,比如 ray actor 的一些随时间的资源用量;再往下探一层,比如每个节点随时间的资源用量和网络 IO;往外扩一层,所依赖的存储系统,比如对象存储的按 bucket/prefix 的读写流量等等。

有了这些日志和指标,在出现问题时,可以很方便的根据日志获取出错时间,然后进行时间回溯,查看当时各种指标。收集到足够信息后,简单的就可以直接定位;复杂一些的,如果打印出了数据条目,可以利用该条数据进行最小复现,同时将该复现沉淀为单元测试。

另外一种常见的需要调试的问题——性能。这可能有多方面的原因:

  1. 最小执行单元性能就不太行
  2. 扩展时资源分配不合理造成踩踏
  3. 上下游生产消费速度不匹配

后面两个其实是分布式系统中常见问题,成熟的计算平台能在框架层面就解决这些问题(背压),但这多少也会让灵活性变差。灵活性和易用性总是需要取舍的两端,而 ray 由于设计理念和不太完善的原因,导致我们必须手动来处理这些问题。

另外,如果有不明原因的变慢甚至 hang 住,可以直接登录机器,通过一些 linux 命令行小工具,来查看系统资源用量;通过 py-spy 等工具来查看代码到底在做啥;从而查看系统到底出了什么问题,hang 在了哪里。

隔离性

从生命周期来入手,主要可以分为开发阶段和运行阶段的隔离性。

开发阶段

如之前所述,在开发阶段,会有多个开发同学共享一个开发机的情况,有共享就会引出隔离性的需求。最自然的解决方法,就是尽量利用 Linux 本身的账户体系对开发机进行隔离。有些团队为了图方便一把梭地都用 root ,由于每个人都有自己的开发习惯,后面很容易在安装工具和依赖的时候出现版本冲突。因此,还是建议大家都不用 root,并且把所有的软件尽量用类似 conda 工具安装在用户自身的目录中,一来可以在不同机器上方便迁移,二来可以避免影响同机器上的其他用户,也就是现在说的隔离性。

另外,对于单个用户来说来常常面临多个环境冲突的情况。以 Python 开发为例,如果你将所有的项目依赖都安装到用户目录,只要项目多,起冲突是必然的事情。即使同一个项目,在进行多分支开发时,偶尔也会有冲突。因此也需要用 virtualenv 或 conda 等工具来进行依赖的隔离。

运行阶段

有容器化技术的帮助,当今将一批物理机池化、并按需即时隔离出一个逻辑运行环境可太容易了。此外,利用容器技术和一些依赖管理工具链(比如 poetry 和 uv),我们再也不用担心过去开发环境和生产环境经常出现依赖不一致的情况了。即:

  1. 使用同一份 pyproject.toml文件来管理项目开发依赖
  2. 在进行开发时,使用 poetry/uv 和该文件来安装依赖
  3. 在线上运行时,在 Dockerfile 中利用 poetry/uv 来进行依赖管理

当然,上面只覆盖到了 python 的依赖,肯定还会有其他类似 cuda 等操作系统层面的库依赖。但原理是相通的,即使用 Dockerfile 来作为项目依赖的唯一的 source of truth,无论开发环境还是打镜像,都用该 Dockerfile 中的命令。

更激进的,甚至可以直接用项目中的 dockerfile 临时拉起一个环境进行开发,这样就让开发和运行达到了完全统一。也可从另外一个角度思考这个问题:

  1. 在开发时,镜像应该是需要被改写
  2. 但在生产时,镜像是只读的

所以在开发和运行时进行依赖管理,还是稍微有些不一样的。

小结

在云环境中进行开发改变了一些范式,从而诞生了很多新的开发实践。本文只是依据作者的经验,以在云上进行大规模的数据处理为切入点,稍微梳理了其中一些常见的问题和实践,如果能稍微给你一些启发,目的就达到了。限于经验和篇幅,肯定有覆盖不到之处,欢迎大家留言讨论和补充。


我是青藤木鸟,一个喜欢摄影、专注大规模数据系统的程序员,欢迎关注我的公众号:“木鸟杂记”,有更多的分布式系统、存储和数据库相关的文章,欢迎关注。 关注公众号后,回复“资料”可以获取我总结一份分布式数据库学习资料。 回复“优惠券”可以获取我的大规模数据系统付费专栏《系统日知录》的八折优惠券。

我们还有相关的分布式系统和数据库的群,可以添加我的微信号:qtmuniao,我拉你入群。加我时记得备注:“分布式系统群”。 另外,如果你不想加群,还有一个分布式系统和数据库的论坛(点这里),欢迎来玩耍。

wx-distributed-system-s.jpg