博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop源码分析HDFS ClientProtocol——getBlockLocations
阅读量:6987 次
发布时间:2019-06-27

本文共 4101 字,大约阅读时间需要 13 分钟。

hot3.png

Class ClientProtocol是HDFS客户端与NameNode之间的接口。Client通过调用ClientProtocol的方法完成与NameNode之间的交互。本文分析方法getBlockLocations。该方法的方法声明如下:

public LocatedBlocks getBlockLocations(String src,long offset, long length)throws IOException

该方法用于获取一个给定文件的偏移和数据长度的所有数据库块(block)的数据节点(datanode)。该方法的参数如下:

src:文件的名称

offset:文件的开始位置,文件开始的偏移

length:需要获取文件数据的长度

该方法返回LocatedBlocks,LocatedBlocks包含文件长度、数据块blocks、数据块block的datanode等信息。注意,每一个block的所有datanode按照其距离client的距离进行排序。

下面我们看一下具体是怎么获得文件给定范围内数据的数据块和数据块所在的datanode的。首先,HDFS在内存中包含了整个文件系统的一个映像,包括INodeFile(对应于硬盘上的文件)、INodeDirectory(对应于硬盘上的文件夹)等。可以阅读文件。

为了获取文件的数据块,我们首先需要从文件系统中命名空间中获取代表该文件的INodeFile。通过该INodeFile我们可以获取该文件所包含的所有block。获取INodeFile所有的block之后,我们从中查找包含offset的第一个block。获取到第一个block之后,就可以获取该block所在的datanode。

为了获取block所在的datanode,首先判断该block是不是在文件系统正在构建。如果是正在构建的block,比如当前正在创建该block,正在往block中写入数据。我们可以用INodeFileUnderConstruction.getTargets()来获取该block所在的datanode。因为类INodeFileUnderConstruction保存了最后一个block所存放的datanode。如果不是正在构建的block,我们首先判断该block是不是损坏。如果是损坏的。如果该block没有损坏,我们从blocksMap中获取该block所在的所有DatanodeDescriptor并确定该DatanodeDescriptor存储的block是没有损坏的。

以此类推直到找到所有的block。具体代码如下:

private synchronized LocatedBlocks getBlockLocationsInternal(String src,

long offset, long length, int nrBlocksToReturn,

boolean doAccessTime, boolean needBlockToken) throws IOException {

INodeFile inode = dir.getFileINode(src);//获取INodeFile

if (inode == null) {

return null;

}

if (doAccessTime && isAccessTimeSupported()) {

dir.setTimes(src, inode, -1, now(), false);

}

Block[] blocks = inode.getBlocks();//获取该INodeFile所有的block

if (blocks == null) {

return null;

}

if (blocks.length == 0) {

return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(

blocks.length));

}

List<LocatedBlock> results;

results = new ArrayList<LocatedBlock>(blocks.length);

/**下面的for循环获取offset开始的block**/

int curBlk = 0;

long curPos = 0, blkSize = 0;

int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;

for (curBlk = 0; curBlk < nrBlocks; curBlk++) {

blkSize = blocks[curBlk].getNumBytes();

assert blkSize > 0 : "Block of size 0";

if (curPos + blkSize > offset) {

break;

}

curPos += blkSize;

}

if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file

return null;

long endOff = offset + length;

do {

// get block locations

int numNodes = blocksMap.numNodes(blocks[curBlk]);//block存储备份的数目

int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();//损坏节点  //数目

int numCorruptReplicas = corruptReplicas

.numCorruptReplicas(blocks[curBlk]);//损坏备份存储的数量

if (numCorruptNodes != numCorruptReplicas) {

LOG.warn("Inconsistent number of corrupt replicas for "

+ blocks[curBlk] + "blockMap has " + numCorruptNodes

+ " but corrupt replicas map has " + numCorruptReplicas);

}

DatanodeDescriptor[] machineSet = null;

boolean blockCorrupt = false;

if (inode.isUnderConstruction() && curBlk == blocks.length - 1

&& blocksMap.numNodes(blocks[curBlk]) == 0) {

// get unfinished block locations

//获取正在构建的block的存储位置datanode

INodeFileUnderConstruction cons = (INodeFileUnderConstruction) inode;

machineSet = cons.getTargets();

blockCorrupt = false;

} else {

blockCorrupt = (numCorruptNodes == numNodes);

int numMachineSet = blockCorrupt ? numNodes

: (numNodes - numCorruptNodes);

machineSet = new DatanodeDescriptor[numMachineSet];

if (numMachineSet > 0) {

numNodes = 0;

//从blocksMap中获取该block所有未损坏的datanode的位置

for (Iterator<DatanodeDescriptor> it = blocksMap

.nodeIterator(blocks[curBlk]); it.hasNext();) {

DatanodeDescriptor dn = it.next();

boolean replicaCorrupt = corruptReplicas

.isReplicaCorrupt(blocks[curBlk], dn);

if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))

machineSet[numNodes++] = dn;

}

}

}

LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet,

curPos, blockCorrupt);

if (isAccessTokenEnabled && needBlockToken) {

b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),

EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));

}

results.add(b);

curPos += blocks[curBlk].getNumBytes();

curBlk++;

} while (curPos < endOff && curBlk < blocks.length

&& results.size() < nrBlocksToReturn);

return inode.createLocatedBlocks(results);

}

转载于:https://my.oschina.net/sdzzboy/blog/164141

你可能感兴趣的文章
2016第42周五
查看>>
centos7 取消自动锁屏
查看>>
在IDEA中代码自动提示第一个字母大小写必须匹配的解决
查看>>
面向接口编程的好处和优点
查看>>
架构师必看-架构之美第14章-两个系统的故事:设计之城(一)
查看>>
(原)InsightFace及其mxnet代码
查看>>
我们来翻翻元素样式的族谱-getComputedStyle
查看>>
Hessian HTTP POST访问时,Nginx返回411问题
查看>>
Exif图片方向的一些发现
查看>>
iOS关联对象
查看>>
iOS之传值
查看>>
探索webpack热更新对代码打包结果的影响(二)
查看>>
pandas 修改 DataFrame 列名
查看>>
《2018年云上挖矿态势分析报告》发布,非Web类应用安全风险需重点关注
查看>>
leetcode409.Longest Palindrome
查看>>
Nervos 双周报第 3 期:佛系新年之后的开工大吉!
查看>>
【PHP 扩展开发】Zephir 基础篇
查看>>
字节跳动开源Go结构体标签表达式解释器,成请求参数校验的杀手锏
查看>>
怎么将在线录制的视频转为GIF动态图
查看>>
【剑指offer】顺时针打印矩阵
查看>>