source: proiecte/HadoopJUnit/hadoop-0.20.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java @ 120

Last change on this file since 120 was 120, checked in by (none), 14 years ago

Added the mail files for the Hadoop JUNit Project

  • Property svn:executable set to *
File size: 29.6 KB
Line 
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.hdfs.server.datanode;
20
21import java.io.BufferedReader;
22import java.io.Closeable;
23import java.io.DataOutputStream;
24import java.io.File;
25import java.io.FileNotFoundException;
26import java.io.FileOutputStream;
27import java.io.FileReader;
28import java.io.IOException;
29import java.io.PrintStream;
30import java.text.DateFormat;
31import java.text.SimpleDateFormat;
32import java.util.Arrays;
33import java.util.Collections;
34import java.util.Date;
35import java.util.HashMap;
36import java.util.Iterator;
37import java.util.Random;
38import java.util.TreeSet;
39import java.util.regex.Matcher;
40import java.util.regex.Pattern;
41
42import javax.servlet.http.HttpServlet;
43import javax.servlet.http.HttpServletRequest;
44import javax.servlet.http.HttpServletResponse;
45
46import org.apache.commons.logging.Log;
47import org.apache.commons.logging.LogFactory;
48import org.apache.hadoop.conf.Configuration;
49import org.apache.hadoop.hdfs.protocol.Block;
50import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
51import org.apache.hadoop.hdfs.protocol.LocatedBlock;
52import org.apache.hadoop.io.IOUtils;
53import org.apache.hadoop.util.StringUtils;
54
55/*
56 * This keeps track of blocks and their last verification times.
57 * Currently it does not modify the metadata for block.
58 */
59
60class DataBlockScanner implements Runnable {
61 
62  public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
63 
64  private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
65  private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
66 
67  static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
68  private static final long ONE_DAY = 24*3600*1000L;
69 
70  static final DateFormat dateFormat = 
71                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
72 
73  static final String verificationLogFile = "dncp_block_verification.log";
74  static final int verficationLogLimit = 5; // * numBlocks.
75
76  private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
77  DataNode datanode;
78  FSDataset dataset;
79 
80  // sorted set
81  TreeSet<BlockScanInfo> blockInfoSet;
82  HashMap<Block, BlockScanInfo> blockMap;
83 
84  long totalScans = 0;
85  long totalVerifications = 0; // includes remote verification by clients.
86  long totalScanErrors = 0;
87  long totalTransientErrors = 0;
88 
89  long currentPeriodStart = System.currentTimeMillis();
90  long bytesLeft = 0; // Bytes to scan in this period
91  long totalBytesToScan = 0;
92 
93  private LogFileHandler verificationLog;
94 
95  Random random = new Random();
96 
97  BlockTransferThrottler throttler = null;
98 
99  private static enum ScanType {
100    REMOTE_READ,           // Verified when a block read by a client etc
101    VERIFICATION_SCAN,     // scanned as part of periodic verfication
102    NONE,
103  }
104 
105  static class BlockScanInfo implements Comparable<BlockScanInfo> {
106    Block block;
107    long lastScanTime = 0;
108    long lastLogTime = 0;
109    ScanType lastScanType = ScanType.NONE; 
110    boolean lastScanOk = true;
111   
112    BlockScanInfo(Block block) {
113      this.block = block;
114    }
115   
116    public int hashCode() {
117      return block.hashCode();
118    }
119   
120    public boolean equals(Object other) {
121      return other instanceof BlockScanInfo &&
122             compareTo((BlockScanInfo)other) == 0;
123    }
124   
125    long getLastScanTime() {
126      return ( lastScanType == ScanType.NONE) ? 0 : lastScanTime;
127    }
128   
129    public int compareTo(BlockScanInfo other) {
130      long t1 = lastScanTime;
131      long t2 = other.lastScanTime;
132      return ( t1 < t2 ) ? -1 : 
133                          (( t1 > t2 ) ? 1 : block.compareTo(other.block)); 
134    }
135  }
136 
137  DataBlockScanner(DataNode datanode, FSDataset dataset, Configuration conf) {
138    this.datanode = datanode;
139    this.dataset = dataset;
140    scanPeriod = conf.getInt("dfs.datanode.scan.period.hours", 0);
141    if ( scanPeriod <= 0 ) {
142      scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
143    }
144    scanPeriod *= 3600 * 1000;
145    // initialized when the scanner thread is started.
146  }
147 
148  private synchronized boolean isInitiliazed() {
149    return throttler != null;
150  }
151 
152  private void updateBytesToScan(long len, long lastScanTime) {
153    // len could be negative when a block is deleted.
154    totalBytesToScan += len;
155    if ( lastScanTime < currentPeriodStart ) {
156      bytesLeft += len;
157    }
158    // Should we change throttler bandwidth every time bytesLeft changes?
159    // not really required.
160  }
161 
162  private synchronized void addBlockInfo(BlockScanInfo info) {
163    boolean added = blockInfoSet.add(info);
164    blockMap.put(info.block, info);
165   
166    if ( added ) {
167      LogFileHandler log = verificationLog;
168      if (log != null) {
169        log.setMaxNumLines(blockMap.size() * verficationLogLimit);
170      }
171      updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
172    }
173  }
174 
175  private synchronized void delBlockInfo(BlockScanInfo info) {
176    boolean exists = blockInfoSet.remove(info);
177    blockMap.remove(info.block);
178    if ( exists ) {
179      LogFileHandler log = verificationLog;
180      if (log != null) {
181        log.setMaxNumLines(blockMap.size() * verficationLogLimit);
182      }
183      updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
184    }
185  }
186 
187  /** Update blockMap by the given LogEntry */
188  private synchronized void updateBlockInfo(LogEntry e) {
189    BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
190   
191    if(info != null && e.verificationTime > 0 && 
192        info.lastScanTime < e.verificationTime) {
193      delBlockInfo(info);
194      info.lastScanTime = e.verificationTime;
195      info.lastScanType = ScanType.VERIFICATION_SCAN;
196      addBlockInfo(info);
197    }
198  }
199
200  private void init() {
201   
202    // get the list of blocks and arrange them in random order
203    Block arr[] = dataset.getBlockReport();
204    Collections.shuffle(Arrays.asList(arr));
205   
206    blockInfoSet = new TreeSet<BlockScanInfo>();
207    blockMap = new HashMap<Block, BlockScanInfo>();
208   
209    long scanTime = -1;
210    for (Block block : arr) {
211      BlockScanInfo info = new BlockScanInfo( block );
212      info.lastScanTime = scanTime--; 
213      //still keep 'info.lastScanType' to NONE.
214      addBlockInfo(info);
215    }
216
217    /* Pick the first directory that has any existing scanner log.
218     * otherwise, pick the first directory.
219     */
220    File dir = null;
221    FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
222    for(FSDataset.FSVolume vol : volumes) {
223      if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile)) {
224        dir = vol.getDir();
225        break;
226      }
227    }
228    if (dir == null) {
229      dir = volumes[0].getDir();
230    }
231   
232    try {
233      // max lines will be updated later during initialization.
234      verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
235    } catch (IOException e) {
236      LOG.warn("Could not open verfication log. " +
237               "Verification times are not stored.");
238    }
239   
240    synchronized (this) {
241      throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
242    }
243  }
244
245  private synchronized long getNewBlockScanTime() {
246    /* If there are a lot of blocks, this returns a random time with in
247     * the scan period. Otherwise something sooner.
248     */
249    long period = Math.min(scanPeriod, 
250                           Math.max(blockMap.size(),1) * 600 * 1000L);
251    return System.currentTimeMillis() - scanPeriod + 
252           random.nextInt((int)period);   
253  }
254
255  /** Adds block to list of blocks */
256  synchronized void addBlock(Block block) {
257    if (!isInitiliazed()) {
258      return;
259    }
260   
261    BlockScanInfo info = blockMap.get(block);
262    if ( info != null ) {
263      LOG.warn("Adding an already existing block " + block);
264      delBlockInfo(info);
265    }
266   
267    info = new BlockScanInfo(block);   
268    info.lastScanTime = getNewBlockScanTime();
269   
270    addBlockInfo(info);
271    adjustThrottler();
272  }
273 
274  /** Deletes the block from internal structures */
275  synchronized void deleteBlock(Block block) {
276    if (!isInitiliazed()) {
277      return;
278    }
279    BlockScanInfo info = blockMap.get(block);
280    if ( info != null ) {
281      delBlockInfo(info);
282    }
283  }
284
285  /** @return the last scan time */
286  synchronized long getLastScanTime(Block block) {
287    if (!isInitiliazed()) {
288      return 0;
289    }
290    BlockScanInfo info = blockMap.get(block);
291    return info == null? 0: info.lastScanTime;
292  }
293
294  /** Deletes blocks from internal structures */
295  void deleteBlocks(Block[] blocks) {
296    for ( Block b : blocks ) {
297      deleteBlock(b);
298    }
299  }
300 
301  void verifiedByClient(Block block) {
302    updateScanStatus(block, ScanType.REMOTE_READ, true);
303  }
304 
305  private synchronized void updateScanStatus(Block block, 
306                                             ScanType type,
307                                             boolean scanOk) {
308    BlockScanInfo info = blockMap.get(block);
309   
310    if ( info != null ) {
311      delBlockInfo(info);
312    } else {
313      // It might already be removed. Thats ok, it will be caught next time.
314      info = new BlockScanInfo(block);
315    }
316   
317    long now = System.currentTimeMillis();
318    info.lastScanType = type;
319    info.lastScanTime = now;
320    info.lastScanOk = scanOk;
321    addBlockInfo(info);
322   
323    if (type == ScanType.REMOTE_READ) {
324      totalVerifications++;
325    }
326       
327    // Don't update meta data too often in case of REMOTE_READ
328    // of if the verification failed.
329    long diff = now - info.lastLogTime;
330    if (!scanOk || (type == ScanType.REMOTE_READ &&
331                    diff < scanPeriod/3 && diff < ONE_DAY)) {
332      return;
333    }
334   
335    info.lastLogTime = now;
336    LogFileHandler log = verificationLog;
337    if (log != null) {
338      log.appendLine(LogEntry.newEnry(block, now));
339    }
340  }
341 
342  private void handleScanFailure(Block block) {
343   
344    LOG.info("Reporting bad block " + block + " to namenode.");
345   
346    try {
347      DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
348      LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
349      datanode.namenode.reportBadBlocks(blocks);
350    } catch (IOException e){
351      /* One common reason is that NameNode could be in safe mode.
352       * Should we keep on retrying in that case?
353       */
354      LOG.warn("Failed to report bad block " + block + " to namenode : " +
355               " Exception : " + StringUtils.stringifyException(e));
356    }
357  }
358 
359  static private class LogEntry {
360    long blockId = -1;
361    long verificationTime = -1;
362    long genStamp = Block.GRANDFATHER_GENERATION_STAMP;
363   
364    /**
365     * The format consists of single line with multiple entries. each
366     * entry is in the form : name="value".
367     * This simple text and easily extendable and easily parseable with a
368     * regex.
369     */
370    private static Pattern entryPattern = 
371      Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
372   
373    static String newEnry(Block block, long time) {
374      return "date=\"" + dateFormat.format(new Date(time)) + "\"\t " +
375             "time=\"" + time + "\"\t " +
376             "genstamp=\"" + block.getGenerationStamp() + "\"\t " +
377             "id=\"" + block.getBlockId() +"\"";
378    }
379   
380    static LogEntry parseEntry(String line) {
381      LogEntry entry = new LogEntry();
382     
383      Matcher matcher = entryPattern.matcher(line);
384      while (matcher.find()) {
385        String name = matcher.group(1);
386        String value = matcher.group(2);
387       
388        try {
389          if (name.equals("id")) {
390            entry.blockId = Long.valueOf(value);
391          } else if (name.equals("time")) {
392            entry.verificationTime = Long.valueOf(value);
393          } else if (name.equals("genstamp")) {
394            entry.genStamp = Long.valueOf(value);
395          }
396        } catch(NumberFormatException nfe) {
397          LOG.warn("Cannot parse line: " + line, nfe);
398          return null;
399        }
400      }
401     
402      return entry;
403    }
404  }
405 
406  private synchronized void adjustThrottler() {
407    long timeLeft = currentPeriodStart+scanPeriod - System.currentTimeMillis();
408    long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);
409    throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
410  }
411 
412  private void verifyBlock(Block block) {
413   
414    BlockSender blockSender = null;
415
416    /* In case of failure, attempt to read second time to reduce
417     * transient errors. How do we flush block data from kernel
418     * buffers before the second read?
419     */
420    for (int i=0; i<2; i++) {
421      boolean second = (i > 0);
422     
423      try {
424        adjustThrottler();
425       
426        blockSender = new BlockSender(block, 0, -1, false, 
427                                               false, true, datanode);
428
429        DataOutputStream out = 
430                new DataOutputStream(new IOUtils.NullOutputStream());
431       
432        blockSender.sendBlock(out, null, throttler);
433
434        LOG.info((second ? "Second " : "") +
435                 "Verification succeeded for " + block);
436       
437        if ( second ) {
438          totalTransientErrors++;
439        }
440       
441        updateScanStatus(block, ScanType.VERIFICATION_SCAN, true);
442
443        return;
444      } catch (IOException e) {
445
446        totalScanErrors++;
447        updateScanStatus(block, ScanType.VERIFICATION_SCAN, false);
448
449        // If the block does not exists anymore, then its not an error
450        if ( dataset.getFile(block) == null ) {
451          LOG.info("Verification failed for " + block + ". Its ok since " +
452          "it not in datanode dataset anymore.");
453          deleteBlock(block);
454          return;
455        }
456
457        LOG.warn((second ? "Second " : "First ") + 
458                 "Verification failed for " + block + ". Exception : " +
459                 StringUtils.stringifyException(e));
460       
461        if (second) {
462          datanode.getMetrics().blockVerificationFailures.inc(); 
463          handleScanFailure(block);
464          return;
465        } 
466      } finally {
467        IOUtils.closeStream(blockSender);
468        datanode.getMetrics().blocksVerified.inc();
469        totalScans++;
470        totalVerifications++;
471      }
472    }
473  }
474 
475  private synchronized long getEarliestScanTime() {
476    if ( blockInfoSet.size() > 0 ) {
477      return blockInfoSet.first().lastScanTime;
478    }
479    return Long.MAX_VALUE; 
480  }
481 
482  // Picks one block and verifies it
483  private void verifyFirstBlock() {
484    Block block = null;
485    synchronized (this) {
486      if ( blockInfoSet.size() > 0 ) {
487        block = blockInfoSet.first().block;
488      }
489    }
490   
491    if ( block != null ) {
492      verifyBlock(block);
493    }
494  }
495 
496  /** returns false if the process was interrupted
497   * because the thread is marked to exit.
498   */
499  private boolean assignInitialVerificationTimes() {
500    int numBlocks = 1;
501    synchronized (this) {
502      numBlocks = Math.max(blockMap.size(), 1);
503    }
504   
505    //First udpates the last verification times from the log file.
506    LogFileHandler.Reader logReader = null;
507    try {
508      if (verificationLog != null) {
509        logReader = verificationLog.new Reader(false);
510      }
511    } catch (IOException e) {
512      LOG.warn("Could not read previous verification times : " +
513               StringUtils.stringifyException(e));
514    }
515   
516    if (verificationLog != null) {
517      verificationLog.updateCurNumLines();
518    }
519   
520    try {
521    // update verification times from the verificationLog.
522    while (logReader != null && logReader.hasNext()) {
523      if (!datanode.shouldRun || Thread.interrupted()) {
524        return false;
525      }
526      LogEntry entry = LogEntry.parseEntry(logReader.next());
527      if (entry != null) {
528        updateBlockInfo(entry);
529      }
530    }
531    } finally {
532      IOUtils.closeStream(logReader);
533    }
534   
535    /* Initially spread the block reads over half of
536     * MIN_SCAN_PERIOD so that we don't keep scanning the
537     * blocks too quickly when restarted.
538     */
539    long verifyInterval = (long) (Math.min( scanPeriod/2.0/numBlocks,
540                                            10*60*1000 ));
541    long lastScanTime = System.currentTimeMillis() - scanPeriod;
542   
543    /* Before this loop, entries in blockInfoSet that are not
544     * updated above have lastScanTime of <= 0 . Loop until first entry has
545     * lastModificationTime > 0.
546     */   
547    synchronized (this) {
548      if (blockInfoSet.size() > 0 ) {
549        BlockScanInfo info;
550        while ((info =  blockInfoSet.first()).lastScanTime < 0) {
551          delBlockInfo(info);       
552          info.lastScanTime = lastScanTime;
553          lastScanTime += verifyInterval;
554          addBlockInfo(info);
555        }
556      }
557    }
558   
559    return true;
560  }
561 
562  private synchronized void startNewPeriod() {
563    LOG.info("Starting a new period : work left in prev period : " +
564             String.format("%.2f%%", (bytesLeft * 100.0)/totalBytesToScan));
565    // reset the byte counts :
566    bytesLeft = totalBytesToScan;
567    currentPeriodStart = System.currentTimeMillis();
568  }
569 
570  public void run() {
571    try {
572     
573      init();
574     
575      //Read last verification times
576      if (!assignInitialVerificationTimes()) {
577        return;
578      }
579     
580      adjustThrottler();
581     
582      while (datanode.shouldRun && !Thread.interrupted()) {
583        long now = System.currentTimeMillis();
584        synchronized (this) {
585          if ( now >= (currentPeriodStart + scanPeriod)) {
586            startNewPeriod();
587          }
588        }
589        if ( (now - getEarliestScanTime()) >= scanPeriod ) {
590          verifyFirstBlock();
591        } else {
592          try {
593            Thread.sleep(1000);
594          } catch (InterruptedException ignored) {}
595        }
596      }
597    } catch (RuntimeException e) {
598      LOG.warn("RuntimeException during DataBlockScanner.run() : " +
599               StringUtils.stringifyException(e));
600      throw e;
601    } finally {
602      shutdown();
603      LOG.info("Exiting DataBlockScanner thread.");
604    }
605  }
606 
607  synchronized void shutdown() {
608    LogFileHandler log = verificationLog;
609    verificationLog = null;
610    if (log != null) {
611      log.close();
612    }
613  }
614 
615  synchronized void printBlockReport(StringBuilder buffer, 
616                                     boolean summaryOnly) {
617    long oneHour = 3600*1000;
618    long oneDay = 24*oneHour;
619    long oneWeek = 7*oneDay;
620    long fourWeeks = 4*oneWeek;
621   
622    int inOneHour = 0;
623    int inOneDay = 0;
624    int inOneWeek = 0;
625    int inFourWeeks = 0;
626    int inScanPeriod = 0;
627    int neverScanned = 0;
628   
629    int total = blockInfoSet.size();
630   
631    long now = System.currentTimeMillis();
632   
633    Date date = new Date();
634   
635    for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) {
636      BlockScanInfo info = it.next();
637     
638      long scanTime = info.getLastScanTime();
639      long diff = now - scanTime;
640     
641      if (diff <= oneHour) inOneHour++;
642      if (diff <= oneDay) inOneDay++;
643      if (diff <= oneWeek) inOneWeek++;
644      if (diff <= fourWeeks) inFourWeeks++;
645      if (diff <= scanPeriod) inScanPeriod++;     
646      if (scanTime <= 0) neverScanned++;
647     
648      if (!summaryOnly) {
649        date.setTime(scanTime);
650        String scanType = 
651          (info.lastScanType == ScanType.REMOTE_READ) ? "remote" : 
652            ((info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" :
653              "none");
654        buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
655                                                " scan time : " +
656                                    "%-15d %s\n", info.block, 
657                                    (info.lastScanOk ? "ok" : "failed"),
658                                    scanType, scanTime,
659                                    (scanTime <= 0) ? "not yet verified" : 
660                                      dateFormat.format(date)));
661      }
662    }
663   
664    double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
665                           *100.0/scanPeriod;
666    double pctProgress = (totalBytesToScan == 0) ? 100 :
667                         (totalBytesToScan-bytesLeft)*10000.0/totalBytesToScan/
668                         (100-pctPeriodLeft+1e-10);
669   
670    buffer.append(String.format("\nTotal Blocks                 : %6d" +
671                                "\nVerified in last hour        : %6d" +
672                                "\nVerified in last day         : %6d" +
673                                "\nVerified in last week        : %6d" +
674                                "\nVerified in last four weeks  : %6d" +
675                                "\nVerified in SCAN_PERIOD      : %6d" +
676                                "\nNot yet verified             : %6d" +
677                                "\nVerified since restart       : %6d" +
678                                "\nScans since restart          : %6d" +
679                                "\nScan errors since restart    : %6d" +
680                                "\nTransient scan errors        : %6d" +
681                                "\nCurrent scan rate limit KBps : %6d" +
682                                "\nProgress this period         : %6.0f%%" +
683                                "\nTime left in cur period      : %6.2f%%" +
684                                "\n", 
685                                total, inOneHour, inOneDay, inOneWeek,
686                                inFourWeeks, inScanPeriod, neverScanned,
687                                totalVerifications, totalScans, 
688                                totalScanErrors, totalTransientErrors, 
689                                Math.round(throttler.getBandwidth()/1024.0),
690                                pctProgress, pctPeriodLeft));
691  }
692 
693  /**
694   * This class takes care of log file used to store the last verification
695   * times of the blocks. It rolls the current file when it is too big etc.
696   * If there is an error while writing, it stops updating with an error
697   * message.
698   */
699  private static class LogFileHandler {
700   
701    private static final String curFileSuffix = ".curr";
702    private static final String prevFileSuffix = ".prev";
703   
704    // Don't roll files more often than this
705    private static final long minRollingPeriod = 6 * 3600 * 1000L; // 6 hours
706    private static final long minWarnPeriod = minRollingPeriod;
707    private static final int minLineLimit = 1000;
708   
709   
710    static boolean isFilePresent(File dir, String filePrefix) {
711      return new File(dir, filePrefix + curFileSuffix).exists() ||
712             new File(dir, filePrefix + prevFileSuffix).exists();
713    }
714    private File curFile;
715    private File prevFile;
716   
717    private int maxNumLines = -1; // not very hard limit on number of lines.
718    private int curNumLines = -1;
719   
720    long lastWarningTime = 0;
721   
722    private PrintStream out;
723   
724    int numReaders = 0;
725       
726    /**
727     * Opens the log file for appending.
728     * Note that rolling will happen only after "updateLineCount()" is
729     * called. This is so that line count could be updated in a separate
730     * thread without delaying start up.
731     *
732     * @param dir where the logs files are located.
733     * @param filePrefix prefix of the file.
734     * @param maxNumLines max lines in a file (its a soft limit).
735     * @throws IOException
736     */
737    LogFileHandler(File dir, String filePrefix, int maxNumLines) 
738                                                throws IOException {
739      curFile = new File(dir, filePrefix + curFileSuffix);
740      prevFile = new File(dir, filePrefix + prevFileSuffix);
741      openCurFile();
742      curNumLines = -1;
743      setMaxNumLines(maxNumLines);
744    }
745   
746    // setting takes affect when next entry is added.
747    synchronized void setMaxNumLines(int maxNumLines) {
748      this.maxNumLines = Math.max(maxNumLines, minLineLimit);
749    }
750   
751    /**
752     * Append "\n" + line.
753     * If the log file need to be rolled, it will done after
754     * appending the text.
755     * This does not throw IOException when there is an error while
756     * appending. Currently does not throw an error even if rolling
757     * fails (may be it should?).
758     * return true if append was successful.
759     */
760    synchronized boolean appendLine(String line) {
761      out.println();
762      out.print(line);
763      curNumLines += (curNumLines < 0) ? -1 : 1;
764      try {
765        rollIfRequired();
766      } catch (IOException e) {
767        warn("Rolling failed for " + curFile + " : " + e.getMessage());
768        return false;
769      }
770      return true;
771    }
772   
773    //warns only once in a while
774    synchronized private void warn(String msg) {
775      long now = System.currentTimeMillis();
776      if ((now - lastWarningTime) >= minWarnPeriod) {
777        lastWarningTime = now;
778        LOG.warn(msg);
779      }
780    }
781   
782    private synchronized void openCurFile() throws FileNotFoundException {
783      close();
784      out = new PrintStream(new FileOutputStream(curFile, true));
785    }
786   
787    //This reads the current file and updates the count.
788    void updateCurNumLines() {
789      int count = 0;
790      Reader it = null;
791      try {
792        for(it = new Reader(true); it.hasNext(); count++) {
793          it.next();
794        }
795      } catch (IOException e) {
796       
797      } finally {
798        synchronized (this) {
799          curNumLines = count;
800        }
801        IOUtils.closeStream(it);
802      }
803    }
804   
805    private void rollIfRequired() throws IOException {
806      if (curNumLines < maxNumLines || numReaders > 0) {
807        return;
808      }
809     
810      long now = System.currentTimeMillis();
811      if (now < minRollingPeriod) {
812        return;
813      }
814     
815      if (!prevFile.delete() && prevFile.exists()) {
816        throw new IOException("Could not delete " + prevFile);
817      }
818     
819      close();
820
821      if (!curFile.renameTo(prevFile)) {
822        openCurFile();
823        throw new IOException("Could not rename " + curFile + 
824                              " to " + prevFile);
825      }
826     
827      openCurFile();
828      updateCurNumLines();
829    }
830   
831    synchronized void close() {
832      if (out != null) {
833        out.close();
834        out = null;
835      }
836    }
837   
838    /**
839     * This is used to read the lines in order.
840     * If the data is not read completely (i.e, untill hasNext() returns
841     * false), it needs to be explicitly
842     */
843    private class Reader implements Iterator<String>, Closeable {
844     
845      BufferedReader reader;
846      File file;
847      String line;
848      boolean closed = false;
849     
850      private Reader(boolean skipPrevFile) throws IOException {
851        synchronized (LogFileHandler.this) {
852          numReaders++; 
853        }
854        reader = null;
855        file = (skipPrevFile) ? curFile : prevFile;
856        readNext();       
857      }
858     
859      private boolean openFile() throws IOException {
860
861        for(int i=0; i<2; i++) {
862          if (reader != null || i > 0) {
863            // move to next file
864            file = (file == prevFile) ? curFile : null;
865          }
866          if (file == null) {
867            return false;
868          }
869          if (file.exists()) {
870            break;
871          }
872        }
873       
874        if (reader != null ) {
875          reader.close();
876          reader = null;
877        }
878       
879        reader = new BufferedReader(new FileReader(file));
880        return true;
881      }
882     
883      // read next line if possible.
884      private void readNext() throws IOException {
885        line = null;
886        try {
887          if (reader != null && (line = reader.readLine()) != null) {
888            return;
889          }
890          if (line == null) {
891            // move to the next file.
892            if (openFile()) {
893              readNext();
894            }
895          }
896        } finally {
897          if (!hasNext()) {
898            close();
899          }
900        }
901      }
902     
903      public boolean hasNext() {
904        return line != null;
905      }
906
907      public String next() {
908        String curLine = line;
909        try {
910          readNext();
911        } catch (IOException e) {
912          LOG.info("Could not reade next line in LogHandler : " +
913                   StringUtils.stringifyException(e));
914        }
915        return curLine;
916      }
917
918      public void remove() {
919        throw new RuntimeException("remove() is not supported.");
920      }
921
922      public void close() throws IOException {
923        if (!closed) {
924          try {
925            if (reader != null) {
926              reader.close();
927            }
928          } finally {
929            file = null;
930            reader = null;
931            closed = true;
932            synchronized (LogFileHandler.this) {
933              numReaders--;
934              assert(numReaders >= 0);
935            }
936          }
937        }
938      }
939    }   
940  }
941 
942  public static class Servlet extends HttpServlet {
943   
944    public void doGet(HttpServletRequest request, 
945                      HttpServletResponse response) throws IOException {
946     
947      response.setContentType("text/plain");
948     
949      DataBlockScanner blockScanner = (DataBlockScanner) 
950          getServletContext().getAttribute("datanode.blockScanner");
951     
952      boolean summary = (request.getParameter("listblocks") == null);
953     
954      StringBuilder buffer = new StringBuilder(8*1024);
955      if (blockScanner == null) {
956        buffer.append("Periodic block scanner is not running. " +
957                      "Please check the datanode log if this is unexpected.");
958      } else if (blockScanner.isInitiliazed()) {
959        blockScanner.printBlockReport(buffer, summary);
960      } else {
961        buffer.append("Periodic block scanner is not yet initialized. " +
962                      "Please check back again after some time.");
963      }
964      response.getWriter().write(buffer.toString()); // extra copy!
965    }
966  }
967}
Note: See TracBrowser for help on using the repository browser.