Class ClientProtocol是HDFS客户端与NameNode之间的接口。Client通过调用ClientProtocol的方法完成与NameNode之间的交互。本文分析方法getBlockLocations。该方法的方法声明如下:
public LocatedBlocks getBlockLocations(String src,long offset, long length)throws IOException
该方法用于获取一个给定文件的偏移和数据长度的所有数据库块(block)的数据节点(datanode)。该方法的参数如下:
src:文件的名称
offset:文件的开始位置,文件开始的偏移
length:需要获取文件数据的长度
该方法返回LocatedBlocks,LocatedBlocks包含文件长度、数据块blocks、数据块block的datanode等信息。注意,每一个block的所有datanode按照其距离client的距离进行排序。
下面我们看一下具体是怎么获得文件给定范围内数据的数据块和数据块所在的datanode的。首先,HDFS在内存中包含了整个文件系统的一个映像,包括INodeFile(对应于硬盘上的文件)、INodeDirectory(对应于硬盘上的文件夹)等。可以阅读文件。
为了获取文件的数据块,我们首先需要从文件系统中命名空间中获取代表该文件的INodeFile。通过该INodeFile我们可以获取该文件所包含的所有block。获取INodeFile所有的block之后,我们从中查找包含offset的第一个block。获取到第一个block之后,就可以获取该block所在的datanode。
为了获取block所在的datanode,首先判断该block是不是在文件系统正在构建。如果是正在构建的block,比如当前正在创建该block,正在往block中写入数据。我们可以用INodeFileUnderConstruction.getTargets()来获取该block所在的datanode。因为类INodeFileUnderConstruction保存了最后一个block所存放的datanode。如果不是正在构建的block,我们首先判断该block是不是损坏。如果是损坏的。如果该block没有损坏,我们从blocksMap中获取该block所在的所有DatanodeDescriptor并确定该DatanodeDescriptor存储的block是没有损坏的。
以此类推直到找到所有的block。具体代码如下:
private synchronized LocatedBlocks getBlockLocationsInternal(String src,
long offset, long length, int nrBlocksToReturn,
boolean doAccessTime, boolean needBlockToken) throws IOException {
INodeFile inode = dir.getFileINode(src);//获取INodeFile
if (inode == null) {
return null;
}
if (doAccessTime && isAccessTimeSupported()) {
dir.setTimes(src, inode, -1, now(), false);
}
Block[] blocks = inode.getBlocks();//获取该INodeFile所有的block
if (blocks == null) {
return null;
}
if (blocks.length == 0) {
return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(
blocks.length));
}
List<LocatedBlock> results;
results = new ArrayList<LocatedBlock>(blocks.length);
/**下面的for循环获取offset开始的block**/
int curBlk = 0;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
blkSize = blocks[curBlk].getNumBytes();
assert blkSize > 0 : "Block of size 0";
if (curPos + blkSize > offset) {
break;
}
curPos += blkSize;
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
return null;
long endOff = offset + length;
do {
// get block locations
int numNodes = blocksMap.numNodes(blocks[curBlk]);//block存储备份的数目
int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();//损坏节点 //数目
int numCorruptReplicas = corruptReplicas
.numCorruptReplicas(blocks[curBlk]);//损坏备份存储的数量
if (numCorruptNodes != numCorruptReplicas) {
LOG.warn("Inconsistent number of corrupt replicas for "
+ blocks[curBlk] + "blockMap has " + numCorruptNodes
+ " but corrupt replicas map has " + numCorruptReplicas);
}
DatanodeDescriptor[] machineSet = null;
boolean blockCorrupt = false;
if (inode.isUnderConstruction() && curBlk == blocks.length - 1
&& blocksMap.numNodes(blocks[curBlk]) == 0) {
// get unfinished block locations
//获取正在构建的block的存储位置datanode
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) inode;
machineSet = cons.getTargets();
blockCorrupt = false;
} else {
blockCorrupt = (numCorruptNodes == numNodes);
int numMachineSet = blockCorrupt ? numNodes
: (numNodes - numCorruptNodes);
machineSet = new DatanodeDescriptor[numMachineSet];
if (numMachineSet > 0) {
numNodes = 0;
//从blocksMap中获取该block所有未损坏的datanode的位置
for (Iterator<DatanodeDescriptor> it = blocksMap
.nodeIterator(blocks[curBlk]); it.hasNext();) {
DatanodeDescriptor dn = it.next();
boolean replicaCorrupt = corruptReplicas
.isReplicaCorrupt(blocks[curBlk], dn);
if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
machineSet[numNodes++] = dn;
}
}
}
LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet,
curPos, blockCorrupt);
if (isAccessTokenEnabled && needBlockToken) {
b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
}
results.add(b);
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff && curBlk < blocks.length
&& results.size() < nrBlocksToReturn);
return inode.createLocatedBlocks(results);
}