The previous post gathered some miscellaneous small classes together into one article. This post walks through the relatively long DataNode class.
Each DataNode represents a data node, corresponding to a folder on a machine. Essentially, it is a collection of a certain number of Blocks, capable of communicating with NameNode, clients, and other DataNodes to operate on this Block collection, mainly including client reads and writes, replication of blocks from other DataNodes, and responding to NameNode operations such as deletion.
Specifically, in terms of implementation: in data structure, it maintains a table mapping blocks to byte arrays; at runtime, the DataNode internally runs an infinite loop, continuously querying NameNode, reporting status (heartbeats), and executing commands (RPC).
- Status information. [DataNodeInfo](/hadoop-source-DFS#datanode-info): total size, remaining size, last update time.
- Execute commands:
  - Client reads and writes Blocks
  - Have other DataNodes replicate Blocks
  - Delete certain Blocks
In addition, DataNode also maintains a Server Socket to handle requests from Clients or other DataNodes. DataNode will submit its externally exposed host:port to NameNode, which will then forward this information to other relevant DataNodes or clients.
(Excerpted from class comments)
Author: Muniao’s Notes https://www.qtmuniao.com, please credit the original source when reposting
StartUp
When DataNode starts up, it mainly does the following:
For each `dfs.data.dir`, it instantiates a DataNode. DataNode has the following important fields:
- `namenode`, of type `DatanodeProtocol`, used for RPC communication with the NameNode.
- `data`, of type `FSDataset`, corresponding to a folder, responsible for the actual local disk operations.
- `localName`, machine name + port: the externally exposed network name and port.
- `dataXceiveServer`, a socket server listening on the aforementioned port, handling read and write requests.
- Some other configuration fields, including `blockReportInterval`, `datanodeStartupPeriod`, etc.
Main Loop When Run
offerService: this function decides whether to execute an action again based on the time elapsed since that action last ran (these actions are DataNode-to-NameNode RPCs); they basically correspond to the functions of `DatanodeProtocol`, i.e., the agreed RPC actions. The actions, all directed at the NameNode, are:
- Send heartbeat information
- Upload `block` information
- Fetch `NameNode` commands
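The timing pattern behind this loop can be sketched as follows. This is a minimal illustration of the assumed shape, not the literal Hadoop source; the interval values and the `due` helper are made up for the example.

```java
// A minimal sketch of the offerService timing pattern: each RPC-backed
// action fires once its interval has elapsed since it last ran.
// Assumed shape for illustration, not the literal Hadoop source.
public class OfferService {
    // true when `interval` ms have passed since `last`
    static boolean due(long now, long last, long interval) {
        return now - last >= interval;
    }

    public static void main(String[] args) {
        long heartbeatInterval = 3_000;    // assumed values for illustration
        long blockReportInterval = 60_000;
        long lastHeartbeat = 0, lastBlockReport = 0;
        long now = 5_000;                  // simulated clock, one loop iteration

        if (due(now, lastHeartbeat, heartbeatInterval)) {
            // namenode.sendHeartbeat(...) would be called here
            lastHeartbeat = now;
        }
        if (due(now, lastBlockReport, blockReportInterval)) {
            // namenode.blockReport(...) would be called here
            lastBlockReport = now;
        }
        System.out.println(lastHeartbeat + " " + lastBlockReport);  // 5000 0
    }
}
```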
Below, each item is explained in detail:
1. Send Heartbeat Information
Heartbeat information includes the following items:
- DataNode name
- DataNode data transfer port
- DataNode total capacity
- DataNode remaining bytes
2. Upload Current Block Information
Reports all Block information of this DataNode so that the NameNode can update its machine->block-list table and block->machine-list table. Because these are implemented with a TreeMap, an array sorted by blockId can be obtained. By comparing the old and new reported Block arrays (oldReport and newReport) element by element, the old state is updated to the new one via removeStoredBlock and addStoredBlock.
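The element-by-element comparison of the two sorted reports can be sketched as a merge-style pass. Only the removeStoredBlock/addStoredBlock names come from the prose above; everything else here is illustrative.

```java
import java.util.*;

// Sketch of the block-report diff: the two reports are blockId arrays
// sorted ascending (the TreeMap ordering), and a single merge-style pass
// finds which blocks to remove and which to add.
public class BlockReportDiff {
    static void diff(long[] oldReport, long[] newReport,
                     List<Long> removed, List<Long> added) {
        int i = 0, j = 0;
        while (i < oldReport.length && j < newReport.length) {
            if (oldReport[i] < newReport[j]) {
                removed.add(oldReport[i++]);   // would call removeStoredBlock
            } else if (oldReport[i] > newReport[j]) {
                added.add(newReport[j++]);     // would call addStoredBlock
            } else {
                i++; j++;                      // block unchanged
            }
        }
        while (i < oldReport.length) removed.add(oldReport[i++]);
        while (j < newReport.length) added.add(newReport[j++]);
    }

    public static void main(String[] args) {
        List<Long> removed = new ArrayList<>(), added = new ArrayList<>();
        diff(new long[]{1, 3, 5}, new long[]{1, 4, 5, 6}, removed, added);
        System.out.println(removed + " " + added);  // [3] [4, 6]
    }
}
```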
Then NameNode returns the array of Blocks to be deleted, and deletion is performed using the data (FSDataSet) handle.
3. Report Newly Received Block Information, i.e., ReceivedBlock
When a Client writes data, or another DataNode replicates data to the current DataNode, this DataNode executes this function via RPC. Then NameNode updates it into the table that stores metadata.
4. Fetch NameNode Commands
According to the fields of the BlockCommand class:
```java
boolean transferBlocks = false;
```
It can be seen that command actions include transfer and delete (or invalidate); action objects include a series of blocks and DataNodes, indicating that blocks[i] should be transferred to the DataNodes targets[i][0] … targets[i][j].
The specific transfer implementation starts a thread for each block that is not invalidated, responsible for the actual data transfer. The code is:
```java
new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
```
The DataTransfer class will be annotated in detail later.
DataTransfer
This class implements the Runnable interface and is started each time data needs to be transferred; its main actions are:
- Connect to the first target DataNode’s socket, as output.
- Obtain the Block metadata and the block’s data file on the local machine from `FSDataSet`, as input.
- Read data from the input end and write it to the output end.
Therefore, this class is only responsible for writing block information to the first target DataNode, say Node1; the rest will be handled by threads on Node1’s machine for data transfer.
The actual folder path and file name of this block on the local machine can both be determined based on blockId. For a 64-bit blockId, every four bits from high to low is used as a folder name (0~15) for routing; therefore, the actual file location depth can be as high as 64/4=16 layers; the file naming convention for stored data is blk_{blockId}.
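The routing scheme described above can be sketched as follows; `pathFor` is a made-up helper name, and the `depth` parameter is illustrative (the post notes it can go as deep as 16).

```java
// Sketch of the directory-routing scheme described above: every four bits
// of the 64-bit blockId, from high to low, names one directory level (0-15),
// and the data file itself is named blk_{blockId}.
public class BlockPath {
    static String pathFor(long blockId, int depth) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < depth; i++) {
            int shift = 60 - 4 * i;              // high nibble first
            long nibble = (blockId >>> shift) & 0xF;
            sb.append(nibble).append('/');
        }
        sb.append("blk_").append(blockId);       // file name: blk_{blockId}
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(pathFor(0x123456789ABCDEF0L, 4));
        // 1/2/3/4/blk_1311768467463790320
    }
}
```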
DataXceiveServer
This class also implements the Runnable interface and is started during DataNode initialization, used to listen for requests from Clients or other DataNodes to perform block data transfer.
The specific implementation uses a ServerSocket, looping to accept all requests as long as the shouldListen flag is set. When a request arrives, a DataXceiver handles that connection.
DataXceiver
This class is responsible for the specific implementation of data transfer logic, including Block writes and reads. Each time it transfers one Block, it first writes this Block to the local file system, then transfers it to the next target DataNode; specifically,
First, open the socket input stream, read the first byte, and determine the operation type;
Then, perform write or read operations.
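A minimal sketch of this first-byte dispatch; the opcode constants below are placeholders, not necessarily the values defined in the Hadoop source.

```java
import java.io.*;

// Sketch of DataXceiver's dispatch: the first byte of the connection's
// input stream selects the operation. Opcode values are illustrative.
public class OpDispatch {
    static final byte OP_WRITE_BLOCK = 0;
    static final byte OP_READ_BLOCK = 1;
    static final byte OP_READSKIP_BLOCK = 2;

    static String dispatch(DataInputStream in) throws IOException {
        byte op = in.readByte();              // first byte selects the operation
        switch (op) {
            case OP_WRITE_BLOCK:    return "write";
            case OP_READ_BLOCK:
            case OP_READSKIP_BLOCK: return "read";
            default: throw new IOException("unknown op " + op);
        }
    }

    public static void main(String[] args) throws IOException {
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(new byte[]{OP_WRITE_BLOCK}));
        System.out.println(dispatch(in));      // write
    }
}
```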
Write Operation (OP_WRITE_BLOCK)
- Read in the header, which includes the following fields:

  ```
  a. shouldReportBlock --> bool
  b. block info (blkid + len) --> Block
  c. numTargets --> int
  d. targets --> DatanodeInfo[]
  e. encodingType --> byte
  f. data length --> long
  ```

- Then write this header information to the next DataNode (`targets[1]`), after removing the current DataNode’s entry (`targets[0]`).

- Read the actual stored data from the socket, and write it sequentially to local storage (the current DataNode) and to the next DataNode’s socket. There is a design point here: if writing to the socket fails, the socket can be dropped, but writing to local storage still continues.

- Depending on the `encodingType`, the data is read differently. For the `RUNLENGTH_ENCODING` type, the structure is a length (say l) followed by data of length l, so a single read finishes it; for the `CHUNKED_ENCODING` type, the structure is l1 + data1 + l2 + data2 + … + ln + datan + l(n+1) (= 0), so it keeps reading a length and then that much data in a loop, until the length read is 0.

- If the socket to the next DataNode is still healthy, read back feedback about the written data from that socket: a long-typed terminator and a `LocatedBlock`, the network location of the block after a successful write, which is a pair of `Block` and `DatanodeInfo[]` representing the Block and the list of DataNodes that have written it successfully. The entire write-and-replication process resembles a recursive call: the client writes to datanode1, datanode1 writes to datanode2, datanode2 writes to datanode3; then datanode3 sends the write-success signal and datanode3’s location back to datanode2, datanode2 sends the write-success signal and the locations of datanode2 and datanode3 back to datanode1, and so on.
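The CHUNKED_ENCODING read loop can be sketched as follows. This assumes chunk lengths are written as longs (matching the header's long data-length field), which is an assumption about the wire format rather than a confirmed detail, and `readChunked` is a made-up name.

```java
import java.io.*;

// Sketch of reading CHUNKED_ENCODING data: a stream of (length, bytes)
// pairs terminated by a zero length. Illustrative, not the literal
// DataXceiver code.
public class ChunkedReader {
    static byte[] readChunked(DataInputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long len;
        while ((len = in.readLong()) > 0) {    // length 0 marks end of data
            byte[] buf = new byte[(int) len];
            in.readFully(buf);                 // read exactly `len` bytes
            out.write(buf);                    // would also go to next DataNode
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(raw);
        dos.writeLong(3); dos.write(new byte[]{1, 2, 3});
        dos.writeLong(2); dos.write(new byte[]{4, 5});
        dos.writeLong(0);                      // terminator
        byte[] data = readChunked(new DataInputStream(
                new ByteArrayInputStream(raw.toByteArray())));
        System.out.println(data.length);       // 5
    }
}
```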
Read Operation (OP_READ_BLOCK || OP_READSKIP_BLOCK)
First read in the Block information to be read, then, if it is the OP_READSKIP_BLOCK type, read the number of bytes to skip (toSkip --> long);
Then locate the local storage file position of the block through data --> FSDataSet, decide whether to skip specific bytes (toSkip) based on the type, and then read the file byte by byte.
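The skip-then-read handling can be sketched as follows, assuming the locally stored block is available as a plain InputStream; `readBlock` is a made-up name for illustration.

```java
import java.io.*;

// Sketch of OP_READSKIP_BLOCK handling: skip `toSkip` bytes of the locally
// stored block file, then read the remainder byte by byte, as the post
// describes. Illustrative, not the literal Hadoop source.
public class ReadSkip {
    static byte[] readBlock(InputStream blockFile, long toSkip) throws IOException {
        long skipped = 0;
        while (skipped < toSkip) {
            long n = blockFile.skip(toSkip - skipped);
            if (n <= 0) break;                 // end of file reached early
            skipped += n;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int b;
        while ((b = blockFile.read()) != -1) out.write(b);  // byte-by-byte read
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] block = {10, 20, 30, 40, 50};
        byte[] rest = readBlock(new ByteArrayInputStream(block), 2);
        System.out.println(rest.length);       // 3
    }
}
```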
Aside Info
If a class needs to be used as a key, for example in a TreeMap, it must implement the Comparable interface, since only comparable instances can be kept in sorted order; if serialization and deserialization are needed, it must implement the Writable interface.
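A tiny illustration of this aside, with a made-up `BlockKey` standing in for Hadoop's Block class:

```java
import java.util.*;

// A key type used in a TreeMap must be Comparable; BlockKey is a made-up
// stand-in for Hadoop's Block class, ordered by its blkid.
public class BlockKey implements Comparable<BlockKey> {
    final long blkid;
    BlockKey(long blkid) { this.blkid = blkid; }

    @Override public int compareTo(BlockKey o) {
        return Long.compare(blkid, o.blkid);   // sort order: ascending blkid
    }
    @Override public String toString() { return "blk_" + blkid; }

    public static void main(String[] args) {
        TreeMap<BlockKey, String> map = new TreeMap<>();
        map.put(new BlockKey(3), "c");
        map.put(new BlockKey(1), "a");
        map.put(new BlockKey(2), "b");
        System.out.println(map.firstKey());    // blk_1, smallest blkid
    }
}
```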
