木鸟杂记

分布式系统,数据库,存储

Hadoop 源码阅读之DFS(二):DataNode

上一篇把一些零碎的小类集在一起,凑成一篇。这篇打算对比较长的一个类DataNode读读。
每个DataNode代表一个数据节点,对应某台机器的一个文件夹,本质上是一定数量的Block的集合,能够和NameNode,client以及其他DataNode进行通信,以对该Block集合进行操作,主要包括client的读和写,其他DataNode block的复制,以及响应NameNode操作,进行删除等操作。
具体实现来说,数据结构上,维持了一个block到byte array的表;执行时,DataNode内部是一个无限循环,不断询问NameNode,报告状态(心跳),执行命令(RPC)

  1. 状态信息。[DataNodeInfo](/hadoop-source-DFS#datanode-info):总大小,剩余大小,上次更新时间。
  2. 执行命令。
    • 客户端读写Blocks
    • 让其他DataNode复制Blocks
    • 删除某些Blocks

此外,DataNode还维持着一个Server Socket以处理来自Client或者其他DataNode请求。DataNode会将其对外暴露的host:port提交给NameNode,后者会将该信息进一步下发给相关的其他DataNode或者client。
(摘自类注释)

作者:木鸟杂记 https://www.qtmuniao.com, 转载请注明出处

StartUp

DataNode启动的时候主要干了以下事情:
为每个dfs.data.dir实例化一个DataNode,DataNode有以下几个重要字段:

  1. namenodeDatanodeProtocol类型,和NameNode进行RPC通信。
  2. dataFSDataset类型,对应一个文件夹,负责具体的本地磁盘操作。
  3. localName, machine name + port,对外暴露的网络机器名和端口。
  4. dataXceiveServer,一个socket Server,监听上述端口,处理读写请求。
  5. 其他一些配置字段,包括blockReportIntervaldatanodeStartupPeriod等。

Main Loop When Run

offerService,该函数根据当前时间与上次动作时间差值,决定是否再一次执行该动作(DataNodeNameNode的RPC);这几个动作基本对应DataNodeProtocol的各个函数,即RPC的几个动作约定。这些事件有向NameNode

  • 发送心跳信息
  • 上传block信息
  • 获取NameNode指令

下面分别就每一项进行详细说明:

1. 发送心跳信息

心跳信息包括以下几项内容:

  • DataNode名字
  • DataNode数据传输端口
  • DataNode总容量
  • DataNode剩余字节数

2. 上传当前Block信息

报告本DataNode的所有Block信息,以更新表machine->block list 和表block->machine list。利用TreeMap实现,能得到按BlockId排序的数组,通过逐一比较新旧上报Block数组的每个元素(oldReportnewReport),利用removeStoredBlockaddStoredBlock将旧数组更新为新数组。

然后NameNode将需要删除的Block数组返回,利用dataFSDataSet)句柄进行删除。

3. 报告新收到的Block信息,即ReceivedBlock

当Client写数据,或者其他DataNode复制数据给当前DataNode的时候,该DataNode通过RPC,执行此函数。然后NameNode将其更新到保存元数据的table里。

4. 获取 NameNode指令

根据BlockCommand类的字段:

1
2
3
4
boolean transferBlocks = false;
boolean invalidateBlocks = false;
Block blocks[];
DatanodeInfo targets[][];

可以看出,指令动作包括交换(transfer)和删除(delete or invalidate);动作对象包括一系列blocks和DataNode,表示将blocks[i]传送到targets[i][0] targets[i][j]的DataNode上去。
具体传送实现,为每一个!invalid的block,启动一个线程,负责具体数据传送,代码为:

1
new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();

后面将对DataTransfer类进行详细注解。

DataTransfer

该类实现了Runnable接口,在每次有数据需要传输时被启动;其动作主要为:

  1. 连接第一个Target DataNode的socket,作为输出。
  2. FSDataSet中获取Block元信息以及本机器上该block对应的数据文件,作为输入。
  3. 从输入端读取数据,写到输出端。

因此,该类只负责将block信息写到第一个target DataNode,比如说Node1,剩下的将由Node1机器上的线程进行数据传送。

该block在本机实际的文件夹路径和文件名都可以根据blockId进行确定。对于一个64bit的blockId,从高位到地位,每四位作为一个文件夹的名字(0~15),进行路由,因此文件实际位置的深度可能高达64/4=16层;存储数据的文件命名方式为blk_{blockId}.

DataXceiveServer

该类也实现了Runnable接口,在DataNode初始化的时候被启动,用于监听Client或者其他DataNodes的请求,以进行block数据的传输。
具体实现为,使用SocketServer,根据信号shouldListen来循环监听所有请求。当请求到来时,使用DataXceiver类进行具体连接的处理;

DataXceiver

该类负责具体实现数据传输的逻辑,包括Block的写和读,每次传输一个Block块,将该Block首先写入本地文件系统,然后传送给下一个目标DataNode;具体来说,
首先,打开socket输入流,读取首字节,判断操作类型;
然后,进行写或者读操作。

写操作(OP_WRITE_BLOCK)

  1. 读入header,包括以下几个字段

    1
    2
    3
    4
    5
    6
    a. shouldReportBlock --> bool
    b. block info(blkid+len) --> Block
    c. numTargets --> int
    d. targets --> DatanodeInfo[]
    e. encodingType --> byte
    f. data length --> long
  2. 然后将这些header信息,去掉该DataNode(targets[0])的信息后,写入下一个DataNode (target[1])。

  3. 从socket中读取具体存储的数据,先后写入本地存储(当前DataNode)和下一个DataNode的socket。这里有一点设计,就是如果写Socket异常后,可以终止Socket,但仍然继续写本地存储。

  4. encodingType的类型不同,读取数据方式不同:对于RUNLENGTH_ENCODING类型,其结构是length(say l)+data(of the length l),因此读一次就结束;而CHUNKED_ENCODING类型,结构为l1 + data1 + l2 + data2 + … + ln + datan + **l(n+1) (=0)**;因此需要循环继续读如长度,然后读入该长度数据,直到len=0结束。

  5. 如果和下一个DataNode间的socket仍然正常,则从该socket读回一些关于写数据的反馈,包括long型的结束符和LocatedBlock–>写成功后的block网络位置,是一个BlockDatanodeInfo[]对,表示该Block以及已经写成功的DataNode list。整个写操作和备份的过程类似于一个递归调用的过程,由client写datanode1, 然后datanode1写datanode2,然后datanode2写datanode3;然后datanode3将写成功信号,以及datanode3位置告诉datanode2,然后datanode2将写成功信号以及datanode2,datanode3位置告诉datanode1等等。

读操作(OP_READ_BLOCK || OP_READSKIP_BLOCK

首先读入待读取的Block信息,然后,如果是OP_READSKIP_BLOCK类型,则读取需要跳过的字节数(toSkip–>long);
然后通过data –> FSDataSet 定位block本地存储文件位置,根据类型决定是否跳过特定字节(toSkip),然后逐字节读取该文件。

Aside info

如果类需要作为Key,比如TreeMap,则需要实现Comparable接口,只有可以比较才能进行排序和Hash;如果需要进行序列化和反序列化,则需要实现Writable接口。


我是青藤木鸟,一个喜欢摄影、专注大规模数据系统的程序员,欢迎关注我的公众号:“木鸟杂记”,有更多的分布式系统、存储和数据库相关的文章,欢迎关注。 关注公众号后,回复“资料”可以获取我总结一份分布式数据库学习资料。 回复“优惠券”可以获取我的大规模数据系统付费专栏《系统日知录》的八折优惠券。

我们还有相关的分布式系统和数据库的群,可以添加我的微信号:qtmuniao,我拉你入群。加我时记得备注:“分布式系统群”。 另外,如果你不想加群,还有一个分布式系统和数据库的论坛(点这里),欢迎来玩耍。

wx-distributed-system-s.jpg