/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.jraft.rhea.client.pd;

import com.alipay.remoting.InvokeCallback;
import com.alipay.remoting.InvokeContext;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.sofa.jraft.Lifecycle;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.rhea.StoreEngine;
import com.alipay.sofa.jraft.rhea.client.pd.AbstractPlacementDriverClient;
import com.alipay.sofa.jraft.rhea.client.pd.InstructionProcessor;
import com.alipay.sofa.jraft.rhea.client.pd.PlacementDriverClient;
import com.alipay.sofa.jraft.rhea.client.pd.StatsCollector;
import com.alipay.sofa.jraft.rhea.cmd.pd.BaseRequest;
import com.alipay.sofa.jraft.rhea.cmd.pd.BaseResponse;
import com.alipay.sofa.jraft.rhea.cmd.pd.RegionHeartbeatRequest;
import com.alipay.sofa.jraft.rhea.cmd.pd.StoreHeartbeatRequest;
import com.alipay.sofa.jraft.rhea.errors.ErrorsHelper;
import com.alipay.sofa.jraft.rhea.metadata.Instruction;
import com.alipay.sofa.jraft.rhea.metadata.Region;
import com.alipay.sofa.jraft.rhea.metadata.RegionStats;
import com.alipay.sofa.jraft.rhea.metadata.StoreStats;
import com.alipay.sofa.jraft.rhea.metadata.TimeInterval;
import com.alipay.sofa.jraft.rhea.options.HeartbeatOptions;
import com.alipay.sofa.jraft.rhea.rpc.ExtSerializerSupports;
import com.alipay.sofa.jraft.rhea.storage.BaseKVStoreClosure;
import com.alipay.sofa.jraft.rhea.util.Lists;
import com.alipay.sofa.jraft.rhea.util.Pair;
import com.alipay.sofa.jraft.rhea.util.StackTraceUtil;
import com.alipay.sofa.jraft.rhea.util.concurrent.DiscardOldPolicyWithReport;
import com.alipay.sofa.jraft.rhea.util.concurrent.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.alipay.sofa.jraft.util.timer.HashedWheelTimer;
import com.alipay.sofa.jraft.util.timer.Timeout;
import com.alipay.sofa.jraft.util.timer.TimerTask;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartbeatSender
implements Lifecycle<HeartbeatOptions> {
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatSender.class);
    private final StoreEngine storeEngine;
    private final PlacementDriverClient pdClient;
    private final RpcClient rpcClient;
    private StatsCollector statsCollector;
    private InstructionProcessor instructionProcessor;
    private int heartbeatRpcTimeoutMillis;
    private ThreadPoolExecutor heartbeatRpcCallbackExecutor;
    private HashedWheelTimer heartbeatTimer;
    private boolean started;

    public HeartbeatSender(StoreEngine storeEngine) {
        this.storeEngine = storeEngine;
        this.pdClient = storeEngine.getPlacementDriverClient();
        this.rpcClient = ((AbstractPlacementDriverClient)this.pdClient).getRpcClient();
    }

    public synchronized boolean init(HeartbeatOptions opts) {
        if (this.started) {
            LOG.info("[HeartbeatSender] already started.");
            return true;
        }
        this.statsCollector = new StatsCollector(this.storeEngine);
        this.instructionProcessor = new InstructionProcessor(this.storeEngine);
        this.heartbeatTimer = new HashedWheelTimer((ThreadFactory)new NamedThreadFactory("heartbeat-timer", true), 50L, TimeUnit.MILLISECONDS, 4096);
        this.heartbeatRpcTimeoutMillis = opts.getHeartbeatRpcTimeoutMillis();
        if (this.heartbeatRpcTimeoutMillis <= 0) {
            throw new IllegalArgumentException("Heartbeat rpc timeout millis must > 0, " + this.heartbeatRpcTimeoutMillis);
        }
        String name = "rheakv-heartbeat-callback";
        this.heartbeatRpcCallbackExecutor = ThreadPoolUtil.newBuilder().poolName("rheakv-heartbeat-callback").enableMetric(Boolean.valueOf(true)).coreThreads(Integer.valueOf(4)).maximumThreads(Integer.valueOf(4)).keepAliveSeconds(Long.valueOf(120L)).workQueue(new ArrayBlockingQueue(1024)).threadFactory((ThreadFactory)new NamedThreadFactory("rheakv-heartbeat-callback", true)).rejectedHandler((RejectedExecutionHandler)new DiscardOldPolicyWithReport("rheakv-heartbeat-callback")).build();
        long storeHeartbeatIntervalSeconds = opts.getStoreHeartbeatIntervalSeconds();
        long regionHeartbeatIntervalSeconds = opts.getRegionHeartbeatIntervalSeconds();
        if (storeHeartbeatIntervalSeconds <= 0L) {
            throw new IllegalArgumentException("Store heartbeat interval seconds must > 0, " + storeHeartbeatIntervalSeconds);
        }
        if (regionHeartbeatIntervalSeconds <= 0L) {
            throw new IllegalArgumentException("Region heartbeat interval seconds must > 0, " + regionHeartbeatIntervalSeconds);
        }
        long now = System.currentTimeMillis();
        StoreHeartbeatTask storeHeartbeatTask = new StoreHeartbeatTask(storeHeartbeatIntervalSeconds, now, false);
        RegionHeartbeatTask regionHeartbeatTask = new RegionHeartbeatTask(regionHeartbeatIntervalSeconds, now, false);
        this.heartbeatTimer.newTimeout((TimerTask)storeHeartbeatTask, storeHeartbeatTask.getNextDelay(), TimeUnit.SECONDS);
        this.heartbeatTimer.newTimeout((TimerTask)regionHeartbeatTask, regionHeartbeatTask.getNextDelay(), TimeUnit.SECONDS);
        LOG.info("[HeartbeatSender] start successfully, options: {}.", (Object)opts);
        this.started = true;
        return true;
    }

    public synchronized void shutdown() {
        ExecutorServiceHelper.shutdownAndAwaitTermination((ExecutorService)this.heartbeatRpcCallbackExecutor);
        if (this.heartbeatTimer != null) {
            this.heartbeatTimer.stop();
        }
    }

    private void sendStoreHeartbeat(final long nextDelay, boolean forceRefreshLeader, long lastTime) {
        final long now = System.currentTimeMillis();
        StoreHeartbeatRequest request = new StoreHeartbeatRequest();
        request.setClusterId(this.storeEngine.getClusterId());
        TimeInterval timeInterval = new TimeInterval(lastTime, now);
        StoreStats stats = this.statsCollector.collectStoreStats(timeInterval);
        request.setStats(stats);
        HeartbeatClosure<Object> closure = new HeartbeatClosure<Object>(){

            public void run(Status status) {
                boolean forceRefresh = !status.isOk() && ErrorsHelper.isInvalidPeer(this.getError());
                StoreHeartbeatTask nexTask = new StoreHeartbeatTask(nextDelay, now, forceRefresh);
                HeartbeatSender.this.heartbeatTimer.newTimeout((TimerTask)nexTask, nexTask.getNextDelay(), TimeUnit.SECONDS);
            }
        };
        Endpoint endpoint = this.pdClient.getPdLeader(forceRefreshLeader, this.heartbeatRpcTimeoutMillis);
        this.callAsyncWithRpc(endpoint, request, closure);
    }

    private void sendRegionHeartbeat(final long nextDelay, long lastTime, boolean forceRefreshLeader) {
        final long now = System.currentTimeMillis();
        RegionHeartbeatRequest request = new RegionHeartbeatRequest();
        request.setClusterId(this.storeEngine.getClusterId());
        request.setStoreId(this.storeEngine.getStoreId());
        request.setLeastKeysOnSplit(this.storeEngine.getStoreOpts().getLeastKeysOnSplit());
        List<Long> regionIdList = this.storeEngine.getLeaderRegionIds();
        if (regionIdList.isEmpty()) {
            RegionHeartbeatTask nextTask = new RegionHeartbeatTask(nextDelay, now, false);
            this.heartbeatTimer.newTimeout((TimerTask)nextTask, nextTask.getNextDelay(), TimeUnit.SECONDS);
            if (LOG.isInfoEnabled()) {
                LOG.info("So sad, there is no even a region leader on [clusterId:{}, storeId: {}, endpoint:{}].", new Object[]{this.storeEngine.getClusterId(), this.storeEngine.getStoreId(), this.storeEngine.getSelfEndpoint()});
            }
            return;
        }
        ArrayList<Pair<Region, RegionStats>> regionStatsList = Lists.newArrayListWithCapacity(regionIdList.size());
        TimeInterval timeInterval = new TimeInterval(lastTime, now);
        for (Long regionId : regionIdList) {
            Region region = this.pdClient.getRegionById(regionId);
            RegionStats stats = this.statsCollector.collectRegionStats(region, timeInterval);
            if (stats == null) continue;
            regionStatsList.add(Pair.of(region, stats));
        }
        request.setRegionStatsList(regionStatsList);
        HeartbeatClosure<List<Instruction>> closure = new HeartbeatClosure<List<Instruction>>(){

            public void run(Status status) {
                List instructions;
                boolean isOk = status.isOk();
                if (isOk && (instructions = (List)this.getResult()) != null && !instructions.isEmpty()) {
                    HeartbeatSender.this.instructionProcessor.process(instructions);
                }
                boolean forceRefresh = !isOk && ErrorsHelper.isInvalidPeer(this.getError());
                RegionHeartbeatTask nextTask = new RegionHeartbeatTask(nextDelay, now, forceRefresh);
                HeartbeatSender.this.heartbeatTimer.newTimeout((TimerTask)nextTask, nextTask.getNextDelay(), TimeUnit.SECONDS);
            }
        };
        Endpoint endpoint = this.pdClient.getPdLeader(forceRefreshLeader, this.heartbeatRpcTimeoutMillis);
        this.callAsyncWithRpc(endpoint, request, closure);
    }

    private <V> void callAsyncWithRpc(Endpoint endpoint, BaseRequest request, final HeartbeatClosure<V> closure) {
        final String address = endpoint.toString();
        InvokeContext invokeCtx = ExtSerializerSupports.getInvokeContext();
        InvokeCallback invokeCallback = new InvokeCallback(){

            public void onResponse(Object result) {
                BaseResponse response = (BaseResponse)result;
                if (response.isSuccess()) {
                    closure.setResult(response.getValue());
                    closure.run(Status.OK());
                } else {
                    closure.setError(response.getError());
                    closure.run(new Status(-1, "RPC failed with address: %s, response: %s", new Object[]{address, response}));
                }
            }

            public void onException(Throwable t) {
                closure.run(new Status(-1, t.getMessage()));
            }

            public Executor getExecutor() {
                return HeartbeatSender.this.heartbeatRpcCallbackExecutor;
            }
        };
        try {
            this.rpcClient.invokeWithCallback(address, (Object)request, invokeCtx, invokeCallback, this.heartbeatRpcTimeoutMillis);
        }
        catch (Throwable t) {
            closure.run(new Status(-1, t.getMessage()));
        }
    }

    private final class RegionHeartbeatTask
    implements TimerTask {
        private final long nextDelay;
        private final long lastTime;
        private final boolean forceRefreshLeader;

        private RegionHeartbeatTask(long nextDelay, long lastTime, boolean forceRefreshLeader) {
            this.nextDelay = nextDelay;
            this.lastTime = lastTime;
            this.forceRefreshLeader = forceRefreshLeader;
        }

        public void run(Timeout timeout) throws Exception {
            try {
                HeartbeatSender.this.sendRegionHeartbeat(this.nextDelay, this.lastTime, this.forceRefreshLeader);
            }
            catch (Throwable t) {
                LOG.error("Caught a error on sending [RegionHeartbeat]: {}.", (Object)StackTraceUtil.stackTrace(t));
            }
        }

        public long getNextDelay() {
            return this.nextDelay;
        }
    }

    private final class StoreHeartbeatTask
    implements TimerTask {
        private final long nextDelay;
        private final long lastTime;
        private final boolean forceRefreshLeader;

        private StoreHeartbeatTask(long nextDelay, long lastTime, boolean forceRefreshLeader) {
            this.nextDelay = nextDelay;
            this.lastTime = lastTime;
            this.forceRefreshLeader = forceRefreshLeader;
        }

        public void run(Timeout timeout) throws Exception {
            try {
                HeartbeatSender.this.sendStoreHeartbeat(this.nextDelay, this.forceRefreshLeader, this.lastTime);
            }
            catch (Throwable t) {
                LOG.error("Caught a error on sending [StoreHeartbeat]: {}.", (Object)StackTraceUtil.stackTrace(t));
            }
        }

        public long getNextDelay() {
            return this.nextDelay;
        }
    }

    private static abstract class HeartbeatClosure<V>
    extends BaseKVStoreClosure {
        private volatile V result;

        private HeartbeatClosure() {
        }

        public V getResult() {
            return this.result;
        }

        public void setResult(V result) {
            this.result = result;
        }
    }
}

