与读流程一样,HDFS 写流程也分为 Client 、 Namenode 和 Datanode 三端的子流程。
先看一段简单的客户端写文件代码:
FileSystem fs = FileSystem.get(new URI("hdfs://ip:port"), new Configuration());
FSDataOutputStream dataOut = fs.create(new Path("/test/data.file"));
dataOut.write(new byte[4609]); // 一种写法
dataOut.writeBytes("Test write data"); // 另一种写法
dataOut.close(); // close 中包含了 flush
与 POSIX 文件系统类似,HDFS 文件的写数流程是 create/append -> write -> close 。
FileSystem.get(URI uri, Configuration conf) 的实现中利用了一个 CACHE ,即如果此前已经构造了 FS ,则记录到 CACHE 中,以便于后续直接获取。
当然也可以通过开关 fs.$scheme.impl.disable.cache
关闭掉 CACHE ,这样每次 get 都会构造一个新的 FS 。
输入是 Path,输出是 FSDataOutputStream 。
核心实现在 DFSClient 类中,主要包含两个操作:
FSDataOutputStream 继承自 DataOutputStream ,主要维护了写相关的信息,封装了三个主要接口 write, flush 和 close ,其中也包含数据流处理器 DataStreamer 的构造和启动。
构造 FSDataOutputStream 的首要参数就是文件,对应的是 HdfsFileStatus 类,通过 RPC dfsClient.namenode.create(...)
从 Namenode 获取到。
租约在另一篇讲述。
write 接口有多个,一种典型的 write 输入为:(final byte buf[], int off, int len)。
先看下正常写的调用栈:
writeData:110, DFSPacket (org.apache.hadoop.hdfs)
writeChunk:451, DFSOutputStream (org.apache.hadoop.hdfs)
writeChecksumChunks:217, FSOutputSummer (org.apache.hadoop.fs)
flushBuffer:164, FSOutputSummer (org.apache.hadoop.fs)
flushBuffer:145, FSOutputSummer (org.apache.hadoop.fs)
write1:136, FSOutputSummer (org.apache.hadoop.fs)
write:111, FSOutputSummer (org.apache.hadoop.fs)
write:57, FSDataOutputStream$PositionCache (org.apache.hadoop.fs)
write:107, DataOutputStream (java.io)
可以看到由于写入的内容包含实际数据和校验数据,所以实际的 output 类是 FSOutputSummer 。
DFSPacket 封装了网络传输的数据缓冲区,用户的要写入的数据会被按指定大小切分放入此 packet 对象中,然后由 DataStreamer 发送给 Datanode。
DFSPacket 的 writeData 和 writeChecksum 两个方法,分别用于将用户的数据和校验数据拷贝到 packet 对象的内部 buf 。
当 DFSPacket 的内部 buf 填满后,调用 enqueueCurrentPacket()
方法将此 packet 放入 DataStreamer 的处理队列中。
一个 packet 包的实际数据量大小由参数 dfs.client-write-packet-size
指定,然后经过 computePacketChunkSize(int psize, int csize)
方法计算得到。
再回顾一下数据的组织形式:
file(用户文件) -> block(管理单位) -> packet(传输单位) -> trunk(校验单位)
至此,write 的核心内容结束。
flush 操作要把数据真正的写入到 Datanode ,主要是在 DataStreamer 中完成的。
DataStreamer 包含两个线程,一个用于发送数据,另一个用于接收回执。
相对应的,有两个队列,一个是发送数据的队列 dataQueue ,另一个是回执队列 ackQueue ,两个队列挂的都是 packet 。
在发送数据之前,客户端需要知道发往哪些 Datanode ,调用栈如下:
addBlock:1110, DFSOutputStream (org.apache.hadoop.hdfs)
locateFollowingBlock:2005, DataStreamer (org.apache.hadoop.hdfs)
nextBlockOutputStream:1803, DataStreamer (org.apache.hadoop.hdfs)
run:748, DataStreamer (org.apache.hadoop.hdfs)
在 addBlock 中,通过 RPC dfsClient.namenode.addBlock(...)
从 Namenode 中获取当前数据对应的数据块的所有 Datanode 。
有了所有的 Datanode ,然后调用 createSocketForPipeline(...)
方法创建发送数据的流水线,具体参考 pipeline 篇。
构造好了 pipeline , 就可以正式写入数据了,参考调用栈:
write:456, SocketChannelImpl (sun.nio.ch)
performIO:63, SocketOutputStream$Writer (org.apache.hadoop.net)
doIO:144, SocketIOWithTimeout (org.apache.hadoop.net)
write:159, SocketOutputStream (org.apache.hadoop.net)
write:117, SocketOutputStream (org.apache.hadoop.net)
write:122, BufferedOutputStream (java.io)
write:107, DataOutputStream (java.io)
writeTo:193, DFSPacket (org.apache.hadoop.hdfs)
run:807, DataStreamer (org.apache.hadoop.hdfs)
需要注意的是,当前块的所有 packet 发送完之后,还会发送一个空 packet 做为结尾标志。然后在所有回执收到后,通过 endBlock()
关闭当前块的输出流,回执线程和 pipeline 等。
待一个文件的所有块都写完后,调用 closeInternal()
关闭当前块的输出流,回执线程、 pipeline 和释放 packet buf 等。
实际上在上面 write 的流程中,被填满的 packet 会被挂到发送队列,而数据块末尾的未填满的 packet 在 close 流程中会被挂到发送队列,因此 flush 的调用经常会被省略。
close 操作中主要做三件事:
flushInternalWithoutWaitingAck()
发送最后一个 packet 。dfsClient.namenode.complete(...)
完成,参考调用栈:
completeFile:978, DFSOutputStream (org.apache.hadoop.hdfs)
completeFile:934, DFSOutputStream (org.apache.hadoop.hdfs)
closeImpl:917, DFSOutputStream (org.apache.hadoop.hdfs)
close:872, DFSOutputStream (org.apache.hadoop.hdfs)
close:72, FSDataOutputStream$PositionCache (org.apache.hadoop.fs)
close:101, FSDataOutputStream (org.apache.hadoop.fs)
endFileLease(fileId)
。至此,客户端返回写入成功。
Namenode 的写入流程涉及与客户端的几个 RPC 交互和与 Datanode 的块汇报交互。
create 的操作对象是文件,创建指定文件的 inode ,然后加入到目录树中,此外还有维护租约等操作。先看下 create 的调用栈:
addFile:635, FSDirWriteFileOp (org.apache.hadoop.hdfs.server.namenode)
startFile:459, FSDirWriteFileOp (org.apache.hadoop.hdfs.server.namenode)
startFileInt:2815, FSNamesystem (org.apache.hadoop.hdfs.server.namenode)
startFileInt:1313, JDFSNamesystem (org.apache.hadoop.hdfs.server.namenode)
startFile:2714, FSNamesystem (org.apache.hadoop.hdfs.server.namenode)
create:802, NameNodeRpcServer (org.apache.hadoop.hdfs.server.namenode)
addFile 中即完成了新建 inode 并加入目录树的操作。
对于覆盖写的情况,实际上是通过先执行 delete ,然后新写的方式实现的。
append 的操作与 create 不同,因为指定文件是已经存在的,要做的是在文件末尾追加写入数据。因此其操作对象是文件的最后一个数据块和相关的租约恢复。
此时需要总结一下数据块的几个状态:
对于 append ,简单来说就是要把末尾的 block 改为 UNDER_CONSTRUCTION ,调用的方法是fsd.getBlockManager().convertLastBlockToUnderConstruction(...)
。
如果没有 block ,或者末尾 block 已经满了,则走新块逻辑。
addBlock 分为三步:
FSDirWriteFileOp.chooseTargetForNewBlock(...)
,核心规则是第一个节点以和客户端距离最近优先,第二个节点再考虑比如跨机架等因素选择,此外根据实际应用场景,还要考虑可用容量,节点繁忙程度等等。getAdditionalBlockInt(...)
> FSDirWriteFileOp.storeAllocatedBlock(...)
> fsn.createNewBlock(blockType)
> saveAllocatedBlock(...)
> addBlock(...)
> blockInfo.convertToBlockUnderConstruction(...)
> fsd.getBlockManager().addBlockCollection(blockInfo, fileINode)
> fileINode.addBlock(blockInfo)
> return makeLocatedBlock(...)
complete 的调用代表客户端通知 Namenode 文件写完了,然后 Namenode 需要对文件和数据块做一些状态转换。主要在方法 completeFileInternal(...)
中实现,调用关系如下:
completeFileInternal(...)
> checkLease(...) // 如果租约过期但文件已经完成,则视为完成
> commitOrCompleteLastBlock(...) // 对末尾块状态转换
> commitBlock(...) // 转为 COMMITTED
> completeBlock() // 副本数达标则转为 COMPLETE
> updateNeededReconstructions(...) // 副本数不全则放入待补队列
> addCommittedBlocksToPending(...) // 等待副本汇报
> finalizeINodeFileUnderConstruction(...)
> toCompleteFile(...) // 转换文件状态
> removeLease(...) // 删除租约
此时 Namenode 已经被客户端通知文件写入完成,终于等来了 Datanode 的块副本汇报。
Namenode 在处理 IBR 时,需要更新数据块和节点的关系,转换数据块和文件状态等。
IBR 调用栈参考如下:
addStoredBlock:4357, BlockManager (org.apache.hadoop.hdfs.server.blockmanagement)
processAndHandleReportedBlock:5323, BlockManager (org.apache.hadoop.hdfs.server.blockmanagement)
addBlock:5296, BlockManager (org.apache.hadoop.hdfs.server.blockmanagement)
processIncrementalBlockReport:5396, BlockManager (org.apache.hadoop.hdfs.server.blockmanagement)
processIncrementalBlockReport:5363, BlockManager (org.apache.hadoop.hdfs.server.blockmanagement)
processIncrementalBlockReport:5480, FSNamesystem (org.apache.hadoop.hdfs.server.namenode)
Datanode 的写入流程可以划分为接收数据、数据落盘和副本汇报三个部分。
数据的接收和流转由 BlockReceiver 类负责,其对象在 DataXceiver 中构造,数据包接转代码如下:
DataXceiver.writeBlock(...)
> blockReceiver.receiveBlock(...)
> new PacketResponder(...) // 如果是第一个节点,则启动一个线程向客户端发送回执
> while (receivePacket() >= 0)
receivePacket()
> packetReceiver.receiveNextPacket(in) // 读数据包
> packetReceiver.mirrorPacketTo(mirrorOut); // 发给下游
> streams.writeDataToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk); // 本地固化
数据块在 Datanode 中是以副本的形式管理的,对应 ReplicaInfo 类。
DN 的主要工作都是围绕副本展开的,先总结一下副本的几个状态:
一个数据块副本要具体写入到哪里,是由一串的磁盘数据块管理器决定的:
FsDatasetImpl -> FsVolumeList -> FsVolumeImpl -> BlockPoolSlice
blockChooser.chooseVolume(...)
选择要写入的 Volume 。DatanodeUtil.idToBlockDir(...)
中通过 blockId 计算得到。写入流程是在块接收器 BlockReceiver 中完成的,调用栈参考如下:
write:327, FileOutputStream (java.io)
write:951, FileIoProvider$WrappedFileOutputStream (org.apache.hadoop.hdfs.server.datanode)
writeDataToDisk:148, ReplicaOutputStreams (org.apache.hadoop.hdfs.server.datanode.fsdataset)
receivePacket:749, BlockReceiver (org.apache.hadoop.hdfs.server.datanode)
receiveBlock:1000, BlockReceiver (org.apache.hadoop.hdfs.server.datanode)
writeBlock:946, DataXceiver (org.apache.hadoop.hdfs.server.datanode)
opWriteBlock:197, Receiver (org.apache.hadoop.hdfs.protocol.datatransfer)
processOp:106, Receiver (org.apache.hadoop.hdfs.protocol.datatransfer)
run:300, DataXceiver (org.apache.hadoop.hdfs.server.datanode)
写完成后也有一个回执线程 PacketResponder 负责向上游发送回执,其中一个主要操作,就是当一个块写完成时(收到空尾包),调用 finalizeBlock(…) 来结束当前数据块的写入。
finalizeBlock 有两个主要步骤,一个是转为 FINALIZED 状态,参考调用栈如下:
addFinalizedBlock:1066, FsVolumeImpl (org.apache.hadoop.hdfs.server.datanode.fsdataset.impl)
finalizeReplica:1873, FsDatasetImpl (org.apache.hadoop.hdfs.server.datanode.fsdataset.impl)
finalizeBlock:1833, FsDatasetImpl (org.apache.hadoop.hdfs.server.datanode.fsdataset.impl)
finalizeBlock:1550, BlockReceiver$PacketResponder (org.apache.hadoop.hdfs.server.datanode)
run:1504, BlockReceiver$PacketResponder (org.apache.hadoop.hdfs.server.datanode)
run:748, Thread (java.lang)
另一个是触发增量块汇报给 Namenode,参考调用栈如下:
notifyNamenodeBlock:272, IncrementalBlockReportManager (org.apache.hadoop.hdfs.server.datanode)
notifyNamenodeBlock:333, BPOfferService (org.apache.hadoop.hdfs.server.datanode)
notifyNamenodeReceivedBlock:311, BPOfferService (org.apache.hadoop.hdfs.server.datanode)
notifyNamenodeReceivedBlock:1417, DataNode (org.apache.hadoop.hdfs.server.datanode)
closeBlock:2971, DataNode (org.apache.hadoop.hdfs.server.datanode)
finalizeBlock:1557, BlockReceiver$PacketResponder (org.apache.hadoop.hdfs.server.datanode)
run:1504, BlockReceiver$PacketResponder (org.apache.hadoop.hdfs.server.datanode)
run:748, Thread (java.lang)
在 notifyNamenodeBlock 中,通过调用 addRDBI(rdbi, storage)
和 triggerIBR(isOnTransientStorage)
完成数据块信息的封装和触发汇报。
至此,HDFS 的写流程大致梳理完成。
HDFS 读流程分为 Client 、 Namenode 和 Datanode 三端的子流程。
先看一段简单的客户端读文件代码:
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://ip:port"), conf);
FSDataInputStream dataIn = null;
try {
dataIn = fs.open(new Path("/test/file.data"));
int dataBytes = dataIn.read(); // 一种读法
IOUtils.copyBytes(dataIn, System.out, 1024, false); // 另一种读法
} finally {
IOUtils.closeStream(dataIn);
}
与 POSIX 文件系统类似,HDFS 文件的读取流程也是 open -> read -> close 。
其中 open 的输入是文件路径,输出是 FSDataInputStream 。
FSDataInputStream 继承自 DFSInputStream ,主要维护了读相关的信息,如客户端配置、文件块和缓存策略等。
LocatedBlocks 主要维护了待读文件所对应的所有数据块 LocatedBlock ,包含数据块状态和数据副本所在的 DN 等。通过 RPC 调用从 Namenode 查询而来。
namenode.getBlockLocations(src,start,length,client);
read 的输入有三个:(final byte buf[], int off, int len)。
主要分为如下几步实现:
由于分级被分割为一组数据块,所以要通过输入的偏移找到对应的数据块,其核心是一个二分查找。
Collections.binarySearch(blocks,key,comp);
findBlock:154, LocatedBlocks (org.apache.hadoop.hdfs.protocol)
fetchBlockAt:562, DFSInputStream (org.apache.hadoop.hdfs)
getBlockAt:546, DFSInputStream (org.apache.hadoop.hdfs)
blockSeekTo:681, DFSInputStream (org.apache.hadoop.hdfs)
对于多副本的情况,从 Namenode 查询到的 Block 所有副本所在的 DN 是按照一定顺序排列的(默认按照客户端的网络拓扑距离排序),此时客户端选择第一个 DN 即可。在读取数据包时,如果发生传输异常,则将当前的 DN 通过 addToLocalDeadNodes 方法记为死节点,然后重试选取下一个 DN。这个重试逻辑是在 readWithStrategy 方法中实现的。
<init>:1792, DFSInputStream$DNAddrPair (org.apache.hadoop.hdfs)
getBestNodeDNAddrPair:1141, DFSInputStream (org.apache.hadoop.hdfs)
chooseDataNode:1045, DFSInputStream (org.apache.hadoop.hdfs)
chooseDataNode:1028, DFSInputStream (org.apache.hadoop.hdfs)
blockSeekTo:692, DFSInputStream (org.apache.hadoop.hdfs)
readWithStrategy:900, DFSInputStream (org.apache.hadoop.hdfs)
BlockReader 中包含建立与 DN 的网络链接和数据包接收器。
创建网络链接:
newConnectedPeer:3112, DFSClient (org.apache.hadoop.hdfs)
nextTcpPeer:824, BlockReaderFactory (org.apache.hadoop.hdfs.client.impl)
getRemoteBlockReaderFromTcp:749, BlockReaderFactory (org.apache.hadoop.hdfs.client.impl)
build:382, BlockReaderFactory (org.apache.hadoop.hdfs.client.impl)
getBlockReader:770, DFSInputStream (org.apache.hadoop.hdfs)
数据包接收器中维护了一系列的数据包处理方法,如数据包接受,包头解析分片等。
<init>:78, PacketReceiver (org.apache.hadoop.hdfs.protocol.datatransfer)
<init>:103, BlockReaderRemote (org.apache.hadoop.hdfs.client.impl)
newBlockReader:439, BlockReaderRemote (org.apache.hadoop.hdfs.client.impl)
getRemoteBlockReader:865, BlockReaderFactory (org.apache.hadoop.hdfs.client.impl)
getRemoteBlockReaderFromTcp:752, BlockReaderFactory (org.apache.hadoop.hdfs.client.impl)
build:382, BlockReaderFactory (org.apache.hadoop.hdfs.client.impl)
getBlockReader:770, DFSInputStream (org.apache.hadoop.hdfs)
由于数据包包含了若干数据 chunk 和所对应的 checksum,因此读取和校验操作是前后依次执行的。
具体的读取是在 PacketReceiver 的 doRead 方法中完成的,先读取数据包长度,然后取数据包全部数据,最后通过 reslicePacket 方法将数据包数据分片为 Header、Data 和 Checksum。
read:161, SocketInputStream (org.apache.hadoop.net)
readChannelFully:258, PacketReceiver (org.apache.hadoop.hdfs.protocol.datatransfer)
doReadFully:209, PacketReceiver (org.apache.hadoop.hdfs.protocol.datatransfer)
doRead:171, PacketReceiver (org.apache.hadoop.hdfs.protocol.datatransfer)
receiveNextPacket:102, PacketReceiver (org.apache.hadoop.hdfs.protocol.datatransfer)
readNextPacket:189, BlockReaderRemote (org.apache.hadoop.hdfs.client.impl)
read:148, BlockReaderRemote (org.apache.hadoop.hdfs.client.impl)
readFromBlock:120, ByteArrayStrategy (org.apache.hadoop.hdfs)
readBuffer:842, DFSInputStream (org.apache.hadoop.hdfs)
readWithStrategy:909, DFSInputStream (org.apache.hadoop.hdfs)
read:1016, DFSInputStream (org.apache.hadoop.hdfs)
read:100, DataInputStream (java.io)
一个数据包的结构如下,其中DATA 段为真正数据内容,默认最大长度65536,也可以由 io.file.buffer.size 指定更大长度:
PLEN HLEN HEADER CHECKSUMS DATA
32-bit 16-bit <protobuf> <variable length>
一个数据包读取完成后,通过 checksum.verifyChunkedSums 方法,对 Data 和 Checksum 逐 chunk 校验,chunk 默认大小为512B。
verifyChunked:391, DataChecksum (org.apache.hadoop.util)
verifyChunkedSums:383, DataChecksum (org.apache.hadoop.util)
readNextPacket:218, BlockReaderRemote (org.apache.hadoop.hdfs.client.impl)
read:148, BlockReaderRemote (org.apache.hadoop.hdfs.client.impl)
readFromBlock:120, ByteArrayStrategy (org.apache.hadoop.hdfs)
readBuffer:842, DFSInputStream (org.apache.hadoop.hdfs)
readWithStrategy:909, DFSInputStream (org.apache.hadoop.hdfs)
read:1016, DFSInputStream (org.apache.hadoop.hdfs)
read:100, DataInputStream (java.io)
对于读取数据时发生校验错误,将对应的节点和数据块记为坏块后,通过 reportCheckSumFailure 方法向 Namenode 汇报。
对于读取数据时发生IO错误,则会尝试重试当前 DN 的链路读取,如果还是错误,则选取下一 DN。
在接受完所有的数据内容后,最后会读取一个空尾包,代表读取操作正式结束。这个空尾包通过 readTrailingEmptyPacket 方法读取,里面没有数据内容,只有一个代表结尾的标记,其作用也就是进一步的确认读取完成。如果没有结尾标记,则代表读取预期与数据传输不一致,将抛出IO异常。
close 实际上就是把之前 open 返回的 FSDataInputStream 对象关闭掉,此处不多说了。
在整个读流程中,NN 端负责提供指定文件的所有块信息。 在 getBlockLocations 方法中,先找到指定文件的inode,然后调用 inode.getBlocks 获取。
inode.getBlocks
createLocatedBlocks:2095, BlockManager (org.apache.hadoop.hdfs.server.blockmanagement)
getBlockLocations:178, FSDirStatAndListingOp (org.apache.hadoop.hdfs.server.namenode)
getBlockLocations:2123, FSNamesystem (org.apache.hadoop.hdfs.server.namenode)
getBlockLocations:766, NameNodeRpcServer (org.apache.hadoop.hdfs.server.namenode)
拿到所有的数据块信息后,对于多副本的情况,还要做一个排序,按照网络拓扑的距离,离客户端近的排前面,远的排后面。
sortByDistance:979, NetworkTopology (org.apache.hadoop.net)
sortByDistance:936, NetworkTopology (org.apache.hadoop.net)
sortLocatedBlock:547, DatanodeManager (org.apache.hadoop.hdfs.server.blockmanagement)
sortLocatedBlocks:466, DatanodeManager (org.apache.hadoop.hdfs.server.blockmanagement)
sortLocatedBlocks:2222, FSNamesystem (org.apache.hadoop.hdfs.server.namenode)
getBlockLocations:2205, FSNamesystem (org.apache.hadoop.hdfs.server.namenode)
getBlockLocations:766, NameNodeRpcServer (org.apache.hadoop.hdfs.server.namenode)
作为响应客户端的读取请求的内应,DataXceiver 负责与客户端建立网络传输链路。DataXceiver 对象构造后,会作为一个后台线程执行,负责接收客户端读请求,构造对应的数据块读取处理对象BlockSender。
<init>:149, DataXceiver (org.apache.hadoop.hdfs.server.datanode)
create:139, DataXceiver (org.apache.hadoop.hdfs.server.datanode)
run:220, DataXceiverServer (org.apache.hadoop.hdfs.server.datanode)
run:748, Thread (java.lang)
在读取本地文件流程中,涉及如下几个主要数据结构和子流程:
BlockSender 负责提供数据块的读取、校验和发送全套服务。DataXceiver 线程启动后即开始接收来自客户端的读请求,读请求中包含了BlockSender 的主要构造参数,如block、offset 和length 等。
在BlockSender 的构造过程中完成了一系列的流程,主要包含输入输出流的构造、数据块文件和checksum 文件的检查等。
正式的发送数据包在 doSendBlock 方法中完成,如下:
while (endOffset > offset && !Thread.currentThread().isInterrupted)) {
manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
offset += len;
totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
上面在客户端读流程中也提到了,一个 packet 分为header、checksum 和data 三部分。在DN 端分为两批发送,第一批是header 和checksum,第二批是data。
// 第一批次发送header 和所有checksum
sockOut.write(buf, headerOff, dataOff - headerOff);
// 第二批次发送所有data
fileIoProvider.transferToSocketFully(
ris.getVolumeRef().getVolume(), sockOut, fileCh,
blockInPosition, dataLen, waitTime, transferTime);
header 记录packet 长度和data 长度等简单信息,这些长度都是通过计算得来,比较简单。
checksum 是通过文件的方式与数据块文件结对记录到本地磁盘的,因此需要按需读取。
readChecksumFully:90, ReplicaInputStreams (org.apache.hadoop.hdfs.server.datanode.fsdataset)
readChecksum:656, BlockSender (org.apache.hadoop.hdfs.server.datanode)
sendPacket:565, BlockSender (org.apache.hadoop.hdfs.server.datanode)
doSendBlock:781, BlockSender (org.apache.hadoop.hdfs.server.datanode)
data 部分采用零拷贝的方式发送,不需要像checksum 那样显示的读取,零拷贝的封装在FileChannelImpl 中实现。
transferToDirectlyInternal:428, FileChannelImpl (sun.nio.ch)
transferToDirectly:493, FileChannelImpl (sun.nio.ch)
transferTo:605, FileChannelImpl (sun.nio.ch)
transferToFully:223, SocketOutputStream (org.apache.hadoop.net)
transferToSocketFully:278, FileIoProvider (org.apache.hadoop.hdfs.server.datanode)
sendPacket:596, BlockSender (org.apache.hadoop.hdfs.server.datanode)
doSendBlock:781, BlockSender (org.apache.hadoop.hdfs.server.datanode)
在当前数据块的所有数据包读取并发送完成后,还要额外发送一个空尾包,确认发送结束。
sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
out.flush();
至此,HDFS 的读流程梗概梳理完成。
今天研究一下Namenode 是如何将众多Datanode 管理起来的。
在Namenode 中,DatanodeManager 类用来管理所有DN。构造路径:
FSNamesystem
--> BlockManager
--> DatanodeManager
DN 对应三个数据结构,继承关系如下:
DatanodeDescriptor -> DatanodeInfo -> DatanodeID
datanodeMap 以StorageID -> DatanodeDescriptor 的方式,存储了所有的DN。
所有DN 的查找、遍历、排序,都可以由此结构提供。
private final Map<String, DatanodeDescriptor> datanodeMap
= new HashMap<>();
networktopology 以拓扑树的形式维护了所有DN。
在树中,最高级为根节点“/”,叶子节点为一个DN(DatanodeDescriptor),中间节点为InnerNode(数据中心、机房、机架等)。
private final NetworkTopology networktopology;
host2DatanodeMap,以host -> DatanodeDescriptor 的方式,存储所有DN。
一般来说与datanodeMap 是一致的,但同一个DN 的StorageID 是可能变的,因此也维护了这个map。
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
--> private final HashMap<String, DatanodeDescriptor[]> map
= new HashMap<String, DatanodeDescriptor[]>();
DatanodeManager 初始是没有记录任何DN 的。每个DN 启动的时候根据配置文件向目标NN 发送注册请求,然后NN 接收到DN 的注册请求后才记录。
DN 的BPServiceActor 对象维护一个DN 与NN 通信的主体线程,DN 侧的执行流程如下:
connectToNNAndHandshake();
--> bpNamenode = dn.connectToNN(nnAddr); // 先建立链接,拿到nnproxy
--> NamespaceInfo nsInfo = retrieveNamespaceInfo(); // 然后握手,返回对应的NS 信息
--> register(nsInfo); // 最后注册
注册流程调用:
void register(NamespaceInfo nsInfo) throws IOException
--> DatanodeRegistration newBpRegistration = bpos.createRegistration();
--> newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
NN 侧注册调用:
NameNodeRpcServer::registerDatanode(DatanodeRegistration nodeReg)
--> namesystem.registerDatanode(nodeReg);
--> blockManager.registerDatanode(nodeReg);
--> datanodeManager.registerDatanode(nodeReg);
在registerDatanode 方法中,主要构造DatanodeDescriptor 对象,并更新维护上述提到的三个记录DN 的数据结构。
对于networktopology 来说,其核心是一个以clusterMap 为根的拓扑树,其中间节点InnerNode 构造自自机架等信息,流程如下:
clusterMap 的初始化:
InnerNode clusterMap;
this.factory = InnerNodeImpl.FACTORY;
this.clusterMap = factory.newInnerNode(NodeBase.ROOT);
clusterMap 的实际类型为DFSTopologyNodeImpl,其继承关系如下:
public class DFSTopologyNodeImpl extends InnerNodeImpl {}
public class InnerNodeImpl extends NodeBase implements InnerNode {}
对于获取机架,HADOOP 采用了一个叫机架感知的技术,即我们通过参数指定一个脚本,该脚本的输入参数是一个DN 的IP 地址,输出是该DN 的机架字符串,然后在注册流程中,NN 拿到DN IP 后通过该脚本获取对应机架。
获取机架的代码如下:
DatanodeManager::resolveNetworkLocation(DatanodeID node)
--> dnsToSwitchMapping.resolve(names);
--> ScriptBasedMapping::resolve(List<String> names)
* 脚本配置参数:net.topology.script.file.name
新机架的DN 加入时,会通过机架名称构造InnerNode。
获取机架,创建机架节点并挂到父节点(clusterMap),最后把DN 加到机架节点中:
networktopology.add(node)
--> clusterMap.add(node)
--> String parentName = getNextAncestorName(n);
--> parentNode = createParentNode(parentName);
--> children.add(parentNode);
--> childrenMap.put(parentNode.getName(), parentNode);
--> parentNode.add(n)
除上述外,还有节点删除、刷新、心跳和上下线等操作,逻辑比较简单。
总之,所有的DN 在NN 端都是在这三个MAP 中管理起来的,论用途的话,不同的场合选用不同的结构。
比如datanodeMap 和host2DatanodeMap 都可以看做简单的HashMap,key 不同而已,而networktopology 这个拓扑树用作节点索引,通常用作为数据块选点选盘。
在副本冗余模式的分布式存储系统中,副本分发属于重量级的操作,其中pipeline 思路的应用一定程度上解决了效率问题。
例如面对一个三副本的冗余配置,我们很轻易的可以想到一个简单的分发方案:
先写第一副本,写完后拷贝给第二三副本,收工。
乍一看,我们完成了副本分发的需求,但思考之后存在几个问题:
因此,数据流pipeline 思想在此被完美运用。
客户端要写入的数据被拆分成若干小包(packet),在发起写请求之前从NN 获取到三个副本的承载DataNode,然后建立pipeline 链路。
如下面流程示意,客户端数据被拆成ABCD 四个包,按照pipeline 链路依次发送,当DN 接收到一个包时,先固化到本地,然后转手给下游,直到最后一个DN 固化完成。
最后一个DN 固化完成后,会原路返回结果,最终客户端收到返回结果,代表本次写请求正式完成。
step1: Client(ABCD) -> DN1() -> DN2() -> DN3()
step2: Client(BCD) -> DN1(A) -> DN2() -> DN3()
step3: Client(CD) -> DN1(AB) -> DN2(A) -> DN3()
step4: Client(D) -> DN1(ABC) -> DN2(AB) -> DN3(A)
step5: Client() -> DN1(ABCD) -> DN2(ABC) -> DN3(AB)
step6: Client() -> DN1(ABCD) -> DN2(ABCD) -> DN3(ABC)
step7: Client() -> DN1(ABCD) -> DN2(ABCD) -> DN3(ABCD)
以客户端需要新写文件为例,客户端先从NameNode 获取到一组DataNode,然后创建pipeline。
pipeline 的源头在客户端DFSOutputStream->DataStreamer 类,如下:
DataStreamer::run() {
setPipeline(nextBlockOutputStream());
}
nextBlockOutputStream()
--> createBlockOutputStream
// nodes[0] 即为第一个DataNode
--> createSocketForPipeline(nodes[0], nodes.length, dfsClient);
pipeline 在DataNode 端有上游和下游两个角色,均在DataXceiver 类管理。
DataXceiver 在DataXceiverServer 中构造,Server accept 端对应TcpPeerServer 类,Server 处理请求和Client 端对应DataXceiver 类。
DataNode 进程启动的时候,会构造并启动一个DataXceiverServer Daemon 线程,该线程通过TcpPeerServer 对象accept pipeline 上游连接请求,然后构造并启动对应的DataXceiver,如下:
DataXceiverServer::run() {
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
peer = peerServer.accept();
...
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
}
}
}
可以看出,一个DataXceiver 对象代表一个pipeline 链路节点,在DataXceiver 中完成了具体的数据读写处理,如下:
DataXceiver::run() {
do {
op = readOp();
processOp(op);
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
}
// 连接下游的动作发生在WRITE_BLOCK 这个op 中,最后一个DataNode 没有下游
processOp(op)
--> opWriteBlock(...)
--> DataXceiver.writeBlock(...)
--> mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
--> mirrorSock = datanode.newSocket();
--> NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
上下游数据接转由BlockReceiver 类负责,其对象blockReceiver 在DataXceiver 中构造,数据包接转代码如下:
DataXceiver.writeBlock(...)
--> blockReceiver.receiveBlock(...)
--> while (receivePacket() >= 0)
receivePacket()
--> packetReceiver.receiveNextPacket(in) // 读数据包
--> packetReceiver.mirrorPacketTo(mirrorOut); // 发给下游
--> streams.writeDataToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk); // 本地固化
至此,pipeline 链路建立完成。
PipelineHealthChecker (待续。。。)
JMX 是HADOOP 集成的监控模块,提供了通过WEB 访问获取几乎所有指标统计的方式。
JMX 通过JMXJsonServlet 类实现,该类继承了javax.servlet.http.HttpServlet。
访问JMX 指标统计的方式:
WEB 访问 http://x.x.x.x:50070/jmx (默认端口50070)
命令行访问 curl -i http://x.x.x.x:50070/jmx
返回结构为json 格式。
通常来说,由于指标太多,返回较慢,我们可以通过 qry
参数指定查询项。例如:
http://x.x.x.x:50070/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem
还嫌多,get
参数可以查询指定项目中的具体指标。例如:
http://x.x.x.x:50070/jmx?get=Hadoop:service=NameNode,name=FSNamesystem::BlocksTotal
在 JMXJsonServlet 类中,doGet 方法会将指定的指标统计项转换成json 串,各个业务模块的指标项数据的获取途径是 MBeanServer。
JMXJsonServlet 启动的时候会构造一个 MBeanServer,然后各个业务模块启动的时候会将统计对象注册到MBeanServer。如 FSNamesystem 类:
// 直接继承相关的统计类,几个统计类中包含了所有的统计指标获取接口
public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NameNodeMXBean, ReplicatedBlocksMBean, ECBlockGroupsMBean {...}
// 将几个统计项注册到 MBeanServer
private void registerMBean() {
// We can only implement one MXBean interface, so we keep the old one.
try {
StandardMBean namesystemBean = new StandardMBean(
this, FSNamesystemMBean.class);
StandardMBean replicaBean = new StandardMBean(
this, ReplicatedBlocksMBean.class);
StandardMBean ecBean = new StandardMBean(
this, ECBlockGroupsMBean.class);
namesystemMBeanName = MBeans.register(
"NameNode", "FSNamesystemState", namesystemBean);
replicatedBlocksMBeanName = MBeans.register(
"NameNode", "ReplicatedBlocksState", replicaBean);
ecBlockGroupsMBeanName = MBeans.register(
"NameNode", "ECBlockGroupsState", ecBean);
} catch (NotCompliantMBeanException e) {
throw new RuntimeException("Bad MBean setup", e);
}
LOG.info("Registered FSNamesystemState, ReplicatedBlocksState and " +
"ECBlockGroupsState MBeans.");
}
一种提高响应请求效率的优化方案:
可以考虑启动一个后台线程,周期性的将统计项采集并缓存,然后查询请求命中缓存则避免实时遍历采集所有指标项导致的时延开销。
此优化方案适用于对实时性不太敏感的监控场景。
TOS (type-of-service) 是IP 报文协议头的一个8位字段,用于配置此类流量的优先级。
Java API 中,java.net.Socket.setTrafficClass(int tc) 接口用于配置TOS。
由于TOS 与DSCP 的兼容性,此API 也是用于DSCP 配置。
DSCP(Differentiated Services Codepoint)是TOS 字段的高6 位,低2 位补0;
保证转发(Assured Forwarding,AF)由RFC2597 对CS1~CS4 进行进一步定义。它使用第3和第4比特做丢弃优先级标志。
这样,在同一类数据中,又根据被丢弃的可能性划分出3档。下表列出了AF服务等级及其对应的DSCP值:
CS1 | CS2 | CS3 | CS4 | ||
---|---|---|---|---|---|
Lowdrop | AF11 | AF21 | AF31 | AF41 | |
001010 | 010010 | 011010 | 100010 | ||
Mediumdrop | AF12 | AF22 | AF32 | AF42 | |
001100 | 010100 | 011100 | 100100 | ||
Highdrop | AF13 | AF23 | AF33 | AF43 | |
001110 | 010110 | 011110 | 100110 |
本实验我们配置DSCP 为AF41(0b100010 = 0x22 = 34,对应TOS:0b10001000 = 0x88 = 136)
如下两个java 文件分别作为客户端和服务端代码,编译执行:
# javac Client.java
# javac Server.java
# java -Djava.net.preferIPv4Stack=true Server 7777 136
Starting server....
Client:172.28.200.100 is accepted.
Test start.
# java -Djava.net.preferIPv4Stack=true Client 10.198.245.86 7777 136
Test start.
1000 pingpongs, Average Time(us): 2087
执行期间通过tcpdump 抓包:
# tcpdump port 7777 -nnvvS
......
16:27:01.224518 IP (tos 0x88, ttl 64, id 4821, offset 0, flags [DF], proto TCP (6), length 42)
172.28.200.100.60918 > 10.198.245.86.7777: Flags [P.], cksum 0x74ba (incorrect -> 0x1449), seq 1936351735:1936351737, ack 1871775163, win 58, length 2
16:27:01.226615 IP (tos 0x88, ttl 57, id 62839, offset 0, flags [DF], proto TCP (6), length 42)
10.198.245.86.7777 > 172.28.200.100.60918: Flags [P.], cksum 0x1447 (correct), seq 1871775163:1871775165, ack 1936351737, win 58, length 2
......
<-- 可以看到数据部分tos 为0x88(默认为0),说明设置生效。
ping 时延测试如下图:
//File: Client.java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class Client {
public static void main(String[] args) {
String ip = args[0];
int port = Integer.parseInt(args[1]);
int tos = Integer.parseInt(args[2]);
try {
Socket s = new Socket(ip, port);
s.setTrafficClass(tos);
InputStream is = s.getInputStream();
OutputStream os = s.getOutputStream();
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));
BufferedReader br = new BufferedReader(new InputStreamReader(is));
bw.write("Test start.\n");
bw.flush();
String hiMsg = br.readLine();
System.out.println(hiMsg);
long now_us = 0;
long sum_us = 0;
long begin_us = 0;
int num = 1000;
for (int i = 0; i < num; i++) {
bw.write(i);
bw.flush();
begin_us = System.nanoTime() / 1000;
int msg = br.read();
now_us = System.nanoTime() / 1000;
sum_us += now_us - begin_us;
//System.out.println("Server: " + msg + ", us:" + sum_us);
}
System.out.println(num + " pingpongs, Average Time(us): " + (sum_us / num));
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// File: Server.java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
public static void main(String[] args) {
int port = Integer.parseInt(args[0]);
int tos = Integer.parseInt(args[1]);
while (true) {
ServerSocket ss = null;
try {
ss = new ServerSocket(port);
System.out.println("Starting server....");
Socket s = ss.accept();
s.setTrafficClass(tos);
System.out.println("Client:" + s.getInetAddress().getHostAddress() + " is accepted.");
BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
String hiMsg = br.readLine();
System.out.println(hiMsg);
bw.write(hiMsg + "\n");
bw.flush();
int num = 100000;
for (int i = 0; i < num; i++) {
int msg = br.read();
//System.out.println("Client:" + msg);
bw.write(msg);
bw.flush();
}
} catch (Throwable e) {
//e.printStackTrace();
System.out.println(e);
try {
ss.close();
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}
}
}
此篇以前一篇网络通信之RPC 为背景。
我们已经了解了Reader 线程构造完指令后会推到指令队列,然后Handler 线程会从指令队列里拿指令任务去执行。不难想象,这个指令队列可能会是一个性能瓶颈。如果指令队列仅仅是单个queue 的话,那么将引发严重的竞争(单个client 提交了一大批调用指令,其他client 可能饿死)。
解决问题也不难,一个不行就上多个,多个还不够好就搞优先级。对此,HADOOP RPC 祭出了FairCallQueue。如下图:
在RPC Server 中,构造了一个CallQueueManager,其中会创建一个调度器和一个阻塞队列,调度器和阻塞队列可以有不同的实现形式,分别通过scheduler.impl
和callqueue.impl
参数指定,如下:
// 默认 DecayRpcScheduler
this.scheduler = createScheduler(schedulerClass, priorityLevels,
namespace, conf);
// 默认 FairCallQueue
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
priorityLevels, maxQueueSize, namespace, capacityWeights, conf);
FairCallQueue 对象中包含了若干个阻塞队列,队列数量与优先级数量相当。
当Handler 要从队列取出call 时,会通过FairCallQueue 中维护的一个权重管理器来确定具体从那个队列取出。
FairCallQueue 主要初始化如下:
this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf);
相关参数:
faircallqueue.priority-levels: 队列优先级数量,默认与调度优先级数量相同
scheduler.priority.levels: 调度优先级数量,默认4(0,1,2,3), 0 为最高优先级
faircallqueue.multiplexer.weights: 默认为8,4,2,1,即最高优先级队列取8个请求,第二优先级4个,以此类推。
当Reader 收到一个call 请求后,会通过DecayRpcScheduler 获取优先级,然后加入到对应的阻塞队列。
优先级的计算主要来自用户call 的计数。
DecayRpcScheduler 维护着每个用户的请求计数。这个计数随时间逐渐减少。一个定时任务 DecayTask 在每个周期将每个用户的请求计数乘以衰减系数。这样就维护了每个用户请求计数的加权/滚动平均值,以使用户请求优先级控制更加平滑。
同时,DecayTask 在每次执行时,会重新计算所有已知用户的优先级,call 请求占总数的50% 以上的用户置于最低优先级,占25%~50% 在第二低优先级,占12.5%~25% 在第二高的优先级,所有其他用户被排在最高优先级。
在扫描结束时,每个已知用户都有一个缓存的优先级,该优先级将一直使用到下一次扫描。两次扫描之间出现的新用户将即时计算其优先级。
此外,还有一个回退机制,即队列已满或者请求指令执行时间过长的情况,已经收到的call 请求将被以向client 抛异常的方式回退,client 通常等待一段时间再重试。
shouldBackOff 方法用于判断是否应该回退。
DecayTask 执行如下:
decayCurrentCosts()
-> long nextValue = (long) (currentValue * decayFactor);
-> recomputeScheduleCache();
-> computePriorityLevel(snapshot, id);
-> updateAverageResponseTime(true);
相关参数:
decay-scheduler.period-ms: DecayTask 执行周期,默认 5s
decay-scheduler.decay-factor: 衰减系数,默认 0.5
decay-scheduler.thresholds: 优先级阈值,默认 0.125,0.25,0.5
decay-scheduler.backoff.responsetime.thresholds: 回退时间阈值,默认0优先级10s, 1-20s, 2-30s, 3-40s
队列拥塞控制是无解的,所有的方案都是权衡的结果,FairCallQueue 作为当前HADOOP RPC 的拥塞控制模块,已经很大程度上解决了性能瓶颈。
作为一个分布式存储系统,节点之间的通信是必不可少的,通信方式至少包含两类:
我们先说一下指令通信,也叫管理网络,也叫IPC(Inter-process communication),也叫RPC(Remote Procedure Call)。
试想一下,节点A 需要获取节点B 的状态S 的值,我们大致的实现如下:
1. NodeA -> command(getValue, S) -> NodeB
2. NodeA <- result(S, 0xabc) <- NodeB
第一步,NodeA 发送指令给NodeB,指令包含操作类型“getValue” 和操作对象“S”。
第二步,NodeB 执行操作完成后,发送结果给NodeA,结果包含操作对象“S”和值“0xabc”。
可见,要完成一个RPC,至少需要几个实现要点:
由于一个分布式系统可能需要几十上百种RPC 指令,因此需要一个模块,将上述实现要点封装,然后统一注册配置,以期望保证系统设计与实现的简洁便利性。
Hadoop RPC 就是干这个用的,指令协议与序列化在其中被封装,名为stub,如下盗图:
Hadop RPC 的实现,主要涉及java 的动态代理,java NIO 和protobuf 等基础技术。
下面我们分析下Namenode 与Datanode 的RPC 相关实现。
NameNodeRpcServer 类包含了所有的RPC Server 对象,在进程启动时构造,如下:
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);
serviceRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(serviceRpcAddr.getPort())
.setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
然后,在ProtobufRpcEngine2 做Server 的初始化工作,如下:
public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
String bindAddress, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig, AlignmentContext alignmentContext)
throws IOException {
return new Server(protocol, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig, alignmentContext);
}
相关参数:
dfs.namenode.service.handler.count: RPC call 的处理线程数,默认10
ipc.server.handler.queue.size: 每个handler 对应的最大队列长度
ipc.server.listen.queue.size: 监听队列长度,默认128。需要注意当集群规模上千时,需要设置一个较大值,如32768,同时kernel 参数net.core.somaxconn 需要同步设置。
RPC Server 包含三个重要组件,callQueue、connectionManager 和responder,如下:
// 用于管理指令队列
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
getSchedulerClass(prefix, conf),
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
// 网络监听及会话管理
listener = new Listener(port);
connectionManager = new ConnectionManager();
// 用于返回调用结果
responder = new Responder();
在Listener 的构造中,启动一个listener 线程用于监听连接,同时启动若干个Reader 工作线程。
监听通过Selector 实现,即acceptChannel 注册到selector,然后在listener 线程中做select 阻塞,用于处理会话accept(不就是poll 嘛),如下:
acceptChannel = ServerSocketChannel.open();
selector= Selector.open();
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
// listener 线程中,select 阻塞等待会话请求,然后遍历channel 做accept 处理
run() {
getSelector().select();
doAccept(key);
}
doAccept() {
channel = server.accept();
// RoundRobin 拿Reader 工作线程
Reader reader = getReader();
// 会话注册到connectionManager
Connection c = connectionManager.register(channel,
this.listenPort, this.isOnAuxiliaryPort);
// 推给Reader 转化RPC 请求
reader.addConnection(c);
}
Reader 读取网络包,构造RPC 请求,然后放到callQueue,然后由handler 线程执行,流程如下:
doRead()
-> readAndProcess()
-> channelRead(channel, data);
-> processOneRpc(requestData);
-> processRpcRequest(header, buffer);
-> rpcRequest = buffer.newInstance(rpcRequestClass, conf);
RpcCall call = new RpcCall(this, header.getCallId(),
header.getRetryCount(), rpcRequest,
ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), span, callerContext);
internalQueueCall(call);
-> internalQueueCall(call, true);
-> callQueue.put(call);
-> callQueue.add(call);
所有的会话会都注册到ConnectionManager 管理,在listener 线程中会有个空闲会话的处理,启动一个Timer 线程,遍历所有会话,闲置一定时间的会话将被关闭。如下:
connectionManager.startIdleScan()
-> scheduleIdleScanTask()
-> closeIdle()
相关参数:
ipc.client.idlethreshold: 会话数超过此阈值,才执行后续空闲会话处理
ipc.client.connection.maxidletime: 空闲时间超过此阈值两倍的会话将被关闭
在Responder 的构造中,启动一个线程用于处理所有call 的返回结果。
Responder 通过doRespond 方法接收需要返回的call,每个会话对象会维护一个responseQueue,call 会被加入其中,然后做发送处理。
总体而言,发送处理也是通过Selector 选择channel,然后将对应的call result 发送回client,如下:
writeSelector = Selector.open();
// 注册channel 和call
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
// 阻塞一个清理时间
writeSelector.select(TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
// 通过channel 异步返回结果
doAsyncWrite(key)
-> processResponse(call.connection.responseQueue, false)
-> channelWrite(channel, call.rpcResponse)
相关参数:
ipc.server.max.response.size: 响应请求的消息最大长度;超量消息会被记录到log
Datanode 的client 角色是在进程启动时创建的,也叫BPOfferService,如下:
// 每个NS 对应一个BPOfferService
BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
lifelineAddrs);
在BPOfferService 中,为每个Namenode 创建了一个BPServiceActor,如下:
for (int i = 0; i < nnAddrs.size(); ++i) {
this.bpServices.add(new BPServiceActor(nameserviceId, nnIds.get(i),
nnAddrs.get(i), lifelineNnAddrs.get(i), this));
}
每个BPServiceActor 都是一个独立的线程,如下:
//This must be called only by BPOfferService
void start() {
if ((bpThread != null) && (bpThread.isAlive())) {
//Thread is started already
return;
}
bpThread = new Thread(this);
bpThread.setDaemon(true); // needed for JUnit testing
if (lifelineSender != null) {
lifelineSender.start();
}
bpThread.start();
}
BPServiceActor 线程在启动时会构造RPC client 相关组件,如下:
run()
-> connectToNNAndHandshake()
-> connectToNN(nnAddr)
-> new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf())
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
ProtobufRpcEngine2.class);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
}
我们在ProtobufRpcEngine2 类中,可以找到实际的网络client 初始化,如下:
getProxy
-> new Invoker()
-> Client.ConnectionId.getConnectionId()
-> new ConnectionId() // 只是赋值server 地址之类的参数
相关参数:
ipc.client.connect.max.retries: 客户端最大重试连接次数
而真正的连接发生在RPC 调用时,首先构造一个connection 对象,如下:
Client.call()
-> getConnection()
// connections 用于保存connection 对象,以不必每次call 都重新构造
connection = connections.computeIfAbsent(remoteId,
id -> new Connection(id, serviceClass, removeMethod));
然后创建与server 的会话:
connection.setupIOstreams(fallbackToSimpleAuth);
-> setupConnection(ticket); // Socket 配置及连接
-> start(); // 启动接收返回结果的线程
-> run()
-> receiveRpcResponse()
综上,我们可以看到整个RPC 实现被封装在ProtobufRpcEngine2 中,其中Server 和Client 端分别实现了网络会话管理,以及通过protobuf 实现序列化与反序列化。
HDFS 提供类似回收站的功能,即被执行删除的文件会被移动到指定的Trash 目录,之后在一个合适的时机才被真正删除。
参考删除命令:
Usage: hadoop fs [generic options] -rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...
可以看出,只有指定了”-skipTrash“ 参数,文件才会直接删除,否则进Trash。
每个用户都有自己的Trash 目录:
/user/$USERNAME/.Trash # 当前被删除的文件会被放在.Trash 的子目录Current 下
命令行在处理”-rm“ 指令时,先尝试执行moveToTrash,后执行delete。
if (moveToTrash(item) || !canBeSafelyDeleted(item)) {
return;
}
if (!item.fs.delete(item.path, deleteDirs)) {
throw new PathIOException(item.toString());
}
对于moveToTrash 操作,真正的执行方法是TrashPolicyDefault::moveToTrash,最终会调用rename 完成。
// move to current trash
fs.rename(path, trashPath,
Rename.TO_TRASH);
LOG.info("Moved: '" + path + "' to trash at: " + trashPath);
return true;
Active NN 会通过startTrashEmptier 方法启动一个后台线程作为清理器。
this.emptier = new Thread(new Trash(fs, conf).getEmptier(), "Trash Emptier");
清理器涉及两个重要参数:
fs.trash.interval
:用于配置回收站中数据的保存周期,0代表不开启回收站功能;fs.trash.checkpoint.interval
:清理器检查处理周期;清理器的执行实例主体为:TrashPolicyDefault::Emptier,在run() 函数中完成清理操作,其中涉及一个Checkpoint 概念,其实很简单,被删除的文件被移动到Trash 目录下的Current 子目录,创建检查点会吧Current 子目录重命名为当前时间戳以供删除时判断是否过期:
Collection<FileStatus> trashRoots;
trashRoots = fs.getTrashRoots(true); // list all trash dirs
for (FileStatus trashRoot : trashRoots) { // dump each trash
if (!trashRoot.isDirectory())
continue;
try {
TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
// 删除过期的文件
trash.deleteCheckpoint(trashRoot.getPath(), false);
// 将当前Trash 子目录Current 以时间戳重命名作为下一次的检查点
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
} catch (IOException e) {
LOG.warn("Trash caught: "+e+". Skipping " +
trashRoot.getPath() + ".");
}
}
回收站也可以手动触发清理:
hdfs dfs -expunge
hadoop fs -expunge
Namenode 主备切换的基础是二者状态机的一致性,例如元数据信息等。为此,HDFS 提供了两种实现方式。
两种方式的共同点比较明显,都是共享存储,即对所有日志事务做中心管理。
此时我们需要了解一些namenode 元数据磁盘结构管理机制。
为了同步元数据信息,Active NN 在服务客户端请求(如新建文件)时会不断的生产日志事务,对应一个txid,即EditLog,若干条EditLog 构成一个edits 文件,可以说edits 文件是元数据的同步单元。
edits 文件也就是所谓的EditLog Segment,分为写入和完成两种状态,分别对应两种命名格式的文件:
写入到完成也叫log-roll,有两个触发情况:
edits 文件数会随着业务不断积累增长,为了解决此问题,需要找一个合适的机会对edits 文件做合并,合并后的文件即为FSImage。
这个合并操作即为Checkpoint,也叫FSImage 回滚。
由于Checkpoint 操作的计算和内存开销都比较大,且对客户端请求有直接的影响,因此由 Standby NN 完成。
Checkpoint 操作由StandbyCheckpointer::CheckpointerThread 线程调用doCheckpoint 方法执行。
触发条件有两个,满足其一即可:
在Standby NN 的 doCheckpoint 方法中,新的FSImage 文件合并完成后,要upload 到Active NN:
edits 文件保存的日志事务数默认1000000 个,参数dfs.namenode.num.extra.edits.retained。
FSImage 文件默认保存两个,对应参数dfs.namenode.num.checkpoints.retained。
# 查看edits 文件:
hdfs oev -i edits -o edits.xml
# 查看FSImage 文件:
hdfs oiv -p XML -i fsimage -o fsimage.xml
基于上述元数据管理机制,我们需要一个可靠的中介模块来传输edits 文件,也就是上面提到的QJM 和NFS,而官方更倾向于QJM 方式。
JournalNode 是一个集群,由3 个以上的奇数个节点组成(paxos 协议?),JN 仅用于中介edits 文件传输,属于比较轻量级的服务。
Active NN 会将edits 文件同步给JN,然后Standby NN 会周期的从JN 同步获取最新edits(周期参数dfs.ha.tail-edits.period),然后在本地执行 Checkpoint 及后续操作。
Namenode 对 JN 的读写操作都通过 FSEditLog 对象完成。
读分为两种情况:
写有一种情况:
写流程涉及到一个双缓冲区的使用,一个负责更新,一个负责固化落盘,两者交换使用,整理如下:
logEdit
--> doEditTransaction
--> EditsDoubleBuffer::writeOp // 更新buffer
logSync
--> editLogStream.setReadyToFlush() // 将更新buffer 与落盘buffer 交换
--> EditLogFileOutputStream::flushAndSync // 写入本地文件
--> QuorumOutputStream::flushAndSync // 写入 JN
FSEditLog 对象构造:
在 Namenode 启动流程中,FSImage 模块启动时构造,其中最主要一点是构造 journalSet。
journalSet 是 JournalSet 类的对象,维护了本地文件和 JN 集群。
配置本地文件目录和 JN 集群的参数:dfs.namenode.shared.edits.dir