/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.client.http;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.protocol.http.HttpClientTransaction;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.api.dto.remote.PeerDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Site-to-Site client that talks to a remote NiFi instance over HTTP(S).
 *
 * <p>On construction the client starts a single daemon thread that periodically asks the
 * {@link PeerSelector} to refresh its peers. Transactions created through
 * {@link #createTransaction(TransferDirection)} are tracked in {@code activeTransactions} until
 * they are closed, so that {@link #close()} can interrupt the ones still in flight.
 */
public class HttpClient
extends AbstractSiteToSiteClient
implements PeerStatusProvider {
    private static final Logger logger = LoggerFactory.getLogger(HttpClient.class);

    /** Delay, in seconds, between the end of one peer refresh and the start of the next. */
    private static final long PEER_REFRESH_INTERVAL_SECONDS = 5L;

    private final ScheduledExecutorService taskExecutor;
    private final PeerSelector peerSelector;

    /**
     * Transactions that have been initialized but not yet closed. Each transaction removes itself
     * from this set when it is closed (see the anonymous subclass in {@code createTransaction}).
     */
    private final Set<HttpClientTransaction> activeTransactions = Collections.synchronizedSet(new HashSet<>());

    /**
     * Creates the client and schedules the periodic peer refresh task.
     *
     * @param config the Site-to-Site configuration used for all remote calls made by this client
     */
    public HttpClient(SiteToSiteClientConfig config) {
        super(config);
        this.peerSelector = new PeerSelector(this, config.getPeerPersistenceFile());
        this.peerSelector.setEventReporter(config.getEventReporter());

        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        this.taskExecutor = Executors.newScheduledThreadPool(1, runnable -> {
            Thread thread = defaultFactory.newThread(runnable);
            thread.setName("Http Site-to-Site PeerSelector");
            // Daemon thread so the refresh task never keeps the JVM alive on its own.
            thread.setDaemon(true);
            return thread;
        });
        this.taskExecutor.scheduleWithFixedDelay(this.peerSelector::refreshPeers, 0L, PEER_REFRESH_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Builds the description of the peer used to bootstrap peer discovery, from the host of the
     * active cluster URL and the remote HTTP Site-to-Site port.
     *
     * @return the bootstrap peer description
     * @throws IOException if the remote instance reports no HTTP Site-to-Site port
     */
    @Override
    public PeerDescription getBootstrapPeerDescription() throws IOException {
        if (this.siteInfoProvider.getSiteToSiteHttpPort() == null) {
            throw new IOException("Remote instance of NiFi is not configured to allow HTTP site-to-site communications");
        }
        URI clusterUrl = this.siteInfoProvider.getActiveClusterUrl();
        return new PeerDescription(clusterUrl.getHost(), this.siteInfoProvider.getSiteToSiteHttpPort(), this.siteInfoProvider.isSecure());
    }

    /**
     * Queries the given peer's REST API for the peers it knows about.
     *
     * @param peerDescription the peer to query
     * @return one {@link PeerStatus} per peer returned by the remote API; never empty
     * @throws IOException if the request fails or the remote returns no peers
     */
    @Override
    public Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException {
        try (SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(this.config.getSslContext(), this.config.getHttpProxy(), this.config.getEventReporter())) {
            String scheme = peerDescription.isSecure() ? "https" : "http";
            apiClient.setBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort());

            // The same configured timeout is applied to both connect and read.
            int timeoutMillis = (int) this.config.getTimeout(TimeUnit.MILLISECONDS);
            apiClient.setConnectTimeoutMillis(timeoutMillis);
            apiClient.setReadTimeoutMillis(timeoutMillis);
            apiClient.setCacheExpirationMillis(this.config.getCacheExpiration(TimeUnit.MILLISECONDS));
            apiClient.setLocalAddress(this.config.getLocalAddress());

            Collection<PeerDTO> peers = apiClient.getPeers();
            if (peers == null || peers.isEmpty()) {
                throw new IOException("Couldn't get any peer to communicate with. " + apiClient.getBaseUrl() + " returned zero peers.");
            }
            return peers.stream()
                    .map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount(), true))
                    .collect(Collectors.toSet());
        }
    }

    /**
     * Creates a transaction with the next available peer.
     *
     * <p>Peers are tried in the order supplied by the {@link PeerSelector}. A peer whose
     * transaction cannot be initiated is penalized; unknown-port, port-not-running and handshake
     * failures are rethrown immediately, any other failure moves on to the next peer.
     *
     * @param direction whether data is sent to or received from the remote instance
     * @return the initialized transaction, or {@code null} if no peer is available
     * @throws IOException if the port identifier cannot be determined or initialization fails
     */
    @Override
    public Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
        PeerStatus peerStatus;
        int timeoutMillis = (int) this.config.getTimeout(TimeUnit.MILLISECONDS);
        while ((peerStatus = this.peerSelector.getNextPeerStatus(direction)) != null) {
            String transactionUrl;
            logger.debug("peerStatus={}", peerStatus);
            HttpCommunicationsSession commSession = new HttpCommunicationsSession();
            String nodeApiUrl = this.resolveNodeApiUrl(peerStatus.getPeerDescription());

            // Comma-separated list of every configured cluster URL. The previous loop only
            // appended when the buffer was already non-empty, so it always produced "".
            String clusterUrls = String.join(",", this.config.getUrls());
            Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrls);
            int penaltyMillis = (int) this.config.getPenalizationPeriod(TimeUnit.MILLISECONDS);

            // Prefer the explicitly configured port id; otherwise resolve it from the port name.
            String portId = this.config.getPortIdentifier();
            if (StringUtils.isEmpty(portId)) {
                portId = this.siteInfoProvider.getPortIdentifier(this.config.getPortName(), direction);
                if (StringUtils.isEmpty(portId)) {
                    peer.close();
                    throw new IOException("Failed to determine the identifier of port " + this.config.getPortName());
                }
            }

            SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(this.config.getSslContext(), this.config.getHttpProxy(), this.config.getEventReporter());
            apiClient.setBaseUrl(peer.getUrl());
            apiClient.setConnectTimeoutMillis(timeoutMillis);
            apiClient.setReadTimeoutMillis(timeoutMillis);
            apiClient.setCacheExpirationMillis(this.config.getCacheExpiration(TimeUnit.MILLISECONDS));
            apiClient.setLocalAddress(this.config.getLocalAddress());
            apiClient.setCompress(this.config.isUseCompression());
            apiClient.setRequestExpirationMillis(this.config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS));
            apiClient.setBatchCount(this.config.getPreferredBatchCount());
            apiClient.setBatchSize(this.config.getPreferredBatchSize());
            apiClient.setBatchDurationMillis(this.config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
            try {
                transactionUrl = apiClient.initiateTransaction(direction, portId);
                commSession.setUserDn(apiClient.getTrustedPeerDn());
            }
            catch (Exception e) {
                apiClient.close();
                logger.warn("Penalizing a peer {} due to {}", peer, e.toString());
                this.peerSelector.penalize(peer, (long) penaltyMillis);
                // These failures are rethrown to the caller instead of trying another peer.
                if (e instanceof UnknownPortException || e instanceof PortNotRunningException || e instanceof HandshakeException) {
                    throw e;
                }
                logger.debug("Continue trying other peers...");
                continue;
            }

            Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
            HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction, this.config.isUseCompression(), portId, penaltyMillis, this.config.getEventReporter()) {

                @Override
                protected void close() throws IOException {
                    try {
                        super.close();
                    }
                    finally {
                        // Stop tracking the transaction even if closing it failed.
                        HttpClient.this.activeTransactions.remove(this);
                    }
                }
            };
            try {
                transaction.initialize(apiClient, transactionUrl);
            }
            catch (Exception e) {
                transaction.error();
                throw e;
            }
            this.activeTransactions.add(transaction);
            return transaction;
        }
        logger.info("Couldn't find a valid peer to communicate with.");
        return null;
    }

    /**
     * Builds the {@code /nifi-api} base URL for the given peer.
     *
     * @param description the peer whose scheme, host and port are used
     * @return a URL of the form {@code http(s)://host:port/nifi-api}
     */
    private String resolveNodeApiUrl(PeerDescription description) {
        String scheme = description.isSecure() ? "https" : "http";
        return scheme + "://" + description.getHostname() + ":" + description.getPort() + "/nifi-api";
    }

    /**
     * Reports whether the remote web interface is secure, as stated by the site info provider.
     *
     * @return {@code true} if the remote web interface is secure
     * @throws IOException if the site info provider fails to determine it
     */
    @Override
    public boolean isSecure() throws IOException {
        return this.siteInfoProvider.isWebInterfaceSecure();
    }

    /**
     * Stops the peer refresh task, clears the peer selector and interrupts the communications
     * session of every transaction that is still active.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void close() throws IOException {
        this.taskExecutor.shutdown();
        this.peerSelector.clear();

        // Iterate over a snapshot: a synchronized set must not be iterated without holding its
        // lock, and transactions remove themselves from the set when they are closed. toArray()
        // on the synchronized wrapper takes the copy atomically.
        HttpClientTransaction[] snapshot = this.activeTransactions.toArray(new HttpClientTransaction[0]);
        for (HttpClientTransaction transaction : snapshot) {
            transaction.getCommunicant().getCommunicationsSession().interrupt();
        }
    }
}

