/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.actor;

import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.container.Container;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.pull.PullManager;
import com.alibaba.schedulerx.worker.util.ContanerUtil;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ContainerActor
extends UntypedActor {
    private ContainerPool containerPool = ContainerFactory.getContainerPool();
    private ContainerStatusReqHandlerPool statusReqBatchHandlerPool = ContainerStatusReqHandlerPool.INSTANCE;
    private static Logger LOGGER = LogFactory.getLogger(ContainerActor.class);
    private static ThreadPoolExecutor containerStarter = new ThreadPoolExecutor(8, 8, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory(){
        private final AtomicInteger nextId = new AtomicInteger(1);
        private final String namePrefix = "Container-Starter-Thread-";

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Container-Starter-Thread-" + this.nextId.getAndIncrement());
        }
    }, new ThreadPoolExecutor.CallerRunsPolicy());
    private LogCollector logCollector = LogCollectorFactory.get();

    public static Props props() {
        return Props.create(ContainerActor.class, new Object[0]);
    }

    @Override
    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof Worker.MasterStartContainerRequest) {
            this.handleStartContainer((Worker.MasterStartContainerRequest)obj);
        } else if (obj instanceof Worker.MasterBatchStartContainersRequest) {
            this.handleBatchStartContainers((Worker.MasterBatchStartContainersRequest)obj);
        } else if (obj instanceof Worker.MasterKillContainerRequest) {
            this.handleKillContainer((Worker.MasterKillContainerRequest)obj);
        } else if (obj instanceof Worker.MasterDestroyContainerPoolRequest) {
            this.handleDestroyContainerPool((Worker.MasterDestroyContainerPoolRequest)obj);
        }
    }

    private void handleStartContainer(Worker.MasterStartContainerRequest request2) {
        Worker.MasterStartContainerResponse response = null;
        try {
            String uniqueId = this.startContainer(request2);
            response = Worker.MasterStartContainerResponse.newBuilder().setSuccess(true).build();
            LOGGER.debug("submit container to containerPool, uniqueId={}, cost={}ms", uniqueId, System.currentTimeMillis() - request2.getScheduleTime());
        }
        catch (Throwable e) {
            String uniqueId = IdUtil.getUniqueId(request2.getJobId(), request2.getJobInstanceId(), request2.getTaskId());
            response = Worker.MasterStartContainerResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
            LOGGER.error("handleStartContainer error.", e);
            this.logCollector.collect(uniqueId, "[ContainerActor-handleStartContainer]container start instance fail.", e);
        }
        this.getSender().tell(response, this.getSelf());
    }

    private void handleBatchStartContainers(Worker.MasterBatchStartContainersRequest request2) {
        LOGGER.info("jobInstanceId={}, batch start containers, size:{}", request2.getJobInstanceId(), request2.getStartReqsCount());
        containerStarter.submit(new ContainerStartRunnable(request2));
        Worker.MasterBatchStartContainersResponse response = Worker.MasterBatchStartContainersResponse.newBuilder().setSuccess(true).build();
        this.getSender().tell(response, this.getSelf());
    }

    private String startContainer(Worker.MasterStartContainerRequest request2) throws Exception {
        String uniqueId = IdUtil.getUniqueId(request2.getJobId(), request2.getJobInstanceId(), request2.getTaskId());
        LOGGER.debug("handleStartContainer, uniqueId={}, cost={}ms", uniqueId, System.currentTimeMillis() - request2.getScheduleTime());
        JobContext context = ContanerUtil.convert2JobContext(request2);
        Container container = ContainerFactory.create(context);
        if (container != null) {
            this.containerPool.put(context.getUniqueId(), container);
            if (!this.statusReqBatchHandlerPool.contains(request2.getJobInstanceId())) {
                ReqQueue reqQueue = new ReqQueue(request2.getJobInstanceId(), 100000);
                reqQueue.init();
                this.statusReqBatchHandlerPool.start(request2.getJobInstanceId(), new ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest>(request2.getJobInstanceId(), 1, 1, 3000, reqQueue, request2.getInstanceMasterAkkaPath()));
            }
            int consumerNum = request2.hasConsumerNum() ? request2.getConsumerNum() : 5;
            this.containerPool.submit(context.getJobId(), context.getJobInstanceId(), context.getTaskId(), container, consumerNum);
        } else {
            LOGGER.warn("Container is null, uniqueId={}", uniqueId);
        }
        return uniqueId;
    }

    private void handleKillContainer(Worker.MasterKillContainerRequest request2) {
        Worker.MasterKillContainerResponse response;
        long jobId = request2.getJobId();
        long jobInstanceId = request2.getJobInstanceId();
        String uniqueIdToLogger = "";
        try {
            if (request2.hasTaskId()) {
                long taskId = request2.getTaskId();
                String uniqueId = IdUtil.getUniqueId(jobId, jobInstanceId, taskId);
                if (this.containerPool.contain(uniqueId)) {
                    this.containerPool.get(uniqueId).kill();
                }
                uniqueIdToLogger = uniqueId;
                LOGGER.info("kill task container success, uniqueId={}", uniqueIdToLogger);
            } else {
                uniqueIdToLogger = IdUtil.getUniqueIdWithoutTask(jobId, jobInstanceId);
                this.killInstance(jobId, jobInstanceId);
                LOGGER.info("kill instance success, uniqueId:{}", uniqueIdToLogger);
            }
            response = Worker.MasterKillContainerResponse.newBuilder().setSuccess(true).build();
            this.logCollector.collect(uniqueIdToLogger, "[ContainerActor-handleKillContainer]container kill instance success.");
        }
        catch (Throwable t) {
            LOGGER.error("kill container exception", t);
            this.logCollector.collect(uniqueIdToLogger, "[ContainerActor-handleKillContainer]container kill instance fail.", t);
            response = Worker.MasterKillContainerResponse.newBuilder().setSuccess(false).setMessage(t.getMessage()).build();
        }
        this.getSender().tell(response, this.getSelf());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleDestroyContainerPool(Worker.MasterDestroyContainerPoolRequest request2) {
        try {
            ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest> handler = this.statusReqBatchHandlerPool.getHandlers().get(request2.getJobInstanceId());
            if (handler != null) {
                if (handler.getLatestRequest() != null && ((Worker.ContainerReportTaskStatusRequest)handler.getLatestRequest()).getSerialNum() != request2.getSerialNum()) {
                    LOGGER.info("skip handleDestroyContainerPool cycleId={}_{}, handler serialNum={}.", request2.getJobInstanceId(), request2.getSerialNum(), ((Worker.ContainerReportTaskStatusRequest)handler.getLatestRequest()).getSerialNum());
                    return;
                }
                LOGGER.info("handleDestroyContainerPool from cycleId={}_{}, handler serialNum={}.", request2.getJobInstanceId(), request2.getSerialNum(), ((Worker.ContainerReportTaskStatusRequest)handler.getLatestRequest()).getSerialNum());
                this.containerPool.destroyByInstance(request2.getJobInstanceId());
                this.statusReqBatchHandlerPool.stop(request2.getJobInstanceId());
                PullManager.INSTANCE.stop(request2.getJobInstanceId());
            }
        }
        catch (Throwable th) {
            LOGGER.error("cycleId={}_{} handleDestroyContainerPool failed.", request2.getJobInstanceId(), request2.getSerialNum(), th);
        }
        finally {
            Worker.MasterDestroyContainerPoolResponse response = Worker.MasterDestroyContainerPoolResponse.newBuilder().setSuccess(true).setDeliveryId(request2.getDeliveryId()).build();
            this.getSender().tell(response, this.getSelf());
        }
    }

    private void killInstance(long jobId, long jobInstanceId) {
        Map<String, Container> containerMap = this.containerPool.getContainerMap();
        String prefixKey = jobId + "_" + jobInstanceId;
        Iterator<Map.Entry<String, Container>> it = containerMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Container> entry = it.next();
            String uniqueId = entry.getKey();
            Container container = entry.getValue();
            if (!uniqueId.startsWith(prefixKey)) continue;
            container.kill();
            it.remove();
        }
        this.containerPool.destroyByInstance(jobInstanceId);
    }

    static {
        containerStarter.allowCoreThreadTimeOut(true);
    }

    private class ContainerStartRunnable
    implements Runnable {
        private Worker.MasterBatchStartContainersRequest request;

        ContainerStartRunnable(Worker.MasterBatchStartContainersRequest request2) {
            this.request = request2;
        }

        @Override
        public void run() {
            for (Worker.MasterStartContainerRequest req : this.request.getStartReqsList()) {
                try {
                    String uniqueId = ContainerActor.this.startContainer(req);
                    LOGGER.debug("submit container to containerPool, uniqueId={}, cost={}ms", uniqueId, System.currentTimeMillis() - req.getScheduleTime());
                }
                catch (Throwable e) {
                    String uniqueId = IdUtil.getUniqueId(req.getJobId(), this.request.getJobInstanceId(), req.getTaskId());
                    ContainerActor.this.logCollector.collect(uniqueId, "[ContainerActor-handleStartContainer]container start instance fail.", e);
                    Worker.ContainerReportTaskStatusRequest.Builder resultBuilder = Worker.ContainerReportTaskStatusRequest.newBuilder();
                    resultBuilder.setJobId(req.getJobId());
                    resultBuilder.setJobInstanceId(req.getJobInstanceId());
                    resultBuilder.setTaskId(req.getTaskId());
                    resultBuilder.setStatus(TaskStatus.FAILED.getValue());
                    Address address = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
                    String workerAddr = address.host().get() + ":" + address.port().get();
                    resultBuilder.setWorkerAddr(workerAddr);
                    resultBuilder.setWorkerId(WorkerIdGenerator.get());
                    if (req.getTaskName() != null) {
                        resultBuilder.setTaskName(req.getTaskName());
                    }
                    ActorSelection masterActorSelection = SchedulerxWorker.actorSystem.actorSelection(req.getInstanceMasterAkkaPath());
                    masterActorSelection.tell(resultBuilder.build(), null);
                }
            }
        }
    }
}

