Hadoop相关源代码分析文档

六世轮回

贡献于2011-08-02

字数:61828 关键词: Hadoop 分布式/云计算/大数据 方案 报告 diff

SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 1 Hadoo p 相关源代码分析 目录 1. HDFS 文件操作 ........................................................................................................ 3 1.1 HDFS 写文件操作 .......................................................................................... 3 1.1.1 Hdfs 写文件总体流程 .......................................................................... 3 1.1.2 datanode 端写文件流程 ....................................................................... 4 1.1.3 namenode 端写文件流程 ..................................................................... 4 1.1.4 client 端写文件流程 ............................................................................. 5 1.2 HDFS 恢复操作 .............................................................................................. 5 1.2.1 恢复文件总体流程图.......................................................................... 5 2 HDFS 源代码解析 ..................................................................................................... 7 2.1 Datanode 解析 ................................................................................................. 7 2.1.1 DataNode 接口和协议 ......................................................................... 8 2.1.1.1 InterDataNodeProtocol .............................................................. 8 2.1.1.2 ClientDataNodeProtocol ............................................................ 9 2.1.2 DataNode 重要数据结构 ................................................................... 10 2.1.2.1 DataXceiverServer 类 .............................................................. 10 2.1.2.2 DataXceiver 类 ........................................................................ 11 2.1.2.3 BlockSender 类和 BlockReceiver 类 ...................................... 16 2.1.2.4 PacketResponder 类 ................................................................. 18 2.1.2.5 DataNode 类 ............................................................................ 20 2.1.3 Datanode 监视线程功能描述 ............................................................ 24 2.1.4 Datanode 上主要工作流程 ................................................................ 28 2.1.4.1 DataNode 写数据具体流程 .................................................... 28 2.1.4.2 DataNode 上恢复数据处理流程 ............................................ 30 2.2 NameNode 解析 ............................................................................................ 31 2.2.1 NameNode 接口和协议 ..................................................................... 32 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 2 2.2.1.1 ClientProtocol .......................................................................... 33 2.2.1.2 NameNodeProtocol .................................................................. 37 2.2.1.3 DataNodeProtocol .................................................................... 38 2.2.2 NameNode 重要数据结构 ................................................................. 42 2.2.2.1 Inode* ...................................................................................... 42 2.2.2.2 Block* ...................................................................................... 45 2.2.2.3 Datanode* ................................................................................ 49 2.2.2.4 FSImage ................................................................................... 56 2.2.2.5 FSEditLog ................................................................................ 59 2.2.2.6 LocatedBlock 类 ...................................................................... 61 2.2.3 Namenode 数据队列分析 .................................................................. 62 2.2.3.1 数据队列功能及维护方法..................................................... 62 2.2.3.2 监视线程功能描述 ................................................................ 65 2.2.4 NameNode 上的主要工作流程代码分析 ......................................... 76 2.2.4.1 两个重要监视线程................................................................. 76 2.2.4.2 NameNode 写数据代码流程 .................................................. 77 2.2.4.3 NameNode 恢复数据代码流程 .............................................. 80 2.3 Client 写文件解析 ........................................................................................ 82 2.3.1 Client 主要数据结构 .......................................................................... 82 2.3.1.1 OutputStream 类 ...................................................................... 82 2.3.1.2 FSOutputSummer 类 ............................................................... 82 2.3.1.3 Packet 类 .................................................................................. 83 2.3.1.4 DataStreamer 类....................................................................... 83 2.3.1.5 ResponseProcessor 类 .............................................................. 84 2.3.1.6 DFSOutputStream 类 ............................................................... 84 2.3.2 Client 写数据具体流程 ...................................................................... 86 2.3.3 Client 恢复数据具体流程 .................................................................. 88 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 3 1. HDFS 文件操作 1.1 HDFS 写文件操作 1.1.1 Hdfs 写文件总体流程 图 1.1.1 HDFS 写文件总体流程 一个典型的 HDFS 系统包括一个 NameNode 和多个 DataNode。NameNode 维护名字空间;而 DataNode 存储数据块。 DataNode 负责存储数据,一个数据块在多个 DataNode 中有备份;而一个 DataNode 对于一个块最多只包含一个备份。可以简单地认为 DataNode 上存了数 据块 ID 和数据块内容,以及他们的映射关系。 一个 HDFS 集群可能包含上千 DataNode 节点,这些 DataNode 定时和 NameNode 通信,接受 NameNode 的指令。为了减轻 NameNode 的负担,NameNode 上并不永久保存那个 DataNode 上有那些数据块的信息,而是通过 DataNode 启动 时的上报,来更新 NameNode 上的映射表。 DataNode 和 NameNode 建立连接以后,就会不断地和 NameNode 保持心跳。 心跳的返回其还也包含了 NameNode 对 DataNode 的一些命令,如删除数据库或 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 4 者是把数据块复制到另一个 DataNode。应该注意的是:NameNode 不会发起到 DataNode 的请求,在这个通信过程中,它们是严格的客户端/服务器架构。 DataNode 当然也作为服务器接受来自客户端的访问,处理数据块读/写请求。 DataNode 之间还会相互通信,执行数据块复制任务,同时,在客户端做写操作 的时候,DataNode 需要相互配合,保证写操作的一致性。 下面从一个比较宽泛的角度描述在一次写文件过程中 datanode、namenode 和 client 分别处理的流程,以为进一步分析奠定一个整体的概念。 1.1.2 datanode 端写文件流程 思路比较简单,连接好的节点之间建立数据流,成功后会给 client 一个应答。 Client DataNode _1 DataNode _2 DataNode _3 dataIn relayOut mirrorOut mirrorOut mirrorIn mirrorIn 图 1.1.2 DataNode 端写文件流程 1.1.3 namenode 端写文件流程 接收来自Client的写文件 请求 成功选取节点 从hdfs集群的dataNode中 随机选取存储相应block的 节点 具体的节点数由相 应的replication数 决定 否 产生blockId,并把该数据块信息 (blockInfo)写到该索引文件InodeFile 中,然后将选取的节点信息返回给Client 是 初始化相应的索引文件信息InodeFile等, 通知Client可以开始传送数据块 接收来自Client的写数据 块请求 Client文件传送 完毕 否 文件传输完毕 是 图 1.1.3 NameNode 端写文件流程 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 5 1.1.4 client 端写文件流程 开始 向nameNode发送创建 文件请求 nameNode反馈文件成 功创建 否 向nameNode申请存储相 应数据块的节点 是 接收nameNode反 馈的节点信息 写文件失败 成功申请节点 否 向相应的首节点发送 存储数据请求 是 写数据块成功 接收dataNode写数据 反馈 否 这里hdfs的处理 方法是什么? 文件存储完毕 否 文件传输完毕 是 图 1. 1.4 Client 端写文件流程 1.2 HDFS 恢复操作 1.2.1 恢复文件总体流程图 图1.2.1描述了DataNode上扫描到了损坏数据块并向NameNode汇报的流程, 这个流程将导致损坏的数据块被记录,图 1.2.2 则是处理损坏数据块的流程,损 坏数据块的处理主要靠 heartBeat 命令来处理,NameNode 对于 heartBeat 信号除 了完成例行的存活性检查之外,还要检查若干任务队列(每个 DataNode 都有自 己的任务队列),如果任务队列上有相应的任务,NameNode 在处理完 heartBeat 以后通知相应的 DataNode 执行任务,恢复就是 DataNode 例行的任务之一。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 6 图 1.2.1 DataNode 扫描错误数据并上报 NameNode 图 1.2.2 处理损坏数据块的流程 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 7 2 HDFS 源代码解析 2.1 Datanode 解析 我们将要对 datanode 做详细解析之前,先看下 HDFS 系统实现的 datanode 表现出来的一些特性。 安装 Hadoop 的时候,我们会指定对应的数据块存放目录,当我们检查数据 块存放目录时,我们会发现下面有个叫 dfs 的目录,所有的数据就存放在 dfs/data 里面。 图 2.1.1 dataNode 存放目录结构 其中有两个文件,storage 里存的东西是一些出错信息,in_use.lock 是一个空 文件,它的作用是如果需要对整个系统做互斥操作,应用应该获取它上面的一个 锁。 接下来是 3 个目录,current 存的是当前有效的数据块,detach 存的是快照 (snapshot,目前没有实现),tmp 保存的是一些操作需要的临时数据块。但我 们进入 current 目录以后,就会发现有一系列的数据块文件和数据块元数据文件。 同时还有一些子目录,它们的名字是 subdir0 到 subdir63,子目录下也有数据块 文件和数据块元数据。这是因为 HDFS 限定了每个目录存放数据块文件的数量, 多了以后会创建子目录来保存。 数据块文件保存了 HDFS 中的数据,数据块最大可以到 64M。每个数据块文 件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数 据块文件名和它的元数据文件名的例子: blk_3148782637964391313 blk_3148782637964391313_242812.meta 上面的例子中,3148782637964391313 是数据块的 ID 号,242812 是数据块 的版本号,用于一致性检查。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 8 在 current 目录下还有下面几个文件: VERSION,保存了一些文件系统的元信息。 dncp_block_verification.log.curr 和 dncp_block_verification.log.prev,它记录了 一些 DataNode 对文件系定时统做一致性检查需要的信息。 2.1.1 DataNode 接口和协议 //暂时先不放着,主要是 InterDataNodeProtocol 和 ClientDataNodeProtocol 2.1.1.1 InterDataNodeProtocol 现在类图不好画了,这里比较简单,直接贴代码吧:  getBlockMetaDataInfo SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 9 DataNode 实现的这个方法主要用于 recover 时,DataNode 校验源数据, 同时在接口测试用例中,该方法也派上了用场。  updateBlock 主要是更新了 block 的时间戳 generationStamp 以及块的大小,更新的情 况发生在:(1)。 2.1.1.2 ClientDataNodeProtocol ClientDataNodeProtocol 是为数据块恢复(block Recovery)准备的在 Client 与 DataNode 交互的协议。 其中只提供了一个方法,即 recoverBlock,下面就来分析下这个方法。  revoverBlock SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 10 2.1.2 DataNode 重要数据结构 DataNode 处理数据部分,往往是一种流式机制。DataXceiverServer 和 DataXceiver 就是这个机制的实现。其中,DataXceiver 还依赖于两个辅助类: BlockSender 和 BlockReceiver。下面是类图: 图 2.1.2.1 DataNode 写文件相关类图 2.1.2.1 DataXceiverServer 类 图 2.1.2.2 DataXceiverServer 类图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 11  功能 DataXceiverServer 很简单,它打开一个端口,然后每接收到一个连接,就创 建一个 DataXceiver,服务于该连接,并记录该连接的 socket,对应的实现在 DataXceiverServer 的 run 方法里。当系统关闭时,DataXceiverServer 将关闭监听 的 socket 和所有 DataXceiver 的 socket,这样就导致了 DataXceiver 出错并结束线 程。 2.1.2.2 DataXceiver 类 图 2.1.2.3 DataXceiver 类图  功能 对于当前使用的 hadoop0.18.3 版本的代码而言,DataXceiver 类提供了以下五 种功能支持,分别是: OP_WRITE_BLOCK (80): 写数据块 OP_READ_BLOCK (81): 读数据块 OP_READ_METADATA (82):读数据块元文件 OP_REPLACE_BLOCK (83):替换一个数据块 OP_COPY_BLOCK (84): 拷贝一个数据块 其中和本项目关系最大的当然是 writeBlock,readBlock 以及 recoverBlock。  主要数据结构 socket s:完成当前 dataNode 传输连接的套接字; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 12 String remoteAddress, localAddress :分别代表上述套接字连接的两端主机 IP 地址;  主要方法 正如前面的分析,我们着重要分析的是 readBlock 和 writeBlock 两个方法, 而分析这两个方法的关键在于读和写过程中 client 和 dataNode 交互的控制协议。  private void readBlock(DataInputStream in) throws IOException 图 2.2.4(a)是 dataNode 在读数据时接收 client 端发来的读请求协议,该协议 包括了要读取的 Block 的 ID,时间戳,开始偏移和读取的长度,最后是客户端 的名字。根据上面的信息,dataNode 创建一个 BlockSender,如果 BlockSender 没有出错,返回客户端一个确认应答,应答协议如 2.2.4(b)所示,否则,返回错 误码。成功创建 BlockSender 以后,就可以开始通过 BlockSender.sendBlock 发送 数据。BlockSender.sendBlock 底层循环调用 BlockSender.sendChunks,每次发送 一个 packet,packet 由一些列 chunks 组成(这里为什么还要继续分割?),每个 packet 被封装在如 2.2.4(c)所示的包头发送给客户端,客户端根据 offset 和 seqno 等信息重组 block。顺便提一下,2.2.4(c)协议对于写文件时候的传输是通用的, 从而不妨称 2.2.4(c)为数据传输协议,其中: packetLen:包长度,包括包头; offset:偏移量,标志每个 chunk 的其实地址; seqno:包序列号,与 offset 一起用于 packet 的重组; tail:标志是否是最后一个包; len:数据长度; checksum:检验数据,每次发送前,blocksender 都会校验 checksum; data:数据块数据; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 13 ( a )读请求协议 version(short) blockId(long) generationStamp(long) startOffset(long) clientName(String) 81(byte) ( b )读应答协议 ( c )数据传输协议 packetLen(int) offset(long) seqno(long) tail(int) len(int) checksum[] data SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 14 ( d )写请求协议 version(short) blockId(long) generationStamp(long) pipelineSize(int) isRecovery(boolean) hasSrcDataNode(boolean) 80(byte) srcDataNode(DatanodeInfo) 此处是可选的(optional),当 hasSrcDataNode = true 时,此 处有数据 numTargets(int) targets[](DatanodeInfo) Targets[]中共有numTargets个 Datanode信息 checksum.header(byte + int) 最后是写入对象dataChecksum中的 type和bytesPerChecksum变量中 图 2.1.2.4 读写文件相关协议  private void writeBlock(DataInputStream in) throws IOException 我们之前已经不断重复了 hadoop 所实现的 Google FS 的流水式写文件的策 略,这里详细说明下写文件的实现细节。 数据传输协议同 2.2.4(c),图 2.2.4(d)描绘了写请求协议,写请求协议工作于 每个写请求前驱与每个写请求后继之间。整个分析以上图为案例,对于 replication 为 3 的一次备份而言如,client 与第一备份点,直至第二备份点与第三备份点之 间都要通过上述协议交互以建立写文件流水线。对于上述协议,首先是客户端的 版本号和一个字节的操作码,接下来是我们熟悉的 blockId 和 generationStamp。 pipelineSize 是整个数据流链的长度,仍旧以上图为例,pipelineSize=3。 isRecovery 指示这次写操作是否一次恢复操作, isRecovery 来自客户端。client 是客户端的 名字,就是发起请求的节点名,需要特别注意的是,如果是从 NameNode 来的 复制请求,client 为空。 hasSrcDataNode 是一个标志位,如果被设置,表明源节 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 15 点是个 DataNode,接下来读取的数据就是 DataNode 的信息。 numTargets 是目 标节点的数目,包括当前节点,对于当前案例而言,DataNode1 上这个参数值为 3,到了 DataNode3,就只有 1 了。 targets 包含了目标节点的相关信息,根据这 些信息,就可以创建到它们上面的 socket 连接。当然,协议的最后还是为了确保 数据传输完整性的校验头。 收到上述写请求协议的 dataNode 在处理完对数据头的解析之后,会创建一个 BlockReceiver 对象,随后导致下图这些传输流的建立,它们都是建立在 socket 基础上的网络数据流,这些流的作用分别是: in:接收上个节点发送数据的 DataInputStream 对象; relpyOut:给上个节点反馈控制协议的 DataOutputStream 对象; mirrorOut:DataOutputStream 对象,负责向下个节点传数据; mirrorIn:DataInoutStream 对象,接受下个节点反馈控制协议; 图 2.2.2.5 写文件时传输流的建立 如果其中某一个点出错了,那么,出错的节点名会通过 mirrorIn 发送回来, 一直沿着这条链,传播到客户端。如果一切正常,就调用 BlockReceiver. receiveBlock 开始接收数据。receiveBlock 不但完成接收数据并序列化其到当前磁 盘,除了传输量最后一个节点外的其他节点还要完成将收到的数据向下个备份点 转发的工作,数据链上每个节点重复这样的工作,当第一个节点写数据完成以后, 整条链也即完成,接着通过逐级反馈向 client 汇报写数据完成。当然如果中间传 输出现问题,抛出的异常会导致 receiveBlock 关闭相关的输出流,并终止传输。 BlockReceiver.receiveBlock 还会建立一个 PacketResponder 线程来处理应答,在 介绍 PacketResponder 类时会重点介绍处理应答协议,然后就循环调用 BlockReceiver.receivePacket 来处理 packet 的发送,注意这里的 packet 和读文件 时的 chunk 其实是一个概念,是发送和接收数据的最小单位。 BlockReceiver.receivePacket 处理的流程具体参考 BlockReceiver 类的分析。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 16 在整个 block 传输结束以后,调用 DataNode.notifyNamenodeReceivedBlock() 通知 nameNode 块传输结束。  copyBlock 2.1.2.3 BlockSender 类和 BlockReceiver 类 图 2.1.2.5 BlockSender 和 BlockReceiver 类类图  功能 这两个是完成读写文件最重要的对象,分别通过 DataXceiver.readBlcok 和 DataXceiver.writeBlock 方法实例化,并进而调用 BlockSender.sendBlock 以及 BlockReceiver.receiveBlock 来完成文件读写的数据处理工作。  主要方法 这里主要分析两个最底层的函数。  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) throws IOException SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 17 Blocksender.sendBlock 发送完 2.2.4(b)所描绘的读应答协议之后,就不断 调用该函数,整个 block 继续被分割成 packets,每 个 packet 被组织成 2.2.4(c) 的数据传输报文发送给读请求者。 填充数据传输协议头 计算当前发送的数据长度,每 个packet包含的最多chunk数以 及packet总长(包括包头) 数据长度从每个包最多能支持的长度和 endOffset - curOffset中选择小者 这里很奇怪,为什么一个 packet还要继续拆分成chunk 计算数据偏移 checkSumOff + CheckSumLen 检查数据校验和 校验成功 抛出校验异常否 调用out.write传送数据 传送packet结束 图 2.1.2.6 BlockSender.sendChunks 流程图  private int receivePacket() throws IOException receivePacket 完成了流水式地写数据的最重要的工作,事实上,流水式 地写文件原理比较简单,就是通过对前驱节点的数据流接收数据,然后把收 到的数据存储到本地磁盘之前同时向后继节点(如果有)也发一份, SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 18 调用BlockReceiver.receiveNextPacket 获取下一个packet 取得当前packet的协议头 通过mirrorOut将收到 得packet发向下个节点 仍有下个节点 是 检查报文长度 否 < 0,不合法的报文 = 0,空的报文 其他,正常的报文 存储收到的数据到磁盘 通过PacketResponder 反馈结束报文答复 写一个Packet结束 图 2.1.2.7 BlockReceiver.receivePacket 流程图 2.1.2.4 PacketResponder 类 图 2.1.2.8 PacketResponder 类类图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 19  功能 处理流水线中后继节点对于前驱节点的应答工作,以及通过流水线给数据传 输发起者发送应答。在 writeBlock 阶段,该类通过线程方式调用,处理写数 据过程中的应答的发送和接收。注意:每个 packet 一次应答。  主要数据结构 LinkedList ackQueue:用于暂存应答的队列; DataInputStream mirrorIn:用于接收后继节点应答信号的输入流; DataOutputStream replyOut:用于应答前驱节点的输出流; numTargets:包括当前节点在内的后继节点数; BlockReceiver receiver:The owner of the response //TODO What is the purpose  主要方法  synchronized void enqueue(long seqno, boolean lastPacketInBlock) 作用:将 ack 应答放入 ackQueue;  synchronized void close() 作用:等待 ackQueue 中的所有应答都处理完毕之后结束当前应答进程;  private synchronized void lastDataNoderun() 作用:处理最后一个节点的应答;  public void run() 作用:处理所有节点的应答,当流水线达到最后一个节点时调用上面的 lastDataNoderun(); SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 20 2.1.2.5 DataNode 类 下面给出了DataNode 的继承关系,我们发现,DataNode实现了两个通信接 口,其中 ClientDatanodeProtocol 是用于和 Client 交互 的 InterDatanodeProtocol, 就是我们前面提到的 DataNode 间的通信接口。 图 2.1.2.9 DataNode 相关类图 主要数据成员  receivedBlockList 和 delHints receivedBlockList 表明在这个 DataNode 成功创建的新的数据块,而 delHints, 是可以删除该数据块的节点。如在 DataXceiver 的 replaceBlock 中,有调用 datanode.notifyNamenodeReceivedBlock (block, sourceID),这表明,DataNode 已经从 sourceID 上接收了一个 Block,sourceID 上对应的 Block 可以删除了(这 个场景出现在当系统需要做负载均衡时,Block 在 DataNode 之间拷贝)。  ThreadGroup threadGroup SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 21 这是一个线程组,线程组里面包含有所有 DataNode 实际工作的现成,比 如读、写、拷贝线程等,在 DataXceiverServer run 起来的时候,这个线 程组也同时会被创建起来;  long lastBlockReport, lastHeartBeat, heartBeatInterval 三个时间变量,分别汇报最近一次 BlockReport 的时间,最近一次发送 HeartBeat 信号的时间和 HeartBeat 信号发送的间隔;  Server ipcserver ipcServer(类图的左下方)是 DataNode 的一个成员变量,它启动了一 个 RPC 服务,这样,DataNode 就能提供 ClientDatanodeProtocol 和 InterDatanodeProtocol 的能力了。 主要方法  offerService 没有传递参数和返回参数,顾名思义,这是 DataNode 上提供服务的主要 函数,主要完成发送和处理 heartBeat 信号、处理和汇报新接收到得数据 块,发送和处理 blockReprot 信号以及完成 blockScanner。图 2.1.2.10 给 出了 offerService 的流程图。整个流程由于比较庞大,这里分成三个阶段 介绍,分别对应于发送和处理 heartBeat 信号,处理和汇报新数据块,处 理和汇报数据块信息 blockReport 和完成 blockScanner。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 22 写工作日志 While(shouldRun) shouldRun == true 记录当前系统时间startTime startTime – lastHeartbeat >= heartBeatInterval 确定是不是要发 送heartBeat lastHeartbeat = startTime 是 发送heartBeat信号 远程调用 DataNodeProtocol .sendHeatbeat 根据上述远程调用返回的 DataNodeCommand,确定当前 的处理任务 调用 DataNode.proccessCommand OfferService阶段一:发 送和处理heartBeat信号 Stage2 offerService阶段二:向 NameNode汇报最新接收的数据块 检查是否有最新接收 到得数据块 通过检查 receivedBlockList DataNode收到新的数据块, 向NameNode汇报 远程调用 DataNodeProtocol. blockReceived 汇报完毕后,从receivedBlockList和 delHints中对应删除已经处理的数据块记录 Stage 3 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 23 offerService阶段三:发送和处理blockReport信号 Begin Stage 3 startTime - lastBlockReport > blockReportInterval 确定是不是发送 blockReport 获取当前节点上的所有数据块 是 调用 FSDataSetInterface. getBlockReport 发送blockReport信号 远程调用 DataNodeProtocol. blockReport 处理上述远程调用返回的处理命令 调用DataNode. proccessCommand 如果当前没有blockScanner线程 在运行,启动blockScanner 上面所有流程都处理完毕之后,等待下 一次heartBeat信号的到来 循环上述三个阶段的处理 图 2.1.2.10 offerService 流程图  proccessCommand proccessCommand主要处理NameNode给DataNode分派的任务DataNode Command,DataNodeCommand.getAction()是具体的任务行为,具体如下: DataNodeCommand.getAction() 系统调用 DataNode.* 函数功能 transferBlocks 向另一个节点发送 数据块拷贝 data.invalidate 无效化相应数据块 关闭当前节点 向 NameNode 注册 提交系统升级 启动 blockReport SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 24 恢复数据块  transferBlocks 拷贝数据块到其他 DataNode 由 transferBlocks 方法执行。注意,返回的 命令可以包含多个数据块,每一个数据块可以包含多个目标地址。 transferBlocks 方法将为每一个 Block 启动一个 DataTransfer 线程,用于 传输数据。DataTransfer 是一个 DataNode 的内部类,它利用我们前面介 绍的 OP_WRITE_BLOCK 写数据块操作,发送数据到多个目标上面。 2.1.3 Datanode 监视线程功能描述 Datanode 上的线程包括读写数据块时的 Server 线程,如 dataXceiverServer; 用于数据传输的 httpServer、ipcServer;对应于每个 Datanode 实例的 dataNodeThread(主要用于判断该 Datanode 是否工作);周期性检测数据块完整性 的 blockScannerThread 和 Datanode 上一直在工作的 Datanode.run()方法。这里主 要介绍后两者。 线程 blockScannerThread 线程 blockScannerThread 定义在 Datanode.java 中,当一个 Datanode 实例被创 建时,DataBlockScanner 对象会被创建,然后其中的 run()运行,即为 blockScannerThread。该线程的功能是定时对数据块文件进行校验,如果出错, 远程调用 namenode.reportBadBlocks()(正如我们会在 DataNodeProtocol 看到的, 此处的 namenode 就是一个 DataNodeProtocol 协议的实例,NameNode 通过这个 协议提供给 DataNode RPC 调用)处理出错并写日志。需要指出的是,验证数据 块的完整性是利用 BlockSender 发送数据的时候对数据的校验, blockSender.sendBlock(out, null, throttler),将数据块读到一个空的输出设备,如 果有异常,则校验失败。流程如下图所示: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 25 while(datanode.shouldRun && !Thread.interrupted()) DataBlockScanner.run() true 如果有Block,则取出一个 block = blockInfoSet.first().block Thread.sleep(1000) verifyFirstBlock() 如果数据块出错,出错处理 handleScanFailure(block) 是否已到扫 描周期 false verifyBlock() 验证数据块完整性 blockSender.sendBlock(out, null, throttler) 更新扫描状态 updateScanStatus(block, ScanType.VERIFICATION_SCAN, true)) handleScanFailure() 主要工作为 namenode.reportBadBlocks(blocks) 图 2.1.3.1 DataBlockScanner 流程图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 26 Datanode.run() 线程 Datanode.run()会启动 dataXceiverServer 线程,然后执行升级或者正常工 作,一直运行到 Datanode 死亡。如图所示: while(shouldRun) Datanode.run() 正常工作 offerService() 等待线程结束 dataXceiveServer.join() 写日志并关机 shutdown() DataXceiverServer.start() 如果可能执行升级操作 startDistributedUpgradeIfNeeded() 图 2.1.3.2 Datanode 主线程流程图 升级操作暂时不用考虑,这里详细分析一直运行的 offerService()方法,该方 法实现了 Datanode 上的主要工作。offerService()也是该循环,会定时向 Namenode 发送心跳信息,报告本机上 Block 状态的变化并执行 Namenode 返回的命令,然 后检查是否有新收到的数据块,如果有,则发送报告(namenode.blockReceived()); 接下来发送数据块报告(namenode.blockReport())并执行返回的命令;然后启动 blockScanner 线程,所有工作完毕后,等待直到下一个发送心跳的时间点。流程 图如下: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 27 while(shouldRun) Datanode.run(). offerService() No 启动blockScanner线程 blockScannerThread.start() 等待到下一个心跳时间再开始工作 Yes 发送心跳报告 namenode.blockReport() 间隔时间大于心跳间隔发送心跳信息 namenode.sendHeartbeat(); 如果有命令返回,则处理该命令 processCommand(cmd) 是否有新收到的 数据块 向Namenode发送报告 namenode.blockReceived() 辅助性的队列处理工作 receivedBlockList, delHints 执行返回的命令 processCommand(cmd) 图 2.1.3.3 offerService()方法流程图 其中具体的方法如 namenode.sendHeartbeat()、namenode.blockReport()等在 DatanodeProtocol 已经分析,这里不再详述。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 28 2.1.4 Datanode 上主要工作流程 2.1.4.1 DataNode 写数据具体流程 Datanode 接收来自 Client 的数据后会向本机 /tmp 文件夹写数据,数据是按 照数据包接收的,所有每次是将数据包写入,写完毕以后,BlockReceiver 会计 算元数据块(metadata),然后判断数据块的有效性并 finalize。写的时候空间是足 够的,Datanode 不会为数据块先划出一块物理空间来,因此方案中预先确定的放 置冗余块的位置必须填充数据。为了不占用网络带宽,我们讨论决定由本机直接 写入数据(比如全 0)。判断的时间是在数据传输前,数据链建立好以后,这时哪 一个 Datanode 放置冗余已经确定了。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 29 Datanode启动DataXceiverServer.run()写或转 发来自Client或其它Datanode的数据 启动DataXceiver.run()来读/写来自 DataXceiverServer的数据 读数据包头的信息,对于写数据,调用 writeBlock()进行具体操作 文本框的高度和它所关联的边 框线将随文本的增删而增减。 拖动侧边手柄可以更改注释宽 度。 在DataXceiverServer.run()中的 循环中执行 Daemon(threadGroup, new DataXceiver(s)).start() DataXceiver.run()中用 switch...case语句实现了 readBlock/writeBlock/readMeta data/replaceBlock/copyBlock的 程序入口 writeBlock()读取数据包头,建立输入输出 流并构造BlockReceiver对象,由其负责数据 流的写入工作 BlockReceiver对象中的 FSDataset.writeToBlock()负责往本地磁盘上 写数据 writeToBlock()先判断数据块是否是recovery 的,如果不是,则在/tmp中建立数据块并写 入数据 File f = createTmpFile() 数据块写完以后计算元数据块 File metafile = getMetaFile() 最后由BlockReceiver判断数据块的有效性并 finalize BlockReceiver.finalized = FSDataset.isValidBlock() 图 2.1.4.1 Datanode 写文件具体流程图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 30 2.1.4.2 DataNode 上恢复数据处理流程 图 2.1.4.2 Datanode 恢复数据流程图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 31 2.2 NameNode 解析 Hadoop 系统中的 NameNode 是作为系统文件目录的管理者和“Inode”表, 这里 Inode 表的概念和 Linux 系统中的概念是一致的,就是映射文件相应的数据 块信息。同时,NameNode 还负责 DataNode 的管理和维护。 NameNode 作为 HDFS 中文件目录和文件分配的管理者,它保存的最重要信 息,就是下面两个映射: (1)文件名 ----》数据块(InodeFile); (2)数据块 ----》DataNode 列表(方便从 block 找到相应的存储节点); 其中(1)持久化存储在磁盘上,而为了减轻 NameNode 的负担,(2)并不 像(1)一样持久化存储,而是在内存中,通过 DataNode 定时上报建立起来的。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 32 2.2.1 NameNode 接口和协议 图 2.2.1.1 系统中主要的接口和协议 图 2.2.1.1 包含了 NameNode 和 DataNode 往外暴露的接口,其中,DataNode 实现了 InterDatanodeProtocol 和 ClientDatanodeProtocol,剩下的,由 NameNode SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 33 实现,包括了 VersionedProtocol, ClientProtocol, DataNodeProtocol 和 NameNodeProtocol。 2.2.1.1 ClientProtocol ClientProtocol 继承与 VersionedProtocol,提供给客户端用于访问 NameNode。 它包含了文件角度上的 HDFS 功能。和 GFS 一样,HDFS 不提供 POSIX 形式的 接口,而是使用了一个私有接口。 图 2.2.1.2 是 ClientProtocol 的类图,我们会对其中一些重要方法分别做介绍。 图 2.1.1.2 ClientProtocol 类图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 34  getBlockLocations  create  append  setReplication  addBlock SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 35  abandonBlock  complete 这里需要调用 FSNameSystem 的 completeFile 方法,completeFile 需要保 证文件对应的所有的数据块的节点以及其镜像节点都存储成功,否则构造 InodeFileUnderConstruction,返回 STILL_WAITING,等待文件构建完毕。  reportBadBlocks 这里需要调用 FSNameSystem 中的 markBlockAsCorrupt,这个函数是处 理数据块损坏恢复最重要的函数,这里给出其详细流程如图 2.2.1.3 所示: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 36 markBlockAsCorrupt 通过blockId获取存储该 block的相应节点信息 调用 FSNameSystem.getDataNode 节点信息为空 抛出异常是 检查blockMap是否存有相应拷贝 否 通过系统日志记录当前数据块 以及损坏无法恢复否 Block could not be marked as corrupt as it does not exist in blockMap 从blockMap获取当前 block的InodeFile 是 将当前的block及其节点信息添加到 corruptReplicas Map 调用 corruptReplicas.addToCorr uptReplicasMap(blk, node) 当前块对应的备份数超过 系统最大备份数 无效化当前数据块是 调用 FSNameSystem.invalidateBlock (blk, node)将这个block添加到 neededReplication队列 否 调用 FSNameSystem.UpdateNeededRep lications(blk, -1,0) 结束 图 2.2.1.3 markBlockAsCorrupt 流程 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 37  rename  delete  renewLease 2.2.1.2 NameNodeProtocol NameNodeProtocol 很简单,主要用于次级 NameNode 与 NameNode 通 信 ,图 2.2.1.4 给出了该协议的类图,该协议同样继承自 VersionedProtocol 图 2.2.1.4 NameNodeProtocol 类图 在 Namenode 类中,实现了 NameNodeProtocol 协议接口中定义的 getBlocks 方法,如下所示: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 38 给定 DataNodeInfo 类型的 datanode,它是一个描述 Datanode 的状态的实体 类对象,通过 getBlocks 方法,可以获取到总的块大小为 size 的 BlocksWithLocations,即描述这些块的位置信息的 BlocksWithLocations 对象。 2.2.1.3 DataNodeProtocol DatanodeProtocol 主要用于 DataNode 向 NameNode 通信,其中包括 DataNode 向 NameNode 申请注册,DataNode 通过 heartBeat 信号与 NameNode 确认存活信 息以及交互相应的动作。图 2.2.1.5 是 DataNodeProtocol 的类图。 图 2.2.1.5 DataNodeProtocol 类图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 39 下面分别介绍 DataNodeProtocol 中一些最重要的函数,这些函数在今后的详 细设计中将会起到最重要的作用,它们正是构成整个系统动态交互的最重要组成 部分。  register  sendHeartBeat(这恐怕是交互过程中最为重要的方法了) 当然这里最需要关注的是 FSNameSystem 中的 handleHeartBeat 方法,顾名思 义,这个方法主要处理来自 DataNode 的 HeartBeat 信号,其中包含了当前节点的 状态信息,NameNode 需要根据 DataNode 的状态信息执行下一步的动作。 Hadoop 源码里面给出了上述注解,可以看到 NameNode 最主要的执行动作 时确保当前节点的存活(通过设定超时),另外就是调整当前系统状态以为下一 次数据分配做准备(节点总存在空间紧张的问题)。heatBeat 信号还有导致 NameNode 给 DataNode 派活干,每个 DataNode 在 NameNode 上对应一个 DataDescriptor 对象实例,该对象中包含下面三个块队列: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 40 BlockQueue 是一个包含有对列表的类,上述三个 Block Queue 分别记录了将要复制的数据块,等待恢复的数据块和无效的数据块。  replicateBlocks 将要复制的数据块,说明此时该块对应的副本数已经不够,从而 NameNode 通知存放有该块的 DataNode 将 Block 复制到相应的 targets,这个 通知就放在 replicateBlocks,每 次 heartBeat 以后,NameNode 调用 handleHeart Beat 从这个队列找通知,有相应的通知,就转给 DataNode 做相应处理,下 面处理方式类同。  recoverBlocks 等待恢复的数据块,HDFS 会遇到一种情况,就是在写文件的过程中, 数据链因为某种原因断掉了,从而系统中将可能存在“未完成”的数据块, 这些数据块需要在适当的时间里面被“完善”,这就是 recover 的作用。这 里的 recoverBlocks 将配合 ClientDataNodeProtocol 里的 recoverBlock 方法以 及 InterDataNodeProtocol 里的 updateBlock 方法完成这个善后工作。  invalidateBlocks 无效的数据块,当数据块的副本数大于 MaxReplicas 的时候,需要适当 的从相应的 DataNode 无效化一部分数据块,以使副本数得到控制。  blockReport blockReport 底层调用 FSNameSystem 的 processReport 方法,进而调用 DataNodeDescriptor 的 reportDiff,根据 DataNode 汇报的所有 blocks 信息更 新当前节点对应 blockmap(即 block--->DataNode map)。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 41  blockReceived blockReceived 底层调用 FSNameSystem 的 blockReceived 方法,用于 DataNode 向 NameNode 确认收到相应的 block。需要从 pendingReplications 队列删除的原因是可能这是一个恢复操作。  errorReport DataNode 上传错误报告将导致相应的 DataDescriptor 从 NameNode 被删 除,下面是代码片段: heartBeats 是包含目前仍然存活的 DataNode 的 DataNodeDescriptor 队列, 是 datanodeMap 的子集,发送 errorReport 的 DataNode 将被 NameNode 置为 失活。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 42 2.2.2 NameNode 重要数据结构 2.2.2.1 Inode* INode 是一个抽象类,它的两个子类 INodeDirectory 和 INodeFile 分别对应着 目录和文件。INodeDirectory 的子类 INodeDirectoryWithQuota 是带了容量限制的 目录,INodeFile 的子类 INodeFileUnderConstruction 抽象了正在构造的文件,当 创建文件的时候,目录系统会维护对应的信息。下面逐一分析这五个类,类图 如下: 图 2.2.2.1 INode*关系图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 43 INode 成员变量: byte[] name:目录/文件名; INodeDirectory parent:指向了父目录; long modificationTime:最后的修改时间; long permission:访问权限; enum PermissionStatusFormat:存储了访问权限的状态,比如所属的用户、 组等信息。 主要的成员方法: get*():获得文件属于的状态信息,比如属于的用户、组,访问权限等; set*():设置文件属于的状态信息,比如属于的用户、组,访问权限等; collectSubtreeBlocksAndClear():用于收集这个 INode 所有后继中的 Block; computeContentSummary():用于递归计算 INode 包含的一些相关信息,如文 件数,目录数,占用磁盘空间。 INodeDirectory 作用: 抽象了 HDFS 的目录。 成员变量: private List children:该目录下所有目录或文件的集合。 主要成员方法: get*():获得子节点文件、文件目录等; add*():添加子文件、结点等; collectSubtreeBlocksAndClear():实现 INode 的同名方法; computeContentSummary():实现 INode 的同名方法。 INodeDirectoryQuota 作用: 限制了 INodeDirectory 可以使用的空间。 成员变量: private long quota:设定该目录的容量限制; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 44 private long count:设定该目录的大小,count < quota,否则抛出异常。 主要的成员方法: get*():set*():获得/设置以上两个变量的值; void updateNumItemsIntree(long):更新目录树的大小; private static void verifyQuota():检测目录的大小满足容量限制,即 count < quota,否则抛出异常。 INodeFile 作用: 抽象了 HDFS 中的文件。 成员变量: static final FsPermission UMASK:访问控制; protected BlockInfo blocks[]:文件对应的Block列表; protected short blockReplication:数据块的副本数目; protected long preferredBlockSize:数据块大小。 主要的成员方法: get*()、set*():获得/设定数据块(Block)、副本数(blockReplication); void addBlock(BlockInfo newblock):添加数据块到队列(blocks[])末尾; Block getPenultimateBlock():获得倒数第二块数据块; INodeFileUnderConstruction toINodeFileUnderConstruction():将文件引用改 为正在构造的文件。 INodeFileUnderConstruction 作用: 保存正在构造的文件的信息。 成员变量: StringBytesWritable clientName:保存了拥有租约(lease)的节点名; StringBytesWritable clientMachine:构建该文件时的客户端名; DatanodeDescriptor clientNode:如果构建文件的是某个数据结点,则此处 保存该结点的信息; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 45 private int primaryNodeIndex:构建文件时的主结点序号,该主结点拥有文 件的租约(lease),其它结点配合该结点工作; private DatanodeDescriptor[] targets:构建文件的最后一个数据块的所有结 点; private long lastRecoveryTime:最近一次的恢复时间。 主要的成员方法: get*()、set*():获得/设定以上变量; INodeFile convertToInodeFile():将该正在构建的文件转化为 INodeFile; void removeBlock(Block oldblock):从文件的数据块列表中删除数据块,删除 的数据块只能是最后一块; void assignPrimaryDatanode():从 targets[]中按顺序找到第一个活着的结点 作为主结点(PrimaryDatanode)。 2.2.2.2 Block* Block*类包含类 Block、BlockInfo 和 BlocksMap,其中类 Block 包含了数据 块的基本的信息,如 id 号,长度和时间戳;类 BlockInfo 扩展了 Block,保存了 该 Block 归属的 INodeFile 和 DatanodeDescriptor,同时还包括了其前继和后继的 Block;类 BlocksMap 保存了 Block 和它在 Namenode 上的一些相关的信息,主 要是维护了一个 map:Map。下图是这些类的关系。 图 2.2.2.2 Block*关系图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 46 Block 作用: Block 类是一个比较基本的类,存储了数据块的基本信息,代表了文件的数 据块,下图是 Block 的类图。 图 2.2.2.3 Block 类图 主要数据成员: BlockId:每个 block 的唯一 id 号; NumBytes:block 大小; GenerationStamp:生成时间。 主要方法: set*()、get*():设置/获取 Block 的 id、大小和生成时间; public void write(DataOutput out):将数据块属性信息写到输出流 out 中; public void readFields(DataInput in):从输入流 in 中读取属性信息赋值给 block。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 47 BlockInfo 作用: BlocksMap 的内部类,包含了和数据块 Block 有关的详细信息,如所属的文 件 INodeFile,所在的 Datanode 及所在 Datanode 上和该 Block 相邻的 Block 信息。 下图为其中的变量和方法。 图 2.2.2.4 类 BlockInfo 成员变量: inode:数据块所属的文件; triplets:数据块所属的数据结点和该结点上与其相邻的数据块。 主要的成员方法: get*()、set*():获得/设置数据块所属的 inode、Datanode 及相邻的数据块; numNodes():数据块属于的数据结点数目; addNode()、removeNode():添加/删除该数据块属于的某个数据结点; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 48 listInsert()、listRemove():将该数据块的信息添加/删除到指定的结点的数据 块列表中。 BlocksMap 作用: 维护 Block 和其相关信息的 map,BlcokInfo 包含了 block 所属的 Inode 信息 和相应的存储 block 的 dataNodes,通过 < Block, BlockInfo > 表维护所有数据块。 类图如下: 图 2.2.2.5 BlockMap 类图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 49 成员变量: private Map map = new HashMap():这 是 block 与 blockInfo 的对应 map,通过键值对应将 block 键映射到相应的值 BlcokInfo。 主要方法: void removeBlock():从该 Map 中删除数据块并将该数据块从所属的 Datanode 上删除; void removeNode(): boolean addNode(Block b, DatanodeDescriptor node, int replication):向 Block b 中增加一个用于存储的数据节点; boolean contains(Block block, DatanodeDescriptor datanode):判断是否已经存在该 Map 中。 2.2.2.3 Datanode* Datanode*包括类 DatanodeID、DatanodeInfo 和 DatanodeDescriptor,其中 DatanodeID 主要保存了数据结点的名字(hostname:portNumber);DatanodeInfo 包 含了数据结点的状态,包括该结点的容量、已经使用空间、剩余空间、最近升级 时间、结点名字、位置等信息,该类用于 Datanode Protocol、Client Protocol 的通 信过程中;DatanodeDescriptor 包含了和一个数据结点相关的最详尽的信息,包 括可用存储空间,维护的数据列表(replicateBlocks、recoverBlocks、invalidateBlocks 等),需要注意的是, SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 50 。 这些类的继承关系如下图所示: 图 2.2.2.6 Datanode*关系图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 51 DatanodeId 作用: 保存了机器名、存储 id 等信息,以及对这些信息的操作。如下图: 图 2.2.2.7 类 DatanodeID 成员变量: 主要方法: get*()、set*():功能和名字相同,都是对变量的操作; void updateRegInfo(DatanodeID):当结点注册的时候更新属性,注意 storageID 是不更新的。 DatanodeInfo 作用: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 52 DataNodeInfo 描述了 DataNode 的状态信息,包括容量、使用的空间、剩余 的空间、最近一次更新等。类中的方法大多是对属性的操作。 在构造客户端新文件协议头时用到的标志存储节点的 targets 数组就是 DatanodeInfo 类型。类图如下: 图 2.2.2.8 DatanodeInfo 类图 成员变量: protected long capacity:可存储数据容量; protected long dfsUsed:系统使用的空间; protected long remaining:结点剩余空间; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 53 protected long lastUpdate:最近一次更新时间; protected int xceiverCount:工作线程数目,比如读写数据时候会启动 DataXceiver,该变量记录运行的 DataXceiver 数目; protected String location = NetworkTopology.UNRESOLVED:解析该结点所处 的位置(由集群结点所构成的树); protected AdminStates adminState:结点所处的状态,结点状态包括 NORMAL、 DECOMMISSION_INPROGRESS,DECOMMISIONED(Decommision 是将结点撤 出集群时所涉及到的状态,结点撤出机器时会将该结点上的队列、数据块等信息 上报 Namenode,等 Namenode 处理完毕后该结点才可从集群中撤出,注这里撤 出是指由管理员主动撤出,不是意外情况导致的 Datanode 死亡); protected String hostName = null:机器名,默认为空; private int level:结点位于集群的位置树的层数; private Node parent:结点的父结点。 主要的方法: set*()、get*():设置/获取属性值,包括容量、位置、结点名等信息; String dumpDatanode():将打印出的该结点的状态信息格式化(使输出信息规 范); void start/stopDecommission():开始/停止撤出结点,方法实际就是改变了 adminState 的值; public void write(DataOutput out):将 DatanodeInfo 的属性值写入流; public void readFields(DataInput in):从流中读入 DatanodeInfo 的信息。 DatanodeDescriptor 作用: 包含了结点最详尽的信息,该类包含了内部类 BlockTargetPair 和 BlockQueue。其中 BlockTargetPair 保存 Block 和对应的 DatanodeDescriptor 的关 联,BlockQueue 是 BlockTargetPair 的队列。两个内部类都很小,除了成员变量 外没有特别的方法,如下图: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 54 图 2.2.2.9 内部类 BlockTargetPair 和 BlockQueue DatanodeDescriptor 包含了两个 BlockQueue,分别记录了 Datanode 上正在复制的 数据块(replicateBlocks)和 Lease 恢复的数据块(recoverBlocks),一个 Block 集合 (TreeSet),保存 Datanode 上已经失效的数据块(invalidateBlocks)。 DatanodeDescriptor 提供的方法用于操作这些队列和集合,也提供了一些 get*() 和 set*()方法,这些方法用于 Namenode 上生成发送到 Datanode 的命令。 成员变量: private volatile BlockInfo blockList = null:Datanode 上包含的数据队列; protected boolean isAlive = false:标识该 Datanode 是否存活; 主要方法: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 55 图 2.2.2.10 DatanodeDescriptor 方法图 add/removeBlock(BlockInfo):将数据块添加/移除该数据结点,方法不仅将 Block 添加到/移除出 Datanode 的 blockList,也 将 该 Datanode 的信息添加到/移除 出该数据块的 BlockInfo 中。 updateHeartbeat():根据接收到的 Heartbeat 信息更新记录的 Datanode 的信息, 包括容量、已用空间、剩余空间等,更新的为其父类 DatanodeInfo 中的变量; set*()、get*():设置/获得状态信息,功能同名称; get*Command():生成发送于 Datanode 的命令; reportDiff():当 Namenode 收到 Datanode 对所管理的 Block 的状态的汇报的 时,调用该方法找出和 Namenode 的信息差别,供后续处理用。(该方法功能可 再看下); SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 56 readFieldsFromFSEditLog():从 FSEditLog 中恢复 DatanodeDescriptor。 2.2.2.4 FSImage 类 FSImage 保存了某个还原点(Checkpoint)时系统的信息,用于系统启动和 恢复时加载各种映射信息,比如文件名数据块列表。系统使用日志(EditLog) 保存对目录树的修改、映射信息的修改,然后已有的 FSImage 会在恰当的时候(比 如系统启动时)进行更新。类图如下: 图 2.2.2.11 FSImage 类图 成员变量: protected long checkpointTime = -1L; private FSEditLog editLog = null; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 57 private Boolean isUpgradeFinalized = false; private Collection checkpointDirs:从还原点(checkpoint)导入 image 的 目录; volatile private CheckpointStates ckptState = CheckpointStates.START:还原点 状态,如能否被还原; static private final FsPermission FILE_PERM = new FsPermission((short) 0):权 限设置; static private final byte[] PATH_SEPARATOR = INode.string2Bytes(Path.SEPARATOR):FSImage 存储路径; 主要方法: 图 2.2.2.12_1 类 FSImage 方法 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 58 图 2.2.2.12_2 类 FSImage 方法 方法很多,其中 get*()、set*()方法用于获得/设置属性值,doUpgrade()、 doRollback()、doFinalize()等方法用于 FSImage 的升级回滚等操作。 loadFSImage():加载最近一次的 FSImage 并和 edits 整合,作为新的 FSImage, 重要方法; saveFSImage():将 FSImage 的内容保存至文件,重要方法。 其它方法很多,功能只了解了大概,如果需要再详细研究。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 59 2.2.2.5 FSEditLog 类 FSEditLog 记录了 HDFS 的日志,实现的方法比较多,但是功能并不复杂, 就是维护用户作出的操作和系统参数,类图如下: 图 2.2.2.13 类 FSEditLog 主要成员变量: private ArrayList editStreams = null:写日志的流; private FSImage fsimage = null:FSImage 对象,上一节有介绍; private long txid = 0:事件(transaction)计数器; private long synctxid = 0:上一次系统处理的事件 id; private long lastPrintTime:写日志时间; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 60 private long numTransactions:事件数目, private long totalTimeTransactions:处理所有的事件所用的时间。 主要方法: 图 2.2.2.14 类 FSEditLog 方法 左侧的方法比较简单,功能同方法名。需要指出的是 printStatistics(boolean) 每隔 1min 将统计信息写入日志,统计信息主要包括事件数量(numTransactions)、 总时间(totalTimeTransactions)等。右侧的 log*()方法是对日志的操作,rollEditLog() 和 purgeEditLog()作用相对,前者关闭 edits,打开日志到 edits.new,后者则删除 老的 edits 文件,然后把 edits.new 改名为 edits,二者用于更新修改时。下面分析 log*()方法。 logOpenFile():将打开文件时申请 lease 的记录写入日志; logCloseFile():关闭文件时的记录写入日志; logMkDir():创建目录时的记录写入日志; logRename():更改文件名时的记录写入日志; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 61 logSetReplication():设置副本数时的记录写入日志; logSetQuota():设置限额值时的记录写入日志; logClearQuota():清楚限额值时的记录写入日志; logSetPermissions():设置访问权限时的记录写入日志; logSetOwner():设置所有者时的记录写入日志; logDelete():删除文件时的记录写入日志; logGenerationStamp():生成文件时的时间戳的记录写入日志; 以上 log*()方法参数并不完全相同,但是都会调用方法 logEdit(OP_XXX, XXX, … …)将时间写入日志,其中 OP_XXX 就是成员变量中的常量。 2.2.2.6 LocatedBlock 类 图 2.2.2.15 LocatedBlock 类图 功能: LocatedBlock对象是Block和DataNodeInfo[]对象集合,它能很方便地通过 Block,找到对应的存储它的DataNodeInfo。 主要数据成员 Block b:数据块; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 62 DatanodeInfo[] locs:该数据块的所被存储的所有 datanodes 位置信息; Corrupt:是否已经是损坏; 主要方法 setCorrupt(boolean corrupt) 作用:设置该数据块为坏,应该是用于恢复处理; DatanodeInfo[] getLocations() 作用:获取数据块所有 datanode 位置; 2.2.3 Namenode 数据队列分析 该部分分析了 Namenode 上的数据结构和运行的线程,其中数据结构主要包 括 corruptReplicas、recentInvalidateSets、neededReplications、 pendingReplications 等,线程主要包括 HeartbeatMonitor、SafeModeMonitor、 ReplicationMonitor 、LeaseMonitor、PendingReplicationMonitor 等。 2.2.3.1 数据队列功能及维护方法 变量及其功能描述: private long pendingReplicationBlocksCount = 0L // 正在复制的数据块数目 private long underReplicatedBlocksCount = 0L // 需要复制的数据块数目 private long scheduledReplicationBlocksCount = 0L 当前正处理的复制工作数目 CorruptReplicasMap corruptReplicas = new CorruptReplicasMap() 保存损坏的数据块到对应Datanode的关系(), 类图如下。类只有一个成员变量,保存Block到一个DatanodeDescriptor的集合的 映射和这个映射上的一系列操作。 图2.2.3.1 CorrupyMap类图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 63 private Map> recentInvalidateSets = new TreeMap>() 保存了每个结点上无效但还存在的数据块(StorageID --> ArrayList)。 Map> excessReplicateMap = new TreeMap>() 保存了每个结点上有效但需要删除的数据块(StorageID --> TreeSet),这种情况可能发生在一个DataNode故障恢复后,上面的数据 块在系统中副本数太多,需要删除一些数据块。 ArrayList heartbeats = new ArrayList() 保存了目前活着的Datanode,线程HearbeatMonitor会定期检查,删除超时的 Datanode,线程分析见下一部分。 private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks() 需要进行复制的数据块,类图如下。实际上是维护了一个优先级队列,当数据块 只有一个副本时,优先级最高,此时为0。UnderReplicatedBlocks提供一些方法, 对Block进行增加,修改,查找和删除。 图2.2.3.2 UnderReplicatedBlocks类图 private PendingReplicationBlocks pendingReplications正在进行复制的数据块, 类图如下。其中,pendingReplications(Map)中移 除,注意每次只能标记一个 Datanode 是损坏的,移除后才能继续下一个。方法 用同步机制保证了 heartbeats 的一致性。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 67 while(!allAlive) heartbeatCheck() 使用while()检测超时的Datanode,每次 处理heartbeats中第一个超时的Datanode 寻找第一个超时(dead)的Datanode,获 得nodeId removeDatanode(DatanodeDescriptor nodeInfo) 同步heartbeats和 datanodeMap,得到对应于 nodeId的DatanodeDescriptor foundDead = false foundDead = true 写日志 NameNode.stateChangeLog.info() 结束,等待下一个周期 HeartbeatMonitor再次调用该 方法 所得结果为nulll nodeInfo != null && isDatanodeDead(nodeInfo) nodeInfo removeStoredBlock() 更新系统容量信息,如 capacityTotal等,updateStats() 从heartbeats队列中移除该node, heartbeats.remove() isAlive dead resetBlocks() // 重置该删除的node对象 removeFromInvalidates() // 删除recentInvalidateSets中的对应关系 Namenode.stateChangeLog.debug() // 写日志 clusterMap.remove() // 从网络拓扑结构中删除该结点信息 图 2.2.3.5 HeartBeatMonitor.run 流程图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 68 removeStoredBlock()的作用是修改 block-->datanode 的图,执行流程如下图所 示。 图 2.2.3.6 removeStoredBlocks 流程图 ReplicationMonitor replthread 运行 ReplicationMonitor,这个线程的 run()方法比较简单,会定期 调用 computeDatanodeWork()和 processPendingReplications(),周期由 dfs.replication.interval 设定,默认时间为 3000ms。下图为 run()方法的执行流程。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 69 while(fsrunning) ReplicationMonitor.run() computeDatanodeWork() safeMode 检查pendingReplications队列,如果已经有足够的数据块正在进 行复制了,则从neededReplications队列中删除该项并写日志 in not in processPendingReplications() 若返回值为0,执行computeInvalidateWork(),该方法通过 invalidateWorkForOneNode()将数据块从recentInvalidateSets移到 Datanode的invalidateBlocks(TreeSet)中,具体的删除工作 会由该Datanode完成 recentInvalidateSets.remove() dn.addBlocksToBeInvalidated() computeReplicationWork(blocksToProcess) 扫描neededReplications,取出需要复制的项 检查Block属于的文件是否存在,不存在则从neededReplications 队列里删除,然后继续下一项的处理 得到当前数据块副本数numReplicas 选择复制的源Datanode,chooseSourceDatanode(),若为空则退 出对复制项的处理 选择副本存在的目标结点,如果为空,退出对复制项的处理 replicator.chooseTarget() 将该数据块添加到源Datanode的replicatedBlocks队列中去 srcNode.addBlockToBeReplicated() 并在目标结点中记录复制请求 for each dn in targets, dn.incBlocksScheduled() 将复制项从neeededReplications队列中移到 pendingReplications队列中并写日志 neededReplicationsIterator.remove(),pendingReplications.add() NameNode.stateChangeLog.debug() return scheduledReplicationCount 移动复制 项是为了 超时处理 Thread.sleep(replicationRecheckInterval) 图 2.2.3.7 ReplicationMonitor.run 流程图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 70 processPendingReplications()的功能是从 pendingReplications 队列中找到请求 已经超时的数据块,将它们放到 neededReplications 队列中。执行流程比较简单, 如下图: processPendingReplications() 从正在复制的数据块队列中(pendingReplications) 找到已经超时的数据块 pendingReplications.getTimedOutBlocks() 找到数据块后将其从pendingReplications队列中清空 找到超时数据块还存在的数据结点数目,即副本数目 countNodes(timedOutItems[i]) 将该超时数据块添加到neededReplications队列中 neededReplications.add() 图 2.2.3.8 proccessPendingReplications 流程图 ResolutionMonitor 解析数据结点在网络中的位置,并把该 Datanode 的信息添加到 clusterMap 中,便于以后选取结点。大致流程如下: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 71 while(fsRunning) ResolutionMonitor.run() 从resolutionQueue中获得要解析的Datanode datanodes.add(resolutionQueue.take()) 解析出Datanode的路径 rName = dnsToSwitchMapping.resolve() rName = null rName != null 设定每个Datanode所处的机架 并将该结点添加到clusterMap中去 d.setNetworkLocation(m) clusterMap.add(d) 方法返回的是一个 网络路径,包含了 路由信息 设定每个结点处在同样的机架上 rName = add(NetworkTopology.DEFAULT_RACK) 图 2.2.3.9 ResolutionMonitor.run 流程图 SafeModeMonitor 安全模式是这样一种状态,系统处于这个状态时,不接受任何对名字空间的 修改,同时也不会对数据块进行复制或删除操作。Namenode 启动的时候会自动 进入安全模式,同时也可以手工进入(不会自动离开)。系统启动以后,Datanode 会报告目前它拥有的数据块的信息,当系统接收到的 Block 信息到达一定门槛, 同时每个 Block 都有 dfs.replication.min 个副本后,系统等待一段时间后就离开安 全模式。 dfs.safemode.threshold.pct:接受到的 Block 的比例,缺省为 95%,就是说, 必须 Datanode 报告的数据块数目占总数的 95%,才到达门槛; dfs.replication.min:缺省为 1,即每个副本都存在系统中; SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 72 dfs.safemode.extension:等待时间,缺省为 0,单位秒。 下图是 FSNamesystem.SafeModeInfo 的类图: 图 2.2.3.10 SafeModeInfo 类图 threshold,extension 和 safeReplication 保存的是上面说的 3 个参数。Reached 等 于-1 表明安全模式是关闭的,0 表示安全模式打开但是系统还没达到 threshold。 blockTotal 是计算 threshold 时的分母,blockSafe 是分子,lastStatusReport 用于控 制写日志的间隔,方法都是一些简单方法,判断进入或离开 SafeMode 等。 线程 SafeModeMonitor 用于定期检查系统是否能够离开安全模式,系统离开安全 模式后,smmthread 会被重新赋值为 null。下图是 FSNamesystem.SafeModeMonitor.run()方法的执行流程: SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 73 使用while()循环检测,默认时 间为recheckInterval = 1000ms Thread.sleep(recheckInterval) leaveSafeMode(true) FSNamesystem.SafeModeMonitor.run() while(… …) sleep(recheckInterval) can leave safemode 写日志 NameNode.stateChangeLog.info() safeMode.leave(checkForUpgrades) 转换为手动安全模式 checkForUpgrades 处理数据块副本数 processMisReplicatedBlocks() needUpgrade = true false 写日志,离开安全模式 smmthread = null cannot leave safemode 图 2.2.3.11 SafeModeMonitor.run 流程图。 LeaseMonitor 该线程定义于 LeaseManager.Monitor,作用是周期性的检查租约是否过期, SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 74 如果过期,则调用 FSNamesystem.internalReleaseLease()方法删除租约。周期为 2000ms,外部不能更改。下图为 Monitor.run()的执行流程。 while(fsnamesystem.isRunning) LeaseManager.Monitor.run() 获得最早的lease并判断是否超时 Lease oldest = sortedLeases.first() 未超时 超时 获得oldest关联的文件 oldest.getPaths().toArray() Thread.sleep(2000) checkLeases() 从租约管理器LeaseManager中删除此oldest租约 removeLease(oldest, p.getString()) fsnamesystem.internalReleaseLease(oldest, p.getString()) 获得该文件的INodeFile iFile = dir.getFileINode() 写日志 抛出异常信息 iFile 由iFile构造pendingFile并判断和该 pendingFile关联的Block是否为空 超时 null 或者 !iFile.isUnderConstruction() pendingFile. getBlocks() 删除lease,持久化iNode,关闭文 件,检查该文件的数据块的副本数 finalizeINodeFileUnderConstruction(sr c, pendingFile) 为0 继续写文件并延长lease pendingFile.setTargets(targets) pendingFile.assignPrimaryDatanode() leaseManager.renewLease(lease) 为0 图 2.2.3.12 LeaseMonitor.run 流程图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 75 PendingReplicationMonitor 该线程(timerThread)是定义在类 PendingReplicationBlocks 中 (PendingReplicationBlocks.java),在构造其实例 pendingReplications 时启动,作用 是周期性的检查正在复制的数据块 pengdingReplications(Map)中已经超时的数据块,并把它们添加到超时队列 timedOutItems(ArrayList)中去,检查的周期是 min(defaultRecheckInterval, timeout),其 中 defaultRecheckInterval = 5 * 60 * 1000ms,timeout 可以在构造该类 的实例时设定。PendingReplicationBlocks.PendingReplicationMonitor.run()比较简 单,就是周期性的调用 pendingReplicaitonCheck()方法,执行流程如下: while(fsRunning) PendingReplication Monitor.run() 设定检测周期period period = Math.min(defaultRecheckInterval, timeout) 判断pendingReplications中的每一项是否超时 Thread.sleep(period) pendingReplicationCheck() timeout not timeout 向超时队列中添加超时项并将该项移除 timedOutItems.add(block) iter.remove() 图 2.2.3.13 pendingReplicationMonitor.run 流程图 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 76 2.2.4 NameNode 上的主要工作流程代码分析 2.2.4.1 两个重要监视线程 HeartbeatMonitor:监视是否有datanode超时未发送heartbeat,如果有,则从 blocksMap中删除结点和数据块信息,并更新删除的数据块副本数,将数据块放 入队列UnderReplicatedBlocks中,如果副本数不够,会有线程异步处理需要备 份的块。 ReplicationMonitor:不断定时检查是否有block未达到设定的备份数目而需要 备份,如需要备份则把该block加入到该datanode对应的neededReplications队 列中,等待下一次heartbeat时发送备份恢复命令给该datanode。 1) namenode与datanode之间的主要RPC交互 datnode向namenode汇报坏数据块主要函数执行流程: namenode. reportBadBlocks(LocatedBlock[] blocks)//datanode向namenode汇报坏的 数据块blocks时,用RPC调用namenode的该函数 void markBlockAsCorrupt(Block blk, DatanodeInfo dn)//把datanode dn中的数据块 blk标记为坏数据块 corruptReplicas.addToCorruptReplicasMap(blk, node)// 向该datanode 对应的 corruptReplicas队列中增加该坏的数据块blk updateNeededReplications(blk, -1, 0)// 向neededReplication队列中增加该数据块 datnode向namenode汇报最近收到的数据块主要函数执行流程: void namenode.blockReceived(DatanodeRegistration nodeReg, Block blocks[], String delHints[])//接受datanode的调用,对最 近成功复制的数据块进行处理 namesystem.blockReceived(nodeReg, blocks[i], delHints[i])//逐个处理收到的block pendingReplications.remove(block)//从正在复制的队列中删除该block 任务 addStoredBlock(block, node, delHintNode ); boolean added = node.addBlock(storedBlock); // add block to the data-node //更新blocksMap SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 77 datnode向namenode发送heartbeat主要函数执行流程: public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg, long capacity, long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount)//datanode 向 namenode 发送 heartbeat,汇报该节点状况,包括总容量,已经使用的容量和剩余容量等 verifyRequest(nodeReg)//验证该节点 namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining, xceiverCount, xmitsInProgress)//处理该heartbeat cmd = nodeinfo.getReplicationCommand( maxReplicationStreams - xmitsInProgress);// 如果针对该 datanode有需要复制任务的数据块,则获取该命令并返回给datanode去执行 2.2.4.2 NameNode 写数据代码流程 写文件分成两个部分,第一步是 Client 向 NameNode 申请写文件,这个时候 NameNode 上主要完成 InodeFile 表的建立。随后 Client 将每个文件再分成 64M 大小的数据块传输,此时每次数据块传输完毕,都将导致 InodeFile 上增加相应 block 的记录。图 2.2.4.1 和图 2.2.4.2 分别详细描述了这两部分的处理流程和详细 调用的方法。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 78 图 2.2.4.1 写文件 NameNode 处理流程 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 79 图 2.2.4.2 写数据块 NameNode 处理流程 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 80 2.2.4.3 NameNode 恢复数据代码流程 图 2.2.4.3 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 81 图 2.2.4.4 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 82 2.3 Client 写文件解析 2.3.1 Client 主要数据结构 图 2.3.1 Client 端类图 2.3.1.1 OutputStream 类 功能 java.io 中的类,实现了 Flushable 和 Closeable 接口,比较底层的类。 2.3.1.2 FSOutputSummer 类 功能 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 83 引入了 HDFS 写数据时需要的计算校验和的功能,可以把流转换成 Datanode 数据包的格式(为了校验)。 主要方法 int write1(byte b[], int off, int len): private 作用:FSOutputSummer 的 write 方法会调用 write1,write1 中计算校验 和并将用户输入的数据拷贝到缓冲区,缓冲区满了以后会调用 flushBuffer, 最终调用虚方法的 writeChunk。 2.3.1.3 Packet 类 功能 内部类,构造发送的数据包。 主要成员变量 ByteBuffer buffer // 指向缓冲区 buf 的引用 byte[] buf // 存放发送的数据信息 long seqno // 数据块中包的顺序号 int numChunks // 每个数据包中数据片(chunk)的多少 long offsetInBlock // 读数据块的起始位置 … … 主要方法 Packet(int pktSize, int chunkPerPkt, long offsetInBlock) 作用:创建新的数据包 writeData(byte[] inarray, int off, int len) : void 作用:将缓冲区的数据写入数据包中 writeChecksum(byte[] inarray, int off, int len) : void 作用:将缓冲区的校验和数据写入数据包中 getBuffer() : ByteBuffer 作用:用户获得整个数据包,包括包头信息。 2.3.1.4 DataStreamer 类 功能 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 84 内部类,负责写数据包和清理工作 主要方法 run() : void 作用:先调用 processDatanodeError()处理可能的 IO 错误,然后打开连接 传送数据包并维护 dataQueue 和 ackQueue 队列,传输完毕后做清理工作。 2.3.1.5 ResponseProcessor 类 功能 内部类,负责处理应答 主要方法 run() : void 作用:等待 DataNode 的应答,如果是成功应答,删除在 ackQueue 上的 包,如果出错,记录出错的 DataNode,并设置标志位。 2.3.1.6 DFSOutputStream 类 功能 包含三个内部类 Packet、DataStreamer、ResponseProcessor,主要负责 Client 向 Datanode 写数据 主要成员变量 private Socket s // Socket连接 private String src // 文件名 private DataOutputStream blockStream //向Datanode写的数据流 private DataInputStream blockReplyStream //从Datanode返回的数据 private Block block private long blockSize private DataChecksum checksum private LinkedList dataQueue // 要发送的数据包队列 private LinkedList ackQueue // 接收应答信息的队列 private DataStreamer streamer private ResponseProcessor response = null SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 85 private long currentSeqno = 0 // 发送的数据包序号 private int chunksPerPacket = 0 //每个数据包中含有的数据片(chunk)数目 private DatanodeInfo[] nodes = null // 写入数据的结点信息 … … 主要方法 DFSOutputStream(String src, FsPermission masked, Boolean overwrite, short replication, long blockSize, Progressable progress, int buffersize) 作用:构造函数,建立一条向Datanode 写的数据流,调用了 namenode.create(src, masked, clientName, overwrite, replication, blockSize):void 方法,在Namenode的文件空间中建立文件; private nextBlockOutputStream(String client) : DatanodeInfo[] 作用:当Client写一个新的数据块时,该方法调用locateFollowingBlock() 获得LocatedBlock对象,里面包含了数据块和结点位置信息,结点位置放入 DatanodeInfo[],然后调用createBlockOutputStream()和结点列表中的第一个 Datanode建立连接; private locateFollowingBlock() : LocatedBlock 作用:通过namenode.addBlock() 获取一个新的数据块,返回的 LocateBlock对象里面含有该数据块的存储位置信息; private createBlockOutputStream(DatanodeInfo[] nodes, String client, Boolean recoveryFlay) : boolean 作用:打开到第一个DataNode的连接,然后发送数据包,并等待来自 DataNode的回应,记录出错的DataNode在nodes中的位置,设置errorIndex并 返回false; protected synchronized writeChunk(byte[] b, int offset, int len, byte[] checksum) : void 作用:往Packet中写校验值和数据,如果数据包写满则将数据包放入发 送队列dataQueue中,等待DataStreamer发送。 private synchronized flushInternal() : void 作用:等待dataQueue和ackQueue都为空,即数据被DataNode顺利接收 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 86 private synchronized closeInternal() : void 作用:等待所有的数据都写完后结束两个工作线程,关闭socket,最后 调用naenode.complete(),通知Namenode结束一次写操作。 2.3.2 Client 写数据具体流程 Client 写文件时两条线程功能描述: DataStreamer: 1. 调用 processDatanodeError,处理可能的 IO 错误; 2. 在 dataQueue 上等待,当有数据出现在队列上时,连接 Datanode 并启动 ResponseProcessor 线程; 3. 通过 blockStream()发送数据并维护 dataQueue 和 ackQueue; 4. ackQueue 队列为空后的清理工作; ResponseProcessor 等待来自 DataNode 的应答,如果是成功应答,删除在 ackQueue 的包, 如果有错误,记录出错的 DataNode,并设置标志位。 Client Datanode_1 Datanode_2 blockStream.write() in mirrorOut in blockReplyStream replyOut mirrorIn replyOut mirrorOut mirrorIn DataStreamer ResponseProcessor 图 2.3.2 写文件流程示意 图 2.3.3 是具体的 Client 端写文件流程图,包括具体的函数调用关系。 SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 87 客户端写文件 DFSClient.creat() 创建DFSOutputStream对象 调用namenode.creat() 在文件空间中建立文件 启动DataStreamer 设定校验和(checksum)大小、每个数据包中片 (chunk)的数量(chunksPerPacket)大小 (PacketSize)、发送的数据包大小(packetSize) DataStreamer通过 DFSOutputStream.nextBlockOutputStream()获得要写入 数据的Datanode并建立连接,然后启动 ResponseProcessor负责处理Datanode返回的应答 blockStream()从缓冲区获得数据并向 Datanode写 Packet one = dataQueue.getFirst() // 获得要发送的数据包 ByteBuffer buf = one.getBuffer() //从 缓冲区获得数据 nextBlockOutputStream()调用locateFollowingBlock() 获得包含有Datanode信息的LocatedBlock数组,最终 是通过调用namenode.addBlock()来获得 ResponseProcessor等待来自Datanode的应答,对于 成功应答删除在ackQueue队列上的包,出错应答则 记录出错的Datanode 数据包(Packet)在dataQueue上,DataStreamer在 dataQueue上等待,当连接成功并且dataQueue上有数 据时,DataStreamer从缓冲区获得数据并将数据包从 dataQueue移到ackQueue dataQueue.removeFirst(),ackQueue.addLast() creat()定义在ClientProtocol中, 由Namenode具体实现 数据传输完毕后等待ackQueue为空,然 后做清理工作 当数据包为最后一个时, DataStreamer写一个长度为0的包指 示数据传输结束 blockStream.writeInt(0) 清理工作如 response.close()/blockStream.close()/ blockReplyStream.close() 写数据块的具体工作由DFSOutputStream完成 OutputStream result = new DFSOutputStream(src, masked, overwrite,replication, blockSize, progress, buffersize) 图 2.3.3 Client 写数据块流程 客户端通过 DFSClient 类创建文件,该类创建一个输出流 DFSOutputStream 来向 hdfs 写一个文件,代码如下: OutputStream result = new DFSOutputStream(src//文件流名称, masked//权限标 志,Overwrite//是否覆盖, replication//备份次数, blockSize//块大小, progress, buffersize//缓冲区大小,conf.getInt("io.bytes.per.checksum", 512)//校验码); SHANGHAI JIAO TONG UNIVERSITY Project of SUR_ECCS 88 DFSOutputStream 创建之后远程调用 namenode .create (src, masked, clientName, overwrite, replication, blockSize)//在 namenode 位置创建文件; 同时创建 DataStreamer 类来专门负责把文件所有数据写到 datanodes 中 ,每 次写的过程中首先在 private LocatedBlock locateFollowingBlock(long start)。函数 中通过远程调用 namenode 上的 namenode.addBlock(src, clientName)来选定要存 放该 block 的所有 datanodes。 2.3.3 Client 恢复数据具体流程 Client 端不做数据恢复的工作。

下载文档,方便阅读与编辑

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 20 金币 [ 分享文档获得金币 ] 11 人已下载

下载文档

相关文档