/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.BlockSender;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VolumeScanner
extends Thread {
    /** Logger used by the scanner and by its nested result handler. */
    public static final Logger LOG = LoggerFactory.getLogger(VolumeScanner.class);
    /** Number of seconds in one minute. */
    private static final int SECONDS_PER_MINUTE = 60;
    /** Number of minutes in one hour; also the number of per-minute slots in {@link #scannedBytes}. */
    private static final int MINUTES_PER_HOUR = 60;
    /** Name under which this scanner's block iterators are loaded and created on the volume. */
    private static final String BLOCK_ITERATOR_NAME = "scanner";
    /** Scanner configuration (target rate, scan period, cursor save interval, result handler class, ...). */
    private final BlockScanner.Conf conf;
    /** The DataNode this scanner belongs to; used when sending blocks and reporting bad blocks. */
    private final DataNode datanode;
    /** Reference to the scanned volume; closed when the scanner thread exits. */
    private final FsVolumeReference ref;
    /** The volume obtained from {@link #ref}. */
    final FsVolumeSpi volume;
    /** Bytes scanned per minute, indexed by {@code minute % 60}. */
    private final long[] scannedBytes = new long[MINUTES_PER_HOUR];
    /** Running sum of all slots in {@link #scannedBytes}. */
    private long scannedBytesSum = 0L;
    /** Throttler handed to the block sender; its bandwidth is set before every scan. */
    private final DataTransferThrottler throttler = new DataTransferThrottler(1L);
    /** Sink that discards the block data produced while scanning. */
    private final DataOutputStream nullStream = new DataOutputStream((OutputStream)new IOUtils.NullOutputStream());
    /** One block iterator per block pool that has been enabled for scanning. */
    private final List<FsVolumeSpi.BlockIterator> blockIters = new LinkedList<FsVolumeSpi.BlockIterator>();
    /** Blocks queued for an out-of-order rescan, kept in insertion order. */
    private final LinkedHashSet<ExtendedBlock> suspectBlocks = new LinkedHashSet<ExtendedBlock>();
    /** Blocks recently queued as suspect: at most 1000 entries, each expiring 10 minutes after last access. */
    private final Cache<ExtendedBlock, Boolean> recentSuspectBlocks = CacheBuilder.newBuilder().maximumSize(1000L).expireAfterAccess(10L, TimeUnit.MINUTES).build();
    /** The iterator currently being scanned, or null if none has been selected. */
    private FsVolumeSpi.BlockIterator curBlockIter = null;
    /** Set by {@link #shutdown()}; checked by the scanner thread while holding this object's monitor. */
    private boolean stopping = false;
    /** Monotonic-clock minute at which the scanner thread started. */
    private long startMinute = 0L;
    /** Monotonic-clock minute most recently observed by the scanner thread. */
    private long curMinute = 0L;
    /** Receives the outcome of every block scan. */
    private final ScanResultHandler resultHandler;
    /** Scanner statistics; all access synchronizes on this object. */
    private final Statistics stats = new Statistics();

    /**
     * Converts a duration in milliseconds to fractional hours.
     *
     * <p>The result keeps its fractional part so that callers formatting it
     * with decimals (for example {@code %.3f}) show meaningful digits.
     *
     * @param ms duration in milliseconds
     * @return the duration in hours, or 0.0 if {@code ms} is zero or negative
     */
    private static double positiveMsToHours(long ms) {
        if (ms <= 0L) {
            return 0.0;
        }
        // Divide in floating point; TimeUnit.convert would truncate to whole hours.
        return (double) ms / TimeUnit.HOURS.toMillis(1L);
    }

    /**
     * Appends a human-readable report of this scanner's statistics.
     *
     * <p>The statistics are read while synchronized on the statistics object,
     * so the values in one report are mutually consistent.
     *
     * @param p the builder the report is appended to
     */
    public void printStats(StringBuilder p) {
        // Go through String.format so that %n becomes a real line separator
        // instead of being appended as the two literal characters '%' and 'n'.
        p.append(String.format("Block scanner information for volume %s with base path %s%n", this.volume.getStorageID(), this.volume.getBasePath()));
        synchronized (this.stats) {
            p.append(String.format("Bytes verified in last hour       : %57d%n", this.stats.bytesScannedInPastHour));
            p.append(String.format("Blocks scanned in current period  : %57d%n", this.stats.blocksScannedInCurrentPeriod));
            p.append(String.format("Blocks scanned since restart      : %57d%n", this.stats.blocksScannedSinceRestart));
            p.append(String.format("Block pool scans since restart    : %57d%n", this.stats.scansSinceRestart));
            p.append(String.format("Block scan errors since restart   : %57d%n", this.stats.scanErrorsSinceRestart));
            if (this.stats.nextBlockPoolScanStartMs > 0L) {
                // nextBlockPoolScanStartMs is recorded against the monotonic clock.
                p.append(String.format("Hours until next block pool scan  : %57.3f%n", VolumeScanner.positiveMsToHours(this.stats.nextBlockPoolScanStartMs - Time.monotonicNow())));
            }
            if (this.stats.blockPoolPeriodEndsMs > 0L) {
                // blockPoolPeriodEndsMs is compared against Time.now().
                p.append(String.format("Hours until possible pool rescan  : %57.3f%n", VolumeScanner.positiveMsToHours(this.stats.blockPoolPeriodEndsMs - Time.now())));
            }
            p.append(String.format("Last block scanned                : %57s%n", this.stats.lastBlockScanned == null ? "none" : this.stats.lastBlockScanned.toString()));
            p.append(String.format("More blocks to scan in period     : %57s%n", !this.stats.eof));
            p.append(String.format("%n"));
        }
    }

    VolumeScanner(BlockScanner.Conf conf, DataNode datanode, FsVolumeReference ref) {
        ScanResultHandler handler;
        this.conf = conf;
        this.datanode = datanode;
        this.ref = ref;
        this.volume = ref.getVolume();
        try {
            handler = conf.resultHandler.newInstance();
        }
        catch (Throwable e) {
            LOG.error("unable to instantiate {}", conf.resultHandler, (Object)e);
            handler = new ScanResultHandler();
        }
        this.resultHandler = handler;
        this.setName("VolumeScannerThread(" + this.volume.getBasePath() + ")");
        this.setDaemon(true);
    }

    /**
     * Saves the given iterator's position, logging a warning instead of
     * propagating if the save fails with an I/O error.
     *
     * @param iter the block iterator to save
     */
    private void saveBlockIterator(FsVolumeSpi.BlockIterator iter) {
        try {
            iter.save();
        } catch (IOException saveFailure) {
            // The trailing exception is passed so the logger can attach its stack trace.
            final Object[] logArgs = {this, iter, saveFailure};
            LOG.warn("{}: error saving {}.", logArgs);
        }
    }

    /**
     * Advances the current minute to the one containing {@code monotonicMs},
     * zeroing the per-minute byte slot of every minute that was entered on
     * the way and subtracting those slots from the running sum.
     *
     * @param monotonicMs the current monotonic time in milliseconds
     */
    private void expireOldScannedBytesRecords(long monotonicMs) {
        final long latestMinute = TimeUnit.MINUTES.convert(monotonicMs, TimeUnit.MILLISECONDS);
        if (latestMinute == this.curMinute) {
            // Still inside the same minute; nothing to expire.
            return;
        }
        long minute = this.curMinute + 1L;
        while (minute <= latestMinute) {
            final int slot = (int)(minute % 60L);
            LOG.trace("{}: updateScannedBytes is zeroing out slotIdx {}.  curMinute = {}; newMinute = {}", new Object[]{this, slot, this.curMinute, latestMinute});
            this.scannedBytesSum -= this.scannedBytes[slot];
            this.scannedBytes[slot] = 0L;
            minute++;
        }
        this.curMinute = latestMinute;
    }

    /**
     * Selects the next block iterator to scan, searching round-robin starting
     * just after the current one.
     *
     * <p>An iterator that is not at its end is selected as-is. An iterator
     * that is at its end is rewound and selected once its scan period has
     * elapsed. On selection, {@code curBlockIter} is updated.
     *
     * @return 0 if an iterator was selected; otherwise the number of
     *         milliseconds until the earliest iterator becomes eligible for
     *         a rescan, or {@code Long.MAX_VALUE} if no block pools are
     *         registered
     */
    private synchronized long findNextUsableBlockIter() {
        final int poolCount = this.blockIters.size();
        if (poolCount == 0) {
            LOG.debug("{}: no block pools are registered.", this);
            return Long.MAX_VALUE;
        }

        int startIdx = 0;
        if (this.curBlockIter != null) {
            startIdx = this.blockIters.indexOf(this.curBlockIter);
            // The current iterator must still be registered.
            Preconditions.checkState(startIdx >= 0);
        }

        final long wallClockMs = Time.now();
        long shortestWaitMs = Long.MAX_VALUE;
        for (int offset = 1; offset <= poolCount; offset++) {
            final FsVolumeSpi.BlockIterator candidate = this.blockIters.get((startIdx + offset) % poolCount);
            if (candidate.atEnd()) {
                final long remainingMs = candidate.getIterStartMs() + this.conf.scanPeriodMs - wallClockMs;
                if (remainingMs > 0L) {
                    // Not yet due for a rescan; remember how long until it is.
                    shortestWaitMs = Math.min(shortestWaitMs, remainingMs);
                    continue;
                }
                candidate.rewind();
                LOG.info("Now rescanning bpid {} on volume {}, after more than {} hour(s)", new Object[]{candidate.getBlockPoolId(), this.volume.getBasePath(), TimeUnit.HOURS.convert(this.conf.scanPeriodMs, TimeUnit.MILLISECONDS)});
            } else {
                LOG.info("Now scanning bpid {} on volume {}", candidate.getBlockPoolId(), this.volume.getBasePath());
            }
            this.curBlockIter = candidate;
            return 0L;
        }

        LOG.info("{}: no suitable block pools found to scan.  Waiting {} ms.", this, shortestWaitMs);
        return shortestWaitMs;
    }

    /**
     * Scans one block by reading it through a {@link BlockSender} into a
     * discarding stream, throttled to the given rate.
     *
     * <p>The block is first looked up in the volume's dataset; if it cannot
     * be found, or the lookup fails, nothing is scanned. The outcome of an
     * attempted scan (success or I/O failure) is passed to the result
     * handler. The block sender is always cleaned up, whatever the outcome.
     *
     * @param cblock the block to scan
     * @param bytesPerSec bandwidth to set on the throttler for this scan
     * @return the number of bytes read, or -1 if the block was not found or
     *         the scan failed with an I/O error
     */
    private long scanBlock(ExtendedBlock cblock, long bytesPerSec) {
        ExtendedBlock block = null;
        try {
            Block b = this.volume.getDataset().getStoredBlock(cblock.getBlockPoolId(), cblock.getBlockId());
            if (b == null) {
                LOG.info("FileNotFound while finding block {} on volume {}", cblock, this.volume.getBasePath());
            } else {
                block = new ExtendedBlock(cblock.getBlockPoolId(), b);
            }
        } catch (FileNotFoundException e) {
            LOG.info("FileNotFoundException while finding block {} on volume {}", cblock, this.volume.getBasePath());
        } catch (IOException e) {
            // Pass the exception as the trailing argument so its stack trace is logged.
            LOG.warn("I/O error while finding block {} on volume {}", new Object[]{cblock, this.volume.getBasePath(), e});
        }
        if (block == null) {
            return -1L;
        }

        BlockSender blockSender = null;
        try {
            blockSender = new BlockSender(block, 0L, -1L, false, true, true, this.datanode, null, CachingStrategy.newDropBehind());
            this.throttler.setBandwidth(bytesPerSec);
            long bytesRead = blockSender.sendBlock(this.nullStream, null, this.throttler);
            this.resultHandler.handle(block, null);
            return bytesRead;
        } catch (IOException e) {
            this.resultHandler.handle(block, e);
            return -1L;
        } finally {
            // Runs on every exit path, including unchecked exceptions, so the
            // sender is never leaked.
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{blockSender});
        }
    }

    /**
     * Decides whether scanning may continue without exceeding the target rate.
     *
     * <p>The effective rate is the scanned byte total divided by the number
     * of seconds in the elapsed run time, where the run time is capped at 60
     * minutes. If no full minute has elapsed, the byte total itself is used
     * as the rate, which avoids dividing by zero.
     *
     * @param storageId identifier used only in the trace message
     * @param targetBytesPerSec the rate that must not be exceeded
     * @param scannedBytesSum bytes scanned in the tracked window
     * @param startMinute minute at which scanning started
     * @param curMinute the current minute
     * @return true if the effective rate is at or below the target rate
     */
    @VisibleForTesting
    static boolean calculateShouldScan(String storageId, long targetBytesPerSec, long scannedBytesSum, long startMinute, long curMinute) {
        final long elapsedMinutes = curMinute - startMinute;
        final long effectiveBytesPerSec;
        if (elapsedMinutes > 0L) {
            // Only the most recent 60 minutes contribute to the divisor.
            final long windowMinutes = Math.min(elapsedMinutes, 60L);
            effectiveBytesPerSec = scannedBytesSum / (60L * windowMinutes);
        } else {
            effectiveBytesPerSec = scannedBytesSum;
        }
        final boolean shouldScan = !(effectiveBytesPerSec > targetBytesPerSec);
        LOG.trace("{}: calculateShouldScan: effectiveBytesPerSec = {}, and targetBytesPerSec = {}.  startMinute = {}, curMinute = {}, shouldScan = {}", new Object[]{storageId, effectiveBytesPerSec, targetBytesPerSec, startMinute, curMinute, shouldScan});
        return shouldScan;
    }

    /**
     * Runs one iteration of the scanner: picks a block (the given suspect
     * block, or the next block from the current iterator), scans it, and
     * updates the byte accounting and statistics.
     *
     * <p>The statistics are refreshed in a {@code finally} block, so they are
     * updated on every exit path of this method.
     *
     * @param suspectBlock a block to scan out of order, or null to continue
     *        the regular scan
     * @return the number of milliseconds the caller should wait before the
     *         next iteration; 0 means continue immediately
     */
    private long runLoop(ExtendedBlock suspectBlock) {
        long bytesScanned = -1L;
        boolean scanError = false;
        ExtendedBlock block = null;
        try {
            long monotonicMs = Time.monotonicNow();
            this.expireOldScannedBytesRecords(monotonicMs);
            if (!VolumeScanner.calculateShouldScan(this.volume.getStorageID(), this.conf.targetBytesPerSec, this.scannedBytesSum, this.startMinute, this.curMinute)) {
                // Over the target rate: back off before trying again.
                return 30000L;
            }
            if (suspectBlock != null) {
                block = suspectBlock;
            } else {
                if (this.curBlockIter == null || this.curBlockIter.atEnd()) {
                    long timeout = this.findNextUsableBlockIter();
                    if (timeout > 0L) {
                        LOG.trace("{}: no block pools are ready to scan yet.  Waiting {} ms.", this, timeout);
                        synchronized (this.stats) {
                            this.stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
                        }
                        return timeout;
                    }
                    // A new iterator was selected; start a fresh scan period.
                    synchronized (this.stats) {
                        ++this.stats.scansSinceRestart;
                        this.stats.blocksScannedInCurrentPeriod = 0L;
                        this.stats.nextBlockPoolScanStartMs = -1L;
                    }
                    return 0L;
                }
                try {
                    block = this.curBlockIter.nextBlock();
                } catch (IOException e) {
                    // Pass the exception as the trailing argument so its stack trace is logged.
                    LOG.warn("{}: nextBlock error on {}", new Object[]{this, this.curBlockIter, e});
                    return 0L;
                }
                if (block == null) {
                    LOG.info("{}: finished scanning block pool {}", this, this.curBlockIter.getBlockPoolId());
                    this.saveBlockIterator(this.curBlockIter);
                    return 0L;
                }
            }
            // A suspect block can arrive while no iterator is selected; in
            // that case there is no cursor to save and dereferencing the
            // iterator would throw a NullPointerException.
            FsVolumeSpi.BlockIterator iterToSave = this.curBlockIter;
            if (iterToSave != null) {
                long saveDelta = monotonicMs - iterToSave.getLastSavedMs();
                if (saveDelta >= this.conf.cursorSaveMs) {
                    LOG.debug("{}: saving block iterator {} after {} ms.", new Object[]{this, iterToSave, saveDelta});
                    this.saveBlockIterator(iterToSave);
                }
            }
            bytesScanned = this.scanBlock(block, this.conf.targetBytesPerSec);
            if (bytesScanned >= 0L) {
                this.scannedBytesSum += bytesScanned;
                int slotIdx = (int)(this.curMinute % 60L);
                this.scannedBytes[slotIdx] = this.scannedBytes[slotIdx] + bytesScanned;
            } else {
                scanError = true;
            }
            return 0L;
        } finally {
            synchronized (this.stats) {
                this.stats.bytesScannedInPastHour = this.scannedBytesSum;
                if (bytesScanned > 0L) {
                    ++this.stats.blocksScannedInCurrentPeriod;
                    ++this.stats.blocksScannedSinceRestart;
                }
                if (scanError) {
                    ++this.stats.scanErrorsSinceRestart;
                }
                if (block != null) {
                    this.stats.lastBlockScanned = block;
                }
                if (this.curBlockIter == null) {
                    this.stats.eof = true;
                    this.stats.blockPoolPeriodEndsMs = -1L;
                } else {
                    this.stats.eof = this.curBlockIter.atEnd();
                    this.stats.blockPoolPeriodEndsMs = this.curBlockIter.getIterStartMs() + this.conf.scanPeriodMs;
                }
            }
        }
    }

    /**
     * Removes and returns the oldest queued suspect block.
     *
     * @return the first block in insertion order, or null if none is queued
     */
    private synchronized ExtendedBlock popNextSuspectBlock() {
        final Iterator<ExtendedBlock> queued = this.suspectBlocks.iterator();
        if (queued.hasNext()) {
            final ExtendedBlock oldest = queued.next();
            queued.remove();
            return oldest;
        }
        return null;
    }

    /**
     * Scanner thread body.
     *
     * <p>Repeatedly waits for the timeout returned by the previous iteration
     * (if positive), takes the next suspect block if one is queued, and runs
     * one scan iteration, until stopping is requested, the thread is
     * interrupted, or an unexpected exception occurs. On exit every block
     * iterator is saved and closed, and the volume reference is released.
     */
    @Override
    public void run() {
        this.startMinute = TimeUnit.MINUTES.convert(Time.monotonicNow(), TimeUnit.MILLISECONDS);
        this.curMinute = this.startMinute;
        try {
            LOG.trace("{}: thread starting.", this);
            this.resultHandler.setup(this);
            try {
                long waitMs = 0L;
                while (true) {
                    ExtendedBlock nextSuspect;
                    synchronized (this) {
                        if (this.stopping) {
                            break;
                        }
                        if (waitMs > 0L) {
                            this.wait(waitMs);
                            // Re-check after waking: shutdown() notifies this monitor.
                            if (this.stopping) {
                                break;
                            }
                        }
                        nextSuspect = this.popNextSuspectBlock();
                    }
                    waitMs = this.runLoop(nextSuspect);
                }
            } catch (InterruptedException interrupted) {
                LOG.trace("{} exiting because of InterruptedException.", this);
            } catch (Throwable unexpected) {
                LOG.error("{} exiting because of exception ", (Object)this, (Object)unexpected);
            }
            LOG.info("{} exiting.", this);
            // Persist each iterator's position, then close it.
            for (FsVolumeSpi.BlockIterator registered : this.blockIters) {
                this.saveBlockIterator(registered);
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{registered});
            }
        } finally {
            // Release the volume reference on every exit path.
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{this.ref});
        }
    }

    /**
     * Describes this scanner by its volume's base path and storage ID.
     *
     * @return a string of the form {@code VolumeScanner(basePath, storageId)}
     */
    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("VolumeScanner(");
        description.append(this.volume.getBasePath());
        description.append(", ");
        description.append(this.volume.getStorageID());
        description.append(")");
        return description.toString();
    }

    /**
     * Asks the scanner thread to stop: sets the stopping flag, wakes a thread
     * waiting on this object's monitor, and interrupts the scanner thread.
     */
    public synchronized void shutdown() {
        stopping = true;
        // Wake the scanner if it is waiting out a timeout ...
        notify();
        // ... and interrupt it in case it is blocked elsewhere.
        interrupt();
    }

    /**
     * Queues a block for an out-of-order rescan and wakes the scanner.
     *
     * <p>The request is ignored (and logged) if the scanner is stopping, if
     * the block is still present in the recent-suspects cache, or if the
     * block is already queued.
     *
     * @param block the block suspected of being bad
     */
    public synchronized void markSuspectBlock(ExtendedBlock block) {
        if (this.stopping) {
            LOG.info("{}: Not scheduling suspect block {} for rescanning, because this volume scanner is stopping.", this, block);
            return;
        }
        final boolean seenRecently = this.recentSuspectBlocks.getIfPresent(block) != null;
        if (seenRecently) {
            LOG.info("{}: Not scheduling suspect block {} for rescanning, because we rescanned it recently.", this, block);
            return;
        }
        final boolean alreadyQueued = this.suspectBlocks.contains(block);
        if (alreadyQueued) {
            LOG.info("{}: suspect block {} is already queued for rescanning.", this, block);
            return;
        }
        this.suspectBlocks.add(block);
        this.recentSuspectBlocks.put(block, Boolean.TRUE);
        LOG.info("{}: Scheduling suspect block {} for rescanning.", this, block);
        this.notify();
    }

    /**
     * Starts scanning the given block pool on this volume.
     *
     * <p>If the pool is already enabled, a warning is logged and nothing
     * changes. Otherwise a saved block iterator is loaded from the volume;
     * if loading fails, a new iterator is created instead. The iterator is
     * registered and the scanner thread is notified.
     *
     * @param bpid the block pool ID to enable
     */
    public synchronized void enableBlockPoolId(String bpid) {
        for (FsVolumeSpi.BlockIterator iter : this.blockIters) {
            if (!iter.getBlockPoolId().equals(bpid)) continue;
            LOG.warn("{}: already enabled scanning on block pool {}", this, bpid);
            return;
        }
        FsVolumeSpi.BlockIterator iter = null;
        try {
            iter = this.volume.loadBlockIterator(bpid, BLOCK_ITERATOR_NAME);
            LOG.trace("{}: loaded block iterator for {}.", this, bpid);
        } catch (FileNotFoundException e) {
            // Pass the message as a parameter rather than concatenating it
            // into the format string, where any "{}" in the message would be
            // treated as a placeholder.
            LOG.debug("{}: failed to load block iterator: {}", this, e.getMessage());
        } catch (IOException e) {
            LOG.warn("{}: failed to load block iterator.", (Object)this, (Object)e);
        }
        if (iter == null) {
            iter = this.volume.newBlockIterator(bpid, BLOCK_ITERATOR_NAME);
            LOG.trace("{}: created new block iterator for {}.", this, bpid);
        }
        iter.setMaxStalenessMs(this.conf.maxStalenessMs);
        this.blockIters.add(iter);
        this.notify();
    }

    /**
     * Stops scanning the given block pool on this volume.
     *
     * <p>The pool's iterator is unregistered and closed; if it was the
     * iterator currently being scanned, the current iterator is cleared. The
     * scanner thread is then notified. If the pool was never enabled, a
     * warning is logged.
     *
     * @param bpid the block pool ID to disable
     */
    public synchronized void disableBlockPoolId(String bpid) {
        for (Iterator<FsVolumeSpi.BlockIterator> cursor = this.blockIters.iterator(); cursor.hasNext(); ) {
            final FsVolumeSpi.BlockIterator registered = cursor.next();
            if (registered.getBlockPoolId().equals(bpid)) {
                LOG.trace("{}: disabling scanning on block pool {}", this, bpid);
                cursor.remove();
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{registered});
                if (this.curBlockIter == registered) {
                    this.curBlockIter = null;
                }
                this.notify();
                return;
            }
        }
        LOG.warn("{}: can't remove block pool {}, because it was never added.", this, bpid);
    }

    /**
     * Returns a snapshot of the scanner statistics.
     *
     * @return a copy of the current statistics, taken while synchronized on
     *         the statistics object
     */
    @VisibleForTesting
    Statistics getStatistics() {
        synchronized (this.stats) {
            final Statistics snapshot = new Statistics(this.stats);
            return snapshot;
        }
    }

    /**
     * Default handler for the outcome of each block scan. It logs the result
     * and reports blocks that failed verification to the DataNode.
     */
    static class ScanResultHandler {
        /** The scanner this handler was set up for; assigned in {@link #setup}. */
        private VolumeScanner scanner;

        ScanResultHandler() {
        }

        /**
         * Binds this handler to a scanner. Must be called before
         * {@link #handle}, which reads the scanner's volume and DataNode.
         *
         * @param scanner the scanner whose results this handler receives
         */
        public void setup(VolumeScanner scanner) {
            LOG.trace("Starting VolumeScanner {}", scanner.volume.getBasePath());
            this.scanner = scanner;
        }

        /**
         * Handles the result of scanning one block.
         *
         * <p>A null exception means success. A failure is ignored if the
         * block is no longer in the dataset or if the failure was a
         * {@link FileNotFoundException}; any other failure causes the block
         * to be reported as bad. A failure to report is logged, not thrown.
         *
         * @param block the block that was scanned
         * @param e the I/O error raised by the scan, or null on success
         */
        public void handle(ExtendedBlock block, IOException e) {
            FsVolumeSpi volume = this.scanner.volume;
            if (e == null) {
                LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
                return;
            }
            if (!volume.getDataset().contains(block)) {
                LOG.debug("Volume {}: block {} is no longer in the dataset.", volume.getBasePath(), block);
                return;
            }
            if (e instanceof FileNotFoundException) {
                LOG.info("Volume {}: verification failed for {} because of FileNotFoundException.  This may be due to a race with write.", volume.getBasePath(), block);
                return;
            }
            LOG.warn("Reporting bad {} on {}", block, volume.getBasePath());
            try {
                this.scanner.datanode.reportBadBlocks(block);
            } catch (IOException ie) {
                // Log the reporting failure itself (ie), not the scan error (e)
                // that led here.
                LOG.warn("Cannot report bad " + block.getBlockId(), (Throwable)ie);
            }
        }
    }

    /**
     * Mutable counters and timestamps describing the scanner's progress.
     * The enclosing scanner synchronizes on the instance when reading or
     * writing these fields.
     */
    static class Statistics {
        /** Bytes scanned within the tracked per-minute window. */
        long bytesScannedInPastHour = 0L;
        /** Blocks scanned since the current scan period began. */
        long blocksScannedInCurrentPeriod = 0L;
        /** Blocks scanned since the scanner started. */
        long blocksScannedSinceRestart = 0L;
        /** Number of block pool scans begun since the scanner started. */
        long scansSinceRestart = 0L;
        /** Number of failed block scans since the scanner started. */
        long scanErrorsSinceRestart = 0L;
        /** Monotonic time at which the next block pool scan may start, or -1 if not waiting. */
        long nextBlockPoolScanStartMs = -1L;
        /** Time at which the current block pool's scan period ends, or -1 if there is no current pool. */
        long blockPoolPeriodEndsMs = -1L;
        /** The most recently scanned block, or null if none has been scanned. */
        ExtendedBlock lastBlockScanned = null;
        /** True when the current iterator has no more blocks (or there is no current iterator). */
        boolean eof = false;

        Statistics() {
        }

        /**
         * Copy constructor.
         *
         * @param other the statistics to copy every field from
         */
        Statistics(Statistics other) {
            bytesScannedInPastHour = other.bytesScannedInPastHour;
            blocksScannedInCurrentPeriod = other.blocksScannedInCurrentPeriod;
            blocksScannedSinceRestart = other.blocksScannedSinceRestart;
            scansSinceRestart = other.scansSinceRestart;
            scanErrorsSinceRestart = other.scanErrorsSinceRestart;
            nextBlockPoolScanStartMs = other.nextBlockPoolScanStartMs;
            blockPoolPeriodEndsMs = other.blockPoolPeriodEndsMs;
            lastBlockScanned = other.lastBlockScanned;
            eof = other.eof;
        }

        /** Renders every field as {@code Statistics{name=value, ...}}. */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("Statistics{");
            text.append("bytesScannedInPastHour=").append(this.bytesScannedInPastHour);
            text.append(", blocksScannedInCurrentPeriod=").append(this.blocksScannedInCurrentPeriod);
            text.append(", blocksScannedSinceRestart=").append(this.blocksScannedSinceRestart);
            text.append(", scansSinceRestart=").append(this.scansSinceRestart);
            text.append(", scanErrorsSinceRestart=").append(this.scanErrorsSinceRestart);
            text.append(", nextBlockPoolScanStartMs=").append(this.nextBlockPoolScanStartMs);
            text.append(", blockPoolPeriodEndsMs=").append(this.blockPoolPeriodEndsMs);
            text.append(", lastBlockScanned=").append(this.lastBlockScanned);
            text.append(", eof=").append(this.eof);
            text.append("}");
            return text.toString();
        }
    }
}

