/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.client;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.util.EventReportUtil;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerSelector {
    private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class);
    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.MINUTES);
    private static final long PEER_REFRESH_PERIOD = 60000L;
    private final ReentrantLock peerRefreshLock = new ReentrantLock();
    private volatile List<PeerStatus> peerStatuses;
    private volatile Set<PeerStatus> lastFetchedQueryablePeers;
    private volatile long peerRefreshTime = 0L;
    private final AtomicLong peerIndex = new AtomicLong(0L);
    private volatile PeerStatusCache peerStatusCache;
    private final File persistenceFile;
    private EventReporter eventReporter;
    private final PeerStatusProvider peerStatusProvider;
    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<PeerDescription, Long>();
    private SystemTime systemTime = new SystemTime();

    void setSystemTime(SystemTime systemTime) {
        logger.info("Replacing systemTime instance to {}.", (Object)systemTime);
        this.systemTime = systemTime;
    }

    public PeerSelector(PeerStatusProvider peerStatusProvider, File persistenceFile) {
        this.peerStatusProvider = peerStatusProvider;
        this.persistenceFile = persistenceFile;
        if (persistenceFile != null && persistenceFile.exists()) {
            try {
                Set<PeerStatus> recoveredStatuses = PeerSelector.recoverPersistedPeerStatuses(persistenceFile);
                this.peerStatusCache = new PeerStatusCache(recoveredStatuses, persistenceFile.lastModified());
            }
            catch (IOException ioe) {
                logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", (Object)persistenceFile, (Object)ioe);
            }
        } else {
            this.peerStatusCache = null;
        }
    }

    private void persistPeerStatuses(Set<PeerStatus> statuses) {
        if (this.persistenceFile == null) {
            return;
        }
        try (FileOutputStream fos = new FileOutputStream(this.persistenceFile);
             BufferedOutputStream out = new BufferedOutputStream(fos);){
            for (PeerStatus status : statuses) {
                PeerDescription description = status.getPeerDescription();
                String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
                ((OutputStream)out).write(line.getBytes(StandardCharsets.UTF_8));
            }
        }
        catch (IOException e) {
            EventReportUtil.error(logger, this.eventReporter, "Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString());
            logger.error("", (Throwable)e);
        }
    }

    private static Set<PeerStatus> recoverPersistedPeerStatuses(File file) throws IOException {
        if (!file.exists()) {
            return null;
        }
        HashSet<PeerStatus> statuses = new HashSet<PeerStatus>();
        try (FileInputStream fis = new FileInputStream(file);
             BufferedReader reader = new BufferedReader(new InputStreamReader(fis));){
            String line;
            while ((line = reader.readLine()) != null) {
                String[] splits = line.split(Pattern.quote(":"));
                if (splits.length != 3 && splits.length != 4) continue;
                String hostname = splits[0];
                int port = Integer.parseInt(splits[1]);
                boolean secure = Boolean.parseBoolean(splits[2]);
                boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
                statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
            }
        }
        return statuses;
    }

    List<PeerStatus> formulateDestinationList(Set<PeerStatus> statuses, TransferDirection direction) {
        int numDestinations = Math.max(128, statuses.size());
        HashMap<PeerStatus, Integer> entryCountMap = new HashMap<PeerStatus, Integer>();
        long totalFlowFileCount = 0L;
        for (PeerStatus peerStatus : statuses) {
            totalFlowFileCount += (long)peerStatus.getFlowFileCount();
        }
        int totalEntries = 0;
        for (PeerStatus nodeInfo : statuses) {
            int flowFileCount = nodeInfo.getFlowFileCount();
            double percentageOfFlowFiles = Math.min(0.8, (double)flowFileCount / (double)totalFlowFileCount);
            double relativeWeighting = direction == TransferDirection.SEND ? 1.0 - percentageOfFlowFiles : percentageOfFlowFiles;
            int entries = Math.max(1, (int)((double)numDestinations * relativeWeighting));
            entryCountMap.put(nodeInfo, Math.max(1, entries));
            totalEntries += entries;
        }
        ArrayList<PeerStatus> arrayList = new ArrayList<PeerStatus>(totalEntries);
        for (int i = 0; i < totalEntries; ++i) {
            arrayList.add(null);
        }
        for (Map.Entry entry : entryCountMap.entrySet()) {
            int numEntries;
            PeerStatus nodeInfo = (PeerStatus)entry.getKey();
            int skipIndex = numEntries = ((Integer)entry.getValue()).intValue();
            for (int i = 0; i < numEntries; ++i) {
                int index;
                PeerStatus status;
                int n = skipIndex * i;
                while (true) {
                    if ((status = (PeerStatus)arrayList.get(index = n % arrayList.size())) == null) break;
                    ++n;
                }
                status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
                arrayList.set(index, status);
            }
        }
        Collections.shuffle(arrayList, new Random(0L));
        StringBuilder distributionDescription = new StringBuilder();
        distributionDescription.append("New Weighted Distribution of Nodes:");
        for (Map.Entry entry : entryCountMap.entrySet()) {
            double percentage = (double)((Integer)entry.getValue()).intValue() * 100.0 / (double)arrayList.size();
            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
        }
        logger.info(distributionDescription.toString());
        return arrayList;
    }

    public void penalize(Peer peer, long penalizationMillis) {
        this.penalize(peer.getDescription(), penalizationMillis);
    }

    public void penalize(PeerDescription peerDescription, long penalizationMillis) {
        Long expiration = (Long)this.peerTimeoutExpirations.get(peerDescription);
        if (expiration == null) {
            expiration = 0L;
        }
        long newExpiration = Math.max(expiration, this.systemTime.currentTimeMillis() + penalizationMillis);
        this.peerTimeoutExpirations.put(peerDescription, newExpiration);
    }

    public boolean isPenalized(PeerStatus peerStatus) {
        Long expirationEnd = (Long)this.peerTimeoutExpirations.get(peerStatus.getPeerDescription());
        return expirationEnd != null && expirationEnd > this.systemTime.currentTimeMillis();
    }

    public void clear() {
        this.peerTimeoutExpirations.clear();
    }

    private boolean isPeerRefreshNeeded(List<PeerStatus> peerList) {
        return peerList == null || peerList.isEmpty() || this.systemTime.currentTimeMillis() > this.peerRefreshTime + 60000L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PeerStatus getNextPeerStatus(TransferDirection direction) {
        List<PeerStatus> peerList;
        block9: {
            peerList = this.peerStatuses;
            if (this.isPeerRefreshNeeded(peerList)) {
                this.peerRefreshLock.lock();
                try {
                    block10: {
                        peerList = this.peerStatuses;
                        if (!this.isPeerRefreshNeeded(peerList)) break block9;
                        try {
                            peerList = this.createPeerStatusList(direction);
                        }
                        catch (Exception e) {
                            String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
                            EventReportUtil.warn(logger, this.eventReporter, message, new Object[0]);
                            if (!logger.isDebugEnabled()) break block10;
                            logger.warn("", (Throwable)e);
                        }
                    }
                    this.peerStatuses = peerList;
                    this.peerRefreshTime = this.systemTime.currentTimeMillis();
                }
                finally {
                    this.peerRefreshLock.unlock();
                }
            }
        }
        if (peerList == null || peerList.isEmpty()) {
            return null;
        }
        for (int i = 0; i < peerList.size(); ++i) {
            long idx = this.peerIndex.getAndIncrement();
            int listIndex = (int)(idx % (long)peerList.size());
            PeerStatus peerStatus = peerList.get(listIndex);
            if (!this.isPenalized(peerStatus)) {
                return peerStatus;
            }
            logger.debug("{} {} is penalized; will not communicate with this peer", (Object)this, (Object)peerStatus);
        }
        logger.debug("{} All peers appear to be penalized; returning null", (Object)this);
        return null;
    }

    private List<PeerStatus> createPeerStatusList(TransferDirection direction) throws IOException {
        Set<PeerStatus> statuses = this.getPeerStatuses();
        if (statuses == null) {
            this.refreshPeers();
            statuses = this.getPeerStatuses();
            if (statuses == null) {
                logger.debug("{} found no peers to connect to", (Object)this);
                return Collections.emptyList();
            }
        }
        return this.formulateDestinationList(statuses, direction);
    }

    private Set<PeerStatus> getPeerStatuses() {
        PeerStatusCache cache = this.peerStatusCache;
        if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
            return null;
        }
        if (cache.getTimestamp() + PEER_CACHE_MILLIS < this.systemTime.currentTimeMillis()) {
            HashSet<PeerStatus> equalizedSet = new HashSet<PeerStatus>(cache.getStatuses().size());
            for (PeerStatus status : cache.getStatuses()) {
                PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
                equalizedSet.add(equalizedStatus);
            }
            return equalizedSet;
        }
        return cache.getStatuses();
    }

    public void refreshPeers() {
        block3: {
            PeerStatusCache existingCache = this.peerStatusCache;
            if (existingCache != null && existingCache.getTimestamp() + PEER_CACHE_MILLIS > this.systemTime.currentTimeMillis()) {
                return;
            }
            try {
                Set<PeerStatus> statuses = this.fetchRemotePeerStatuses();
                this.persistPeerStatuses(statuses);
                this.peerStatusCache = new PeerStatusCache(statuses);
                logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", (Object)this, (Object)statuses.size());
            }
            catch (Exception e) {
                EventReportUtil.warn(logger, this.eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());
                if (!logger.isDebugEnabled()) break block3;
                logger.debug("", (Throwable)e);
            }
        }
    }

    public void setEventReporter(EventReporter eventReporter) {
        this.eventReporter = eventReporter;
    }

    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
        HashSet<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<PeerDescription>();
        Set<PeerStatus> lastFetched = this.lastFetchedQueryablePeers;
        if (lastFetched != null && !lastFetched.isEmpty()) {
            lastFetched.stream().map(peer -> peer.getPeerDescription()).forEach(desc -> peersToRequestClusterInfoFrom.add((PeerDescription)desc));
        }
        peersToRequestClusterInfoFrom.add(this.peerStatusProvider.getBootstrapPeerDescription());
        logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom);
        Exception lastFailure = null;
        for (PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
            try {
                Set<PeerStatus> statuses = this.peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
                this.lastFetchedQueryablePeers = statuses.stream().filter(p -> p.isQueryForPeers()).collect(Collectors.toSet());
                return statuses;
            }
            catch (Exception e) {
                logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster, due to {}", new Object[]{peerDescription.getHostname(), peerDescription.getPort(), e.toString()});
                lastFailure = e;
            }
        }
        IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
        if (lastFailure != null) {
            ioe.addSuppressed(lastFailure);
        }
        throw ioe;
    }

    static class SystemTime {
        SystemTime() {
        }

        long currentTimeMillis() {
            return System.currentTimeMillis();
        }
    }
}

