/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.discovery;

import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.schedulerx.common.domain.JSONResult;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.HttpResponse;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.JsonNode;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.Unirest;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.discovery.ServerDiscovery;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultServerDiscovery
implements ServerDiscovery {
    private static final Logger LOGGER = LogFactory.getLogger(DefaultServerDiscovery.class);
    private static final String ACTIVE_SERVER_QUERY_PATH = "/worker/v1/appgroup/getLeaderAddr";
    private static final String QUERY_THREAD_NAME_FORMAT = "activeServerQueryThread-%d";
    private static final String ALL_USEFUL_SERVER_LIST_QUERY_PATH = "/app/getAppGroupAllServerAddrList.json";
    private ScheduledExecutorService scheduledExecutorService;
    private volatile String activeServerAddr;
    private volatile ActorSelection instanceStatusRouter;
    private volatile ActorSelection mapMasterRouter;
    private volatile ActorSelection taskStatusRouter;
    private volatile ActorSelection heartbeatActor;
    private volatile List<ActorSelection> standbyServerHeatbeatActors;
    private ActorSystem actorSystem = SchedulerxWorker.actorSystem;

    @Override
    public void start(final String groupId) throws Exception {
        final String consoleDomain = ConfigUtil.getWorkerConfig().getString("domainName");
        final String namespace = ConfigUtil.getWorkerConfig().getString("schedulerx.namespace");
        final String namespaceSource = ConfigUtil.getWorkerConfig().getString("schedulerx.namespace.source");
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(QUERY_THREAD_NAME_FORMAT).build(), new ThreadPoolExecutor.DiscardPolicy());
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    List allServerList;
                    String activeServer = DefaultServerDiscovery.this.queryActiveServer(consoleDomain, groupId, namespace, namespaceSource);
                    if (StringUtils.isNotBlank(activeServer) && !activeServer.equalsIgnoreCase(DefaultServerDiscovery.this.activeServerAddr)) {
                        LOGGER.info("activeServerAddr={} change to {}", DefaultServerDiscovery.this.activeServerAddr, activeServer);
                        DefaultServerDiscovery.this.activeServerAddr = activeServer;
                        DefaultServerDiscovery.this.heartbeatActor = DefaultServerDiscovery.this.actorSystem.actorSelection(DefaultServerDiscovery.this.getActiveHeartbeatAkkaPath(DefaultServerDiscovery.this.activeServerAddr));
                        DefaultServerDiscovery.this.instanceStatusRouter = DefaultServerDiscovery.this.actorSystem.actorSelection(DefaultServerDiscovery.this.getServerInstanceStatusRouterAkkaPath());
                        DefaultServerDiscovery.this.mapMasterRouter = DefaultServerDiscovery.this.actorSystem.actorSelection(DefaultServerDiscovery.this.getServerMapMasterRouterAkkaPath());
                        DefaultServerDiscovery.this.taskStatusRouter = DefaultServerDiscovery.this.actorSystem.actorSelection(DefaultServerDiscovery.this.getServerTaskStatusRouterAkkaPath());
                    }
                    if (CollectionUtils.isNotEmpty(allServerList = DefaultServerDiscovery.this.queryAllServerList(consoleDomain, groupId, namespace, namespaceSource))) {
                        if (allServerList.size() > 1) {
                            ArrayList<String> standbyServerList = Lists.newArrayList();
                            for (String server : allServerList) {
                                if (server.equalsIgnoreCase(DefaultServerDiscovery.this.activeServerAddr)) continue;
                                standbyServerList.add(server);
                            }
                            ArrayList<ActorSelection> tmp = Lists.newArrayList();
                            for (String standbyServer : standbyServerList) {
                                tmp.add(DefaultServerDiscovery.this.actorSystem.actorSelection(DefaultServerDiscovery.this.getActiveHeartbeatAkkaPath(standbyServer)));
                            }
                            DefaultServerDiscovery.this.standbyServerHeatbeatActors = tmp;
                        }
                    } else {
                        LOGGER.warn("scheduled query server list is empty, groupId={}.", groupId);
                    }
                }
                catch (Throwable t) {
                    LOGGER.error("scheduled query active server error!", t);
                }
            }
        }, 0L, 5L, TimeUnit.SECONDS);
    }

    @Override
    public String getActiveServerAddr() {
        return this.activeServerAddr;
    }

    private String getServerInstanceStatusRouterAkkaPath() {
        return "akka.tcp://server@" + this.activeServerAddr + "/user/instance_status_router";
    }

    private String getServerMapMasterRouterAkkaPath() {
        return "akka.tcp://server@" + this.activeServerAddr + "/user/map_master_router";
    }

    private String getServerTaskStatusRouterAkkaPath() {
        return "akka.tcp://server@" + this.activeServerAddr + "/user/task_status_router";
    }

    private String getActiveHeartbeatAkkaPath(String serverAddr) {
        return "akka.tcp://server@" + serverAddr + "/user/heartbeat";
    }

    @Override
    public ActorSelection getActiveHeartBeatActor() {
        return this.heartbeatActor;
    }

    @Override
    public void stop() throws Exception {
        this.scheduledExecutorService.shutdown();
    }

    private String queryActiveServer(String domain, String groupId, String namespace, String namespaceSource) {
        String activeServerQueryUrl;
        String res = null;
        if (namespace != null) {
            activeServerQueryUrl = "http://" + domain + ACTIVE_SERVER_QUERY_PATH + "?groupId=" + groupId + "&namespace=" + namespace;
            if (StringUtils.isNotBlank(namespaceSource)) {
                activeServerQueryUrl = activeServerQueryUrl + "&namespaceSource=" + namespaceSource;
            }
        } else {
            activeServerQueryUrl = "http://" + domain + ACTIVE_SERVER_QUERY_PATH + "?groupId=" + groupId;
        }
        try {
            HttpResponse<JsonNode> jsonResponse = Unirest.get(activeServerQueryUrl).asJson();
            JSONResult jsonResult = (JSONResult)JSON.parseObject((String)jsonResponse.getBody().toString(), JSONResult.class);
            if (jsonResult != null) {
                res = (String)jsonResult.getData();
            }
        }
        catch (Throwable e) {
            LOGGER.error("query active server error, url=" + activeServerQueryUrl, e);
        }
        return res;
    }

    private List<String> queryAllServerList(String domain, String groupId, String namespace, String namespaceSource) {
        ArrayList<String> res;
        block7: {
            String url;
            res = new ArrayList<String>(3);
            if (namespace != null) {
                url = "http://" + domain + ALL_USEFUL_SERVER_LIST_QUERY_PATH + "?groupId=" + groupId + "&namespace=" + namespace;
                if (StringUtils.isNotBlank(namespaceSource)) {
                    url = url + "&namespaceSource=" + namespaceSource;
                }
            } else {
                url = "http://" + domain + ALL_USEFUL_SERVER_LIST_QUERY_PATH + "?groupId=" + groupId;
            }
            try {
                HttpResponse<JsonNode> jsonResponse = Unirest.get(url).asJson();
                JSONResult jsonResult = (JSONResult)JSON.parseObject((String)jsonResponse.getBody().toString(), JSONResult.class);
                if (jsonResult != null && jsonResult.getData() != null && jsonResult.getData() instanceof JSONArray) {
                    JSONArray array = (JSONArray)jsonResult.getData();
                    for (Object o : array) {
                        res.add((String)o);
                    }
                }
            }
            catch (Throwable e) {
                LOGGER.warn("query server list error", e);
                String active2 = this.queryActiveServer(domain, groupId, namespace, namespaceSource);
                if (!StringUtils.isNotBlank(active2)) break block7;
                res.add(active2);
            }
        }
        return res;
    }

    @Override
    public ActorSelection getInstanceStatusRouter() {
        return this.instanceStatusRouter;
    }

    @Override
    public ActorSelection getMapMasterRouter() {
        return this.mapMasterRouter;
    }

    @Override
    public ActorSelection getTaskStatusRouter() {
        return this.taskStatusRouter;
    }

    @Override
    public List<ActorSelection> getStandbyServerHeatbeatActors() {
        return this.standbyServerHeatbeatActors;
    }
}

