Hadoop HDFS Source Code Analysis: Reading the Namespace Image and Edit Log Data
Published: 2019-06-14


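Reading the namespace image and edit log data

1. Reading the namespace image

The FSImage class is the Java implementation of the namespace image. Its Javadoc comment in the source reads:

    /**
     * FSImage handles checkpointing and logging of the namespace edits.
     *
     */

FSImage.loadFSImage(FSNamesystem, StartupOption, MetaRecoveryContext) reads the namespace image.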

    private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
                                MetaRecoveryContext recovery)
            throws IOException {
        final boolean rollingRollback
                = RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
        final EnumSet<NameNodeFile> nnfs;
        if (rollingRollback) {
            // if it is rollback of rolling upgrade, only load from the rollback image
            nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
        } else {
            // otherwise we can load from both IMAGE and IMAGE_ROLLBACK
            nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
        }
        final FSImageStorageInspector inspector = storage
                .readAndInspectDirs(nnfs, startOpt);

        isUpgradeFinalized = inspector.isUpgradeFinalized();
        List<FSImageFile> imageFiles = inspector.getLatestImages();

        StartupProgress prog = NameNode.getStartupProgress();
        prog.beginPhase(Phase.LOADING_FSIMAGE);
        File phaseFile = imageFiles.get(0).getFile();
        prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath());
        prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length());
        boolean needToSave = inspector.needToSave();

        Iterable<EditLogInputStream> editStreams = null;

        initEditLog(startOpt);

        if (NameNodeLayoutVersion.supports(
                LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
            // If we're open for write, we're either non-HA or we're the active NN, so
            // we better be able to load all the edits. If we're the standby NN, it's
            // OK to not be able to read all of edits right now.
            // In the meanwhile, for HA upgrade, we will still write editlog thus need
            // this toAtLeastTxId to be set to the max-seen txid
            // For rollback in rolling upgrade, we need to set the toAtLeastTxId to
            // the txid right before the upgrade marker.
            long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
                    .getMaxSeenTxId() : 0;
            if (rollingRollback) {
                // note that the first image in imageFiles is the special checkpoint
                // for the rolling upgrade
                toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
            }
            editStreams = editLog.selectInputStreams(
                    imageFiles.get(0).getCheckpointTxId() + 1,
                    toAtLeastTxId, recovery, false);
        } else {
            editStreams = FSImagePreTransactionalStorageInspector
                    .getEditLogStreams(storage);
        }
        int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
                DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
        for (EditLogInputStream elis : editStreams) {
            elis.setMaxOpSize(maxOpSize);
        }

        for (EditLogInputStream l : editStreams) {
            LOG.debug("Planning to load edit log stream: " + l);
        }
        if (!editStreams.iterator().hasNext()) {
            LOG.info("No edit log streams selected.");
        }

        FSImageFile imageFile = null;
        for (int i = 0; i < imageFiles.size(); i++) {
            try {
                imageFile = imageFiles.get(i);
                loadFSImageFile(target, recovery, imageFile, startOpt);
                break;
            } catch (IOException ioe) {
                LOG.error("Failed to load image from " + imageFile, ioe);
                target.clear();
                imageFile = null;
            }
        }
        // Failed to load any images, error out
        if (imageFile == null) {
            FSEditLog.closeAllStreams(editStreams);
            throw new IOException("Failed to load an FSImage file!");
        }
        prog.endPhase(Phase.LOADING_FSIMAGE);

        if (!rollingRollback) {
            long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
            needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
                    txnsAdvanced);
            if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
                // rename rollback image if it is downgrade
                renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
            }
        } else {
            // Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
            // to the last txid in rollback fsimage.
            rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
            needToSave = false;
        }
        editLog.setNextTxId(lastAppliedTxId + 1);
        return needToSave;
    }

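In the code above, the for loop over imageFiles does the actual loading. It passes each candidate image file in turn to loadFSImageFile and stops at the first one that loads successfully. If a load throws an IOException, the loop logs the error, clears the target namespace, and moves on to the next candidate. If no image can be loaded, the method closes the edit log streams and throws an IOException.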
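The fallback pattern in that loop can be isolated into a small, self-contained sketch. The class FallbackLoader, the Loader interface, and the method loadFirstUsable are hypothetical names introduced here for illustration; they are not part of Hadoop. The resetState callback stands in for target.clear().

```java
import java.io.IOException;
import java.util.List;

// Hypothetical illustration of the "try each candidate image" loop; not Hadoop code.
final class FallbackLoader {

    /** A load step that may fail for a given candidate (stands in for loadFSImageFile). */
    interface Loader<T> {
        void load(T candidate) throws IOException;
    }

    /**
     * Tries the candidates in order and returns the first one that loads.
     * After each failed attempt, resetState runs so that a partial load
     * does not leak into the next attempt.
     */
    static <T> T loadFirstUsable(List<T> candidates, Loader<T> loader, Runnable resetState)
            throws IOException {
        for (T candidate : candidates) {
            try {
                loader.load(candidate);
                return candidate;
            } catch (IOException e) {
                // Discard whatever the failed attempt left behind, then try the next one.
                resetState.run();
            }
        }
        throw new IOException("Failed to load any candidate");
    }
}
```

Resetting state between attempts matters because a failed load may have populated part of the namespace before the exception was thrown.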

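2. Reading the edit log

After the namespace image has been loaded, the NameNode's in-memory state reflects only the moment at which the image was saved. The operations recorded in the edit log after that point must be replayed to bring the namespace up to date.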
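The relationship between the image and the edit log can be illustrated with a toy model. This is a minimal sketch, assuming a namespace that is just a set of paths and an edit that either creates or deletes one path; ToyNamespace, Edit, and recover are hypothetical names, and the real NameNode state is far richer.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model of "checkpoint plus log replay"; not Hadoop code.
final class ToyNamespace {

    /** One logged mutation, identified by an increasing transaction ID. */
    static final class Edit {
        final long txId;
        final boolean create; // true = create the path, false = delete it
        final String path;

        Edit(long txId, boolean create, String path) {
            this.txId = txId;
            this.create = create;
            this.path = path;
        }
    }

    final TreeSet<String> paths = new TreeSet<>();
    long lastAppliedTxId;

    /** Restores the state saved at checkpointTxId, then replays every newer edit. */
    static ToyNamespace recover(Set<String> imagePaths, long checkpointTxId, List<Edit> editLog) {
        ToyNamespace ns = new ToyNamespace();
        ns.paths.addAll(imagePaths);
        ns.lastAppliedTxId = checkpointTxId;
        for (Edit e : editLog) {
            if (e.txId <= checkpointTxId) {
                continue; // already reflected in the image
            }
            if (e.create) {
                ns.paths.add(e.path);
            } else {
                ns.paths.remove(e.path);
            }
            ns.lastAppliedTxId = e.txId;
        }
        return ns;
    }
}
```

This mirrors the way loadFSImage selects edit streams starting at the checkpoint transaction ID plus one: only edits newer than the image need to be applied.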

HDFS currently reads the edit log through the FSEditLogLoader class. The relevant code is as follows.

      long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
          StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
        StartupProgress prog = NameNode.getStartupProgress();
        Step step = createStartupProgressStep(edits);
        prog.beginStep(Phase.LOADING_EDITS, step);
        fsNamesys.writeLock();
        try {
          long startTime = now();
          FSImage.LOG.info("Start loading edits file " + edits.getName());
          long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
              startOpt, recovery);
          FSImage.LOG.info("Edits file " + edits.getName()
              + " of size " + edits.length() + " edits # " + numEdits
              + " loaded in " + (now()-startTime)/1000 + " seconds");
          return numEdits;
        } finally {
          edits.close();
          fsNamesys.writeUnlock();
          prog.endStep(Phase.LOADING_EDITS, step);
        }
      }

      long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
          long expectedStartingTxId, StartupOption startOpt,
          MetaRecoveryContext recovery) throws IOException {
        FSDirectory fsDir = fsNamesys.dir;

        EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
          new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);

        if (LOG.isTraceEnabled()) {
          LOG.trace("Acquiring write lock to replay edit log");
        }

        fsNamesys.writeLock();
        fsDir.writeLock();

        long recentOpcodeOffsets[] = new long[4];
        Arrays.fill(recentOpcodeOffsets, -1);

        long expectedTxId = expectedStartingTxId;
        long numEdits = 0;
        long lastTxId = in.getLastTxId();
        long numTxns = (lastTxId - expectedStartingTxId) + 1;
        StartupProgress prog = NameNode.getStartupProgress();
        Step step = createStartupProgressStep(in);
        prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
        Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
        long lastLogTime = now();
        long lastInodeId = fsNamesys.getLastInodeId();

        try {
          while (true) {
            try {
              FSEditLogOp op;
              try {
                op = in.readOp();
                if (op == null) {
                  break;
                }
              } catch (Throwable e) {
                // Handle a problem with our input
                check203UpgradeFailure(in.getVersion(true), e);
                String errorMessage =
                  formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
                FSImage.LOG.error(errorMessage, e);
                if (recovery == null) {
                  // We will only try to skip over problematic opcodes when in
                  // recovery mode.
                  throw new EditLogInputException(errorMessage, e, numEdits);
                }
                MetaRecoveryContext.editLogLoaderPrompt(
                    "We failed to read txId " + expectedTxId,
                    recovery, "skipping the bad section in the log");
                in.resync();
                continue;
              }
              recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
                in.getPosition();
              if (op.hasTransactionId()) {
                if (op.getTransactionId() > expectedTxId) {
                  MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                      "to be a gap in the edit log. We expected txid " +
                      expectedTxId + ", but got txid " +
                      op.getTransactionId() + ".", recovery, "ignoring missing " +
                      " transaction IDs");
                } else if (op.getTransactionId() < expectedTxId) {
                  MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                      "to be an out-of-order edit in the edit log. We " +
                      "expected txid " + expectedTxId + ", but got txid " +
                      op.getTransactionId() + ".", recovery,
                      "skipping the out-of-order edit");
                  continue;
                }
              }
              try {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("op=" + op + ", startOpt=" + startOpt
                      + ", numEdits=" + numEdits + ", totalEdits=" + totalEdits);
                }
                long inodeId = applyEditLogOp(op, fsDir, startOpt,
                    in.getVersion(true), lastInodeId);
                if (lastInodeId < inodeId) {
                  lastInodeId = inodeId;
                }
              } catch (RollingUpgradeOp.RollbackException e) {
                throw e;
              } catch (Throwable e) {
                LOG.error("Encountered exception on operation " + op, e);
                if (recovery == null) {
                  throw e instanceof IOException? (IOException)e: new IOException(e);
                }

                MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
                    "apply edit log operation " + op + ": error " +
                    e.getMessage(), recovery, "applying edits");
              }
              // Now that the operation has been successfully decoded and
              // applied, update our bookkeeping.
              incrOpCount(op.opCode, opCounts, step, counter);
              if (op.hasTransactionId()) {
                lastAppliedTxId = op.getTransactionId();
                expectedTxId = lastAppliedTxId + 1;
              } else {
                expectedTxId = lastAppliedTxId = expectedStartingTxId;
              }
              // log progress
              if (op.hasTransactionId()) {
                long now = now();
                if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
                  long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
                  int percent = Math.round((float) deltaTxId / numTxns * 100);
                  LOG.info("replaying edit log: " + deltaTxId + "/" + numTxns
                      + " transactions completed. (" + percent + "%)");
                  lastLogTime = now;
                }
              }
              numEdits++;
              totalEdits++;
            } catch (RollingUpgradeOp.RollbackException e) {
              LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
              break;
            } catch (MetaRecoveryContext.RequestStopException e) {
              MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
                  in.getPosition() + "/" + in.length());
              break;
            }
          }
        } finally {
          fsNamesys.resetLastInodeId(lastInodeId);
          if(closeOnExit) {
            in.close();
          }
          fsDir.writeUnlock();
          fsNamesys.writeUnlock();

          if (LOG.isTraceEnabled()) {
            LOG.trace("replaying edit log finished");
          }

          if (FSImage.LOG.isDebugEnabled()) {
            dumpOpCounts(opCounts);
          }
        }
        return numEdits;
      }

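In the while loop of loadEditRecords above, each iteration reads one FSEditLogOp from the EditLogInputStream. The loop compares the op's transaction ID with the expected one, applies the op through applyEditLogOp, and then advances expectedTxId to lastAppliedTxId + 1. It ends when readOp() returns null.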
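The transaction ID bookkeeping in that loop can be reduced to a short sketch. TxIdReplaySketch and replay are hypothetical names, and an edit is represented only by its transaction ID. The sketch assumes that, outside recovery mode, any mismatch aborts the replay; the listing above does not show how editLogLoaderPrompt behaves when recovery is null, so that part is an assumption rather than confirmed behavior.

```java
import java.util.List;

// Hypothetical, simplified version of the txid checks in the replay loop; not Hadoop code.
final class TxIdReplaySketch {

    /**
     * Replays the given transaction IDs in order and returns how many were applied.
     * In recovery mode a gap is tolerated and an out-of-order edit is skipped.
     * Outside recovery mode any mismatch aborts the replay (an assumption, see above).
     */
    static long replay(List<Long> txIds, long expectedStartingTxId,
                       boolean recoveryMode, List<Long> applied) {
        long expectedTxId = expectedStartingTxId;
        long numEdits = 0;
        for (long txId : txIds) {
            if (txId != expectedTxId && !recoveryMode) {
                throw new IllegalStateException(
                    "expected txid " + expectedTxId + ", but got txid " + txId);
            }
            if (txId < expectedTxId) {
                continue; // out-of-order edit: skip it
            }
            // txId >= expectedTxId here; a gap, if any, is ignored in recovery mode.
            applied.add(txId);
            expectedTxId = txId + 1;
            numEdits++;
        }
        return numEdits;
    }
}
```

For instance, replaying the IDs 5, 6, 6, 9 from an expected start of 5 in recovery mode applies 5, 6, and 9: the second 6 is skipped as out of order, and the jump from 6 to 9 is treated as a tolerated gap.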

Reposted from: https://www.cnblogs.com/birdhack/p/4297577.html
