/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.remote.AssociationErrorEvent;
import akka.remote.ThisActorSystemQuarantinedEvent;
import akka.routing.RoundRobinPool;
import com.alibaba.schedulerx.common.monitor.MetricsCollector;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.IpUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.common.util.ReflectionUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.com.google.protobuf.ProtocolMessageEnum;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.HttpResponse;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.JsonNode;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.Unirest;
import com.alibaba.schedulerx.shade.com.taobao.spas.sdk.client.SpasSdkClientFacade;
import com.alibaba.schedulerx.shade.com.taobao.spas.sdk.client.identity.Credentials;
import com.alibaba.schedulerx.shade.javassist.compiler.JvstCodeGen;
import com.alibaba.schedulerx.shade.org.apache.commons.configuration.Configuration;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.shade.org.jboss.netty.channel.socket.nio.Boss;
import com.alibaba.schedulerx.worker.actor.ContainerActor;
import com.alibaba.schedulerx.worker.actor.GuarantineRecoverActor;
import com.alibaba.schedulerx.worker.actor.JobInstanceActor;
import com.alibaba.schedulerx.worker.actor.LogActor;
import com.alibaba.schedulerx.worker.actor.TaskActor;
import com.alibaba.schedulerx.worker.actor.WorkerHeartbeatActor;
import com.alibaba.schedulerx.worker.discovery.ArmoryResult;
import com.alibaba.schedulerx.worker.discovery.DefaultGroupDiscovery;
import com.alibaba.schedulerx.worker.discovery.GroupDiscovery;
import com.alibaba.schedulerx.worker.discovery.ServerDiscovery;
import com.alibaba.schedulerx.worker.discovery.ServerDiscoveryFactory;
import com.alibaba.schedulerx.worker.exception.DomainInvalidException;
import com.alibaba.schedulerx.worker.exception.DomainNotFoundException;
import com.alibaba.schedulerx.worker.exception.NamespaceNotFoundException;
import com.alibaba.schedulerx.worker.ha.AtLeastOnceDeliveryRoutingActor;
import com.alibaba.schedulerx.worker.ha.HealthTimeHolder;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCleaner;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import com.alibaba.schedulerx.worker.master.persistence.H2FilePersistence;
import com.alibaba.schedulerx.worker.security.Authenticator;
import com.alibaba.schedulerx.worker.security.DefaultAuthenticator;
import com.alibaba.schedulerx.worker.timer.AbstractTimerTask;
import com.alibaba.schedulerx.worker.util.ConsoleUtil;
import com.alibaba.schedulerx.worker.util.DiamondUtil;
import com.alibaba.schedulerx.worker.util.SpringContext;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import com.typesafe.config.Config;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;

public class SchedulerxWorker
implements ApplicationContextAware,
InitializingBean,
ApplicationListener<ApplicationContextEvent> {
    private static final Logger LOGGER = LogFactory.getLogger(SchedulerxWorker.class);
    public static ActorSystem actorSystem = null;
    public static ActorRef AtLeastDeliveryRoutingActor = null;
    public static boolean INITED = false;
    public static ClassLoader CUSTOMER_CLASS_LOADER = null;
    public static String WORKER_ADDR = null;
    public static Map<String, Long> groupIdMap = Maps.newHashMap();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void init() throws Exception {
        if (INITED) {
            LOGGER.warn("schedulerx worker has been inited, skip this init");
            return;
        }
        LOGGER.info("Schedulerx Worker starting...");
        this.printMvnDenpendency();
        Configuration conf = ConfigUtil.getWorkerConfig();
        try {
            this.initMetaInfoFromSystem(conf);
            String domainName = this.initConsoleDomain();
            String host = conf.getString("hostname", SchedulerxWorker.getLocalHost());
            if (SchedulerxWorker.isolateMachine(host)) {
                return;
            }
            if (StringUtils.isBlank(domainName)) {
                throw new DomainNotFoundException("Not found domainName.");
            }
            String groupDiscoveryClassName = conf.getString("worker.group.discovery", DefaultGroupDiscovery.class.getName());
            GroupDiscovery groupDiscovery = (GroupDiscovery)ReflectionUtil.getInstanceByClassName(groupDiscoveryClassName, CUSTOMER_CLASS_LOADER);
            List<String> groupIdList = groupDiscovery.getGroupIdSet(conf);
            if (groupIdList.isEmpty()) {
                throw new IOException("please set groupId");
            }
            String namespace = this.initNamespace();
            this.initMetaInfoFromConsole(namespace, conf.getString("schedulerx.namespace.source"), groupIdList);
            this.checkParameters(conf, namespace);
            for (String groupId : groupIdList) {
                long appGroupId = ConsoleUtil.getAppGroupId(groupId, conf.getString("schedulerx.namespace"), conf.getString("schedulerx.namespace.source"));
                if (appGroupId <= 0L) {
                    throw new IOException("groupId=" + groupId + " is not exist");
                }
                groupIdMap.put(groupId, appGroupId);
            }
            SchedulerxWorker.initStsKey();
            this.authenticate(conf, groupIdList);
            SchedulerxWorker.restartActorSystem();
            Address address = actorSystem.provider().getDefaultAddress();
            WORKER_ADDR = address.host().get() + ":" + address.port().get();
            conf.setProperty("akkaPath", address.toString());
            if (conf.getBoolean("batch.work.enable", false)) {
                LOGGER.info("H2FilePersistence initing...");
                SchedulerxWorker.initStore();
                LOGGER.info("H2FilePersistence inited.");
            }
            LOGGER.info("LogCollector initing...");
            this.initLogCollector(WORKER_ADDR);
            LOGGER.info("LogCollector inited...");
            SchedulerxWorker.initServerDiscovery(groupIdList);
            LOGGER.info("ServerDiscovery inited.");
            SchedulerxWorker.initTimerTask(conf);
            LOGGER.info("timer task inited.");
            if (conf.getBoolean("log.collector.enable", true)) {
                LogCleaner logCleaner = LogCollectorFactory.newCleaner();
                logCleaner.init();
            }
            LOGGER.info("Schedulerx Worker started.");
            INITED = true;
        }
        catch (Throwable t) {
            LOGGER.error("Schedulerx Worker error", t);
            if (actorSystem != null) {
                actorSystem.terminate();
            }
            if (ConfigUtil.getWorkerConfig().getBoolean("block.app.start", false)) {
                throw t;
            }
        }
        finally {
            LOGGER.info("Schedulerx WorkerConfig" + ConfigUtil.toString(ConfigUtil.getWorkerConfig()));
        }
    }

    private String initConsoleDomain() throws Exception {
        String domain = System.getProperty("schedulerx.console.domain");
        if (StringUtils.isBlank(domain)) {
            domain = ConfigUtil.getWorkerConfig().getString("domainName");
        }
        if (StringUtils.isBlank(domain)) {
            domain = ConsoleUtil.getDomainFromHttpServer();
        }
        if (StringUtils.isBlank(domain)) {
            domain = DiamondUtil.getData("com.alibaba.schedulerx.domain");
        }
        if (domain != null && domain.contains("http")) {
            throw new DomainInvalidException("domainName need not http:// only domain eg: schedulerx2.tao.net");
        }
        ConfigUtil.getWorkerConfig().setProperty("domainName", domain);
        return domain;
    }

    private static void initStsKey() {
        String stsAK = System.getProperty("sts.accessKey");
        String stsSK = System.getProperty("sts.secretKey");
        String stsToken = System.getProperty("sts.token");
        if (StringUtils.isBlank(stsAK) && StringUtils.isBlank(ConfigUtil.getWorkerConfig().getString("sts.accessKey"))) {
            stsAK = System.getenv("sts.accessKey".replace(".", "_"));
            stsSK = System.getenv("sts.secretKey".replace(".", "_"));
            stsToken = System.getenv("sts.token".replace(".", "_"));
        }
        ConfigUtil.getWorkerConfig().setProperty("sts.accessKey", stsAK);
        ConfigUtil.getWorkerConfig().setProperty("sts.secretKey", stsSK);
        ConfigUtil.getWorkerConfig().setProperty("sts.token", stsToken);
    }

    private static String getLocalHost() {
        String domain = ConfigUtil.getWorkerConfig().getString("domainName");
        String localHost = System.getProperty("hsf.server.ip");
        if (StringUtils.isNotBlank(localHost)) {
            return localHost;
        }
        try (Socket socket = new Socket();){
            InetAddress address;
            if (domain != null) {
                if (domain.contains(":")) {
                    socket.connect(new InetSocketAddress(domain.split(":")[0], Integer.parseInt(domain.split(":")[1])));
                } else {
                    socket.connect(new InetSocketAddress(domain, 80));
                }
            }
            localHost = (address = socket.getLocalAddress()) instanceof Inet6Address ? IpUtil.getIPV4Address() : address.getHostAddress();
        }
        catch (IOException e) {
            LOGGER.error("get local host error", e);
            localHost = IpUtil.getIPV4Address();
        }
        return localHost;
    }

    private void initMetaInfoFromSystem(Configuration conf) throws Exception {
        Properties properties = System.getProperties();
        LOGGER.debug("system.properties=" + properties);
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            Object key = entry.getKey();
            Object value2 = entry.getValue();
            if (!key.toString().startsWith("schedulerx")) continue;
            conf.setProperty(key.toString(), value2);
        }
    }

    private void initMetaInfoFromConsole(String namespace, String namespaceSource, List<String> groupIds) throws Exception {
        Map<String, Object> properties = ConsoleUtil.fetchMetaInfoFromConsole(namespace, namespaceSource, groupIds);
        if (properties != null) {
            Configuration conf = ConfigUtil.getWorkerConfig();
            for (Map.Entry<String, Object> entry : properties.entrySet()) {
                conf.setProperty(entry.getKey(), entry.getValue());
            }
        }
    }

    private void initLogCollector(String workerAddr) {
        LogCollector logCollector = LogCollectorFactory.get();
        logCollector.collect("schedulerx_" + workerAddr, "hello schedulerx");
    }

    private String initNamespace() {
        String namespace = System.getProperty("schedulerx.namespace");
        if (StringUtils.isBlank(namespace)) {
            namespace = System.getProperty("tenant.id");
        }
        if (StringUtils.isBlank(namespace)) {
            namespace = ConfigUtil.getWorkerConfig().getString("schedulerx.namespace");
        }
        if (StringUtils.isBlank(namespace)) {
            namespace = System.getenv("schedulerx.namespace".replace(".", "_"));
        }
        ConfigUtil.getWorkerConfig().setProperty("schedulerx.namespace", namespace);
        String namespaceSource = System.getProperty("schedulerx.namespace.source");
        if (StringUtils.isBlank(namespaceSource)) {
            namespaceSource = System.getenv("schedulerx.namespace.source".replace(".", "_"));
        }
        if (StringUtils.isNotBlank(namespaceSource)) {
            ConfigUtil.getWorkerConfig().setProperty("schedulerx.namespace.source", namespaceSource);
        }
        return namespace;
    }

    private void checkParameters(Configuration conf, String namespace) throws Exception {
        if (conf.getBoolean("schedulerx.namespace.enable", false) && StringUtils.isBlank(namespace)) {
            throw new NamespaceNotFoundException("Not found namespace.");
        }
    }

    private void authenticate(Configuration conf, List<String> groupIds) throws Exception {
        String authenticatorClassName;
        Authenticator authenticator;
        Credentials credentials = SpasSdkClientFacade.getCredential();
        if (StringUtils.isNotBlank(credentials.getAccessKey())) {
            conf.setProperty("dauth.accessKey", credentials.getAccessKey());
        }
        if (StringUtils.isNotBlank(credentials.getSecretKey())) {
            conf.setProperty("dauth.secretKey", credentials.getSecretKey());
        }
        if ((authenticator = (Authenticator)ReflectionUtil.getInstanceByClassName(authenticatorClassName = conf.getString("schedulerx.authenticate", DefaultAuthenticator.class.getName()), CUSTOMER_CLASS_LOADER)) == null) {
            throw new IOException("authenticator is null");
        }
        authenticator.authenticate(conf, groupIds);
        LOGGER.info("authenticate success.");
    }

    private static void initStore() throws Exception {
        H2FilePersistence persistence = H2FilePersistence.getInstance();
        persistence.initTable();
    }

    private static boolean isolateMachine(String host) {
        block12: {
            Configuration conf = ConfigUtil.getWorkerConfig();
            ArrayList<String> enableUnits = Lists.newArrayList(conf.getStringArray("enable.units"));
            ArrayList<String> enableSites = Lists.newArrayList(conf.getStringArray("enable.sites"));
            ArrayList<String> disableUnits = Lists.newArrayList(conf.getStringArray("disable.units"));
            ArrayList<String> disableSites = Lists.newArrayList(conf.getStringArray("disable.sites"));
            if (!enableUnits.isEmpty() || !enableSites.isEmpty()) {
                try {
                    String url = "http://api.sh.gns.alibaba-inc.com/gns/armory/query?ip=" + host;
                    HttpResponse<JsonNode> response = Unirest.get(url).asJson();
                    ArmoryResult result2 = JsonUtil.fromJson(response.getBody().getObject().toString(), ArmoryResult.class);
                    if (result2.isSuccess() && result2.getData() != null) {
                        String unit = result2.getData().getUnit();
                        String simpleUnit = unit.substring(unit.indexOf(".") + 1);
                        String site = simpleUnit + "." + result2.getData().getSite();
                        if (!enableUnits.contains(simpleUnit) && !enableSites.contains(site)) {
                            LOGGER.warn("init isolated. ip=" + host + ", unit=" + simpleUnit + ", size=" + site);
                            return true;
                        }
                        break block12;
                    }
                    LOGGER.warn("get armory result failed, ip=" + host);
                }
                catch (Throwable e) {
                    LOGGER.error("init error, ", e);
                }
            } else if (!disableUnits.isEmpty() || !disableSites.isEmpty()) {
                try {
                    String url = "http://api.sh.gns.alibaba-inc.com/gns/armory/query?ip=" + host;
                    HttpResponse<JsonNode> response = Unirest.get(url).asJson();
                    ArmoryResult result3 = JsonUtil.fromJson(response.getBody().getObject().toString(), ArmoryResult.class);
                    if (result3.isSuccess() && result3.getData() != null) {
                        String unit = result3.getData().getUnit();
                        String simpleUnit = unit.substring(unit.indexOf(".") + 1);
                        String site = simpleUnit + "." + result3.getData().getSite();
                        if (disableUnits.contains(simpleUnit) || disableSites.contains(site)) {
                            LOGGER.warn("init isolated. ip=" + host + ", unit=" + simpleUnit + ", size=" + site);
                            return true;
                        }
                    } else {
                        LOGGER.warn("get armory result failed, ip=" + host);
                    }
                }
                catch (Throwable e) {
                    LOGGER.error("init error, ", e);
                }
            }
        }
        return false;
    }

    private static void initTimerTask(Configuration conf) throws Exception {
        List<AbstractTimerTask> timerTasks = ReflectionUtil.getInstancesByConf(conf, "worker.timer.tasks");
        if (timerTasks != null && !timerTasks.isEmpty()) {
            for (final AbstractTimerTask timerTask : timerTasks) {
                ScheduledExecutorService ses = Executors.newScheduledThreadPool(1, new ThreadFactory(){

                    @Override
                    public Thread newThread(Runnable runnable) {
                        return new Thread(runnable, "Worker-timer-Thread-" + timerTask.getName());
                    }
                });
                ses.scheduleAtFixedRate(timerTask, timerTask.getInitialDelay(), timerTask.getPeriod(), TimeUnit.SECONDS);
                timerTask.init();
            }
        }
    }

    private static void initServerDiscovery(List<String> groupIdSet) throws Exception {
        for (String groupId : groupIdSet) {
            ServerDiscovery discovery = ServerDiscoveryFactory.getDiscovery(groupId);
            discovery.start(groupId);
        }
    }

    private static void initActors(ActorSystem actorSystem, String workerId) throws Exception {
        int heartbeatActorSize = ConfigUtil.getWorkerConfig().getInt("worker.heartbeat.actor.num", 2);
        actorSystem.actorOf(Props.create(WorkerHeartbeatActor.class, new Object[0]).withRouter(new RoundRobinPool(heartbeatActorSize)).withDispatcher("akka.actor.thread-dispatcher-heartbeat"), "heartbeat_routing");
        int instanceActorSize = ConfigUtil.getWorkerConfig().getInt("worker.jobinstance.actor.num", 128);
        actorSystem.actorOf(Props.create(JobInstanceActor.class, new Object[0]).withRouter(new RoundRobinPool(instanceActorSize)).withDispatcher("akka.actor.thread-dispatcher-instance"), "job_instance_routing");
        int logActorSize = ConfigUtil.getWorkerConfig().getInt("worker.log.actor.num", 100);
        actorSystem.actorOf(Props.create(LogActor.class, new Object[0]).withRouter(new RoundRobinPool(logActorSize)).withDispatcher("akka.actor.thread-dispatcher-log"), "log_routing");
        int containerActorSize = ConfigUtil.getWorkerConfig().getInt("worker.container.actor.num", 256);
        actorSystem.actorOf(Props.create(ContainerActor.class, new Object[0]).withRouter(new RoundRobinPool(containerActorSize)).withDispatcher("akka.actor.thread-dispatcher-container"), "container_routing");
        int taskActorSize = ConfigUtil.getWorkerConfig().getInt("worker.task.actor.num", 128);
        actorSystem.actorOf(Props.create(TaskActor.class, new Object[0]).withRouter(new RoundRobinPool(taskActorSize)).withDispatcher("akka.actor.thread-dispatcher-task"), "task_routing");
        AtLeastDeliveryRoutingActor = actorSystem.actorOf(AtLeastOnceDeliveryRoutingActor.props(ConfigUtil.getWorkerConfig().getInt("at.least.once.delivery.actor.num", 100)).withDispatcher("akka.actor.thread-dispatcher-delivery"), "at_least_once_delivery_routing");
        ActorRef gurantineRecoverActor = actorSystem.actorOf(Props.create(GuarantineRecoverActor.class, new Object[0]), "guarantine_recover");
        actorSystem.eventStream().subscribe(gurantineRecoverActor, AssociationErrorEvent.class);
        actorSystem.eventStream().subscribe(gurantineRecoverActor, ThisActorSystemQuarantinedEvent.class);
    }

    private static void initHeartBeat(ActorSystem actorSystem, final String workerId, final Map<String, Long> groupIdMap) {
        final TaskMasterPool masterPool = TaskMasterPool.INSTANCE;
        Configuration conf = ConfigUtil.getWorkerConfig();
        final Logger heatbeatLogger = LogFactory.getLogger("heartbeat");
        final String version = conf.getString("worker.version");
        final String starter = conf.getString("stater.mode", "java");
        final String source = conf.getString("schedulerx.worker.source", "unknown");
        actorSystem.scheduler().schedule(Duration.Zero(), Duration.create(5L, TimeUnit.SECONDS), new Runnable(){

            @Override
            public void run() {
                try {
                    for (Map.Entry groupEntry : groupIdMap.entrySet()) {
                        String groupId = (String)groupEntry.getKey();
                        long appGroupId = (Long)groupEntry.getValue();
                        ServerDiscovery serverDiscovery = ServerDiscoveryFactory.getDiscovery(groupId);
                        if (serverDiscovery == null || serverDiscovery.getActiveHeartBeatActor() == null) {
                            heatbeatLogger.warn("heartbeatActor is null, can be ignored if not frequently occurs");
                            return;
                        }
                        ActorSelection heartbeatActor = serverDiscovery.getActiveHeartBeatActor();
                        Worker.WorkerHeartBeatRequest request2 = Worker.WorkerHeartBeatRequest.newBuilder().setVersion(version).setGroupId(groupId).setWorkerId(workerId).addAllJobInstanceId(masterPool.getInstanceIds()).setMetricsJson(MetricsCollector.getMetricsJsonString()).setStarter(starter).setAppGroupId(appGroupId).setSource(source).build();
                        try {
                            FutureUtils.awaitResult(heartbeatActor, (Object)request2, 5L);
                            HealthTimeHolder.INSTANCE.resetServerHeartbeatTime();
                        }
                        catch (Exception ex) {
                            LOGGER.warn("active server={} lost.", serverDiscovery.getActiveServerAddr(), ex);
                        }
                    }
                }
                catch (Throwable t) {
                    LOGGER.warn("heartbeat error", t);
                }
            }
        }, (ExecutionContext)actorSystem.dispatcher());
    }

    public Configuration getConfig() {
        return ConfigUtil.getWorkerConfig();
    }

    public static void main(String[] args) throws Exception {
        try {
            SchedulerxWorker worker = new SchedulerxWorker();
            if (args != null && args.length == 1) {
                String agentConfPath = args[0];
                SchedulerxWorker.initAgentConf(agentConfPath);
            }
            worker.init();
        }
        catch (Exception e) {
            LOGGER.error("Schedulerx worker start error", e);
            System.exit(1);
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        LOGGER.info("set applicationContext={} to SpringContext={}", SpringContext.context, applicationContext);
        SpringContext.context = applicationContext;
        SpringContext.unlock();
        if (ConfigUtil.getWorkerConfig().getProperty("stater.mode") == null) {
            ConfigUtil.getWorkerConfig().setProperty("stater.mode", "spring");
        }
    }

    public void afterPropertiesSet() throws Exception {
        LOGGER.info("initializing bean...");
        this.init();
    }

    private void printMvnDenpendency() {
        LOGGER.info("===maven dependencies===");
        LOGGER.info("netty:" + Boss.class.getResource(""));
        LOGGER.info("protobuf-java:" + ProtocolMessageEnum.class.getResource(""));
        LOGGER.info("javaassist:" + JvstCodeGen.class.getResource(""));
        LOGGER.info("commons-configuration:" + Configuration.class.getResource(""));
        LOGGER.info("config:" + Config.class.getResource(""));
        LOGGER.info("===================");
    }

    public void setDomainName(String domainName) {
        if (StringUtils.isBlank(ConfigUtil.getWorkerConfig().getString("domainName"))) {
            ConfigUtil.getWorkerConfig().setProperty("domainName", domainName);
        }
    }

    public void setGroupId(String groupId) {
        ConfigUtil.getWorkerConfig().setProperty("groupId", groupId);
    }

    public void setAppKey(String appKey) {
        ConfigUtil.getWorkerConfig().setProperty("appKey", appKey);
    }

    public void setEnableBatchWork(boolean enableBatchWork) {
        ConfigUtil.getWorkerConfig().setProperty("batch.work.enable", enableBatchWork);
    }

    public void setEnableMaxRunningThreshold(int maxRunningThreshold) {
        ConfigUtil.getWorkerConfig().setProperty("map.master.running.threshold", maxRunningThreshold);
    }

    private static void initAgentConf(String agentConfPath) {
        Configuration agentConf = ConfigUtil.newConfig(agentConfPath);
        if (agentConf == null) {
            LOGGER.error("load agent conf error, agentConf path:{}", agentConfPath);
            return;
        }
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        workerConf.setProperty("batch.work.enable", false);
        workerConf.setProperty("domainName", agentConf.getProperty("domainName"));
        workerConf.setProperty("groupId", agentConf.getProperty("groupId"));
        workerConf.setProperty("appKey", agentConf.getProperty("appKey"));
        workerConf.setProperty("schedulerx.namespace", agentConf.getProperty("namespace"));
        workerConf.setProperty("aliyun.accessKey", agentConf.getProperty("aliyunAccessKey"));
        workerConf.setProperty("aliyun.secretKey", agentConf.getProperty("aliyunSecretKey"));
        workerConf.setProperty("address.server.domain", agentConf.getProperty("endpoint"));
        workerConf.setProperty("address.server.port", agentConf.getProperty("endpointPort"));
    }

    public static void restartActorSystem() throws Exception {
        if (actorSystem != null) {
            actorSystem.awaitTermination();
            LOGGER.info("actorSystem terminated.");
        }
        String workerId = WorkerIdGenerator.get();
        String host = ConfigUtil.getWorkerConfig().getString("hostname", SchedulerxWorker.getLocalHost());
        int port = ConfigUtil.getWorkerConfig().getInt("port", 0);
        Config akkaConfig = ConfigUtil.getAkkaConfig("akka-worker.conf", host, port);
        actorSystem = ActorSystem.create(workerId, akkaConfig);
        SchedulerxWorker.initActors(actorSystem, workerId);
        LOGGER.info("actors inited.");
        SchedulerxWorker.initHeartBeat(actorSystem, workerId, groupIdMap);
        LOGGER.info("heartbeat inited.");
    }

    public void setEnableUnits(String units) {
        ConfigUtil.getWorkerConfig().setProperty("enable.units", units);
    }

    public void setEnableSites(String sites) {
        ConfigUtil.getWorkerConfig().setProperty("enable.sites", sites);
    }

    public void setDisableUnits(String units) {
        ConfigUtil.getWorkerConfig().setProperty("disable.units", units);
    }

    public void setDisableSites(String sites) {
        ConfigUtil.getWorkerConfig().setProperty("disable.sites", sites);
    }

    public void setAliyunAccessKey(String aliyunAccessKey) {
        ConfigUtil.getWorkerConfig().setProperty("aliyun.accessKey", aliyunAccessKey);
    }

    public void setAliyunSecretKey(String aliyunSecretKey) {
        ConfigUtil.getWorkerConfig().setProperty("aliyun.secretKey", aliyunSecretKey);
    }

    public void setSTSAccessKey(String stsAccessKey) {
        ConfigUtil.getWorkerConfig().setProperty("sts.accessKey", stsAccessKey);
    }

    public void setSTSSecretKey(String stsSecretKey) {
        ConfigUtil.getWorkerConfig().setProperty("sts.secretKey", stsSecretKey);
    }

    public void setSTSSecretToken(String stsSecretToken) {
        ConfigUtil.getWorkerConfig().setProperty("sts.token", stsSecretToken);
    }

    public void setHost(String host) {
        ConfigUtil.getWorkerConfig().setProperty("hostname", host);
    }

    public void setPort(int port) {
        ConfigUtil.getWorkerConfig().setProperty("port", port);
    }

    public void setClassLoader(ClassLoader userClassLoader) {
        CUSTOMER_CLASS_LOADER = userClassLoader;
    }

    public void setNamespace(String namespace) {
        if (StringUtils.isBlank(ConfigUtil.getWorkerConfig().getString("schedulerx.namespace"))) {
            ConfigUtil.getWorkerConfig().setProperty("schedulerx.namespace", namespace);
        }
    }

    public void setNamespaceSource(String namespaceSource) {
        ConfigUtil.getWorkerConfig().setProperty("schedulerx.namespace.source", namespaceSource);
    }

    public void setEndpoint(String endpoint) {
        if (StringUtils.isBlank((String)ConfigUtil.getWorkerConfig().getProperty("address.server.domain"))) {
            ConfigUtil.getWorkerConfig().setProperty("address.server.domain", endpoint);
        }
    }

    public void setEndpointPort(int endpointPort) {
        if (StringUtils.isBlank((String)ConfigUtil.getWorkerConfig().getProperty("address.server.port"))) {
            ConfigUtil.getWorkerConfig().setProperty("address.server.port", String.valueOf(endpointPort));
        }
    }

    public void setMaxTaskBodySize(int maxSize) {
        ConfigUtil.getWorkerConfig().setProperty("task.body.size.max", maxSize);
    }

    public void setBlockAppStart(boolean block) {
        ConfigUtil.getWorkerConfig().setProperty("block.app.start", block);
    }

    public void onApplicationEvent(ApplicationContextEvent event) {
        if (event instanceof ContextStartedEvent) {
            LOGGER.warn("SpringApplicationContext={} started and change to {}.", SpringContext.context, event.getApplicationContext());
            SpringContext.context = event.getApplicationContext();
            SpringContext.unlock();
        } else if (event instanceof ContextRefreshedEvent) {
            LOGGER.warn("SpringApplicationContext={} refreshed to {}.", SpringContext.context, event.getApplicationContext());
            SpringContext.context = event.getApplicationContext();
            SpringContext.unlock();
        } else if (event instanceof ContextStoppedEvent) {
            SpringContext.lock();
            LOGGER.warn("SpringApplicationContext={} stopped.", event.getApplicationContext());
        } else if (event instanceof ContextClosedEvent) {
            SpringContext.lock();
            LOGGER.warn("SpringApplicationContext={} closed.", event.getApplicationContext());
        }
    }
}

