/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.actor;

import akka.actor.UntypedActor;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceData;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.protocol.Common;
import com.alibaba.schedulerx.protocol.Server;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMasterFactory;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import com.alibaba.schedulerx.worker.pull.PullManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.joda.time.DateTime;

public class JobInstanceActor
extends UntypedActor {
    // Pool of task masters, looked up by job instance id (obtained from TaskMasterPool.INSTANCE).
    private TaskMasterPool masterPool = TaskMasterPool.INSTANCE;
    // Collector that receives per-instance trace messages emitted by the handlers below.
    private LogCollector logCollector = LogCollectorFactory.get();
    // Class-level logger.
    private static final Logger LOGGER = LogFactory.getLogger(JobInstanceActor.class);

    /**
     * Routes an incoming message to the handler matching its concrete request type.
     * Messages of any other type are ignored and no reply is sent for them.
     *
     * @param obj the message delivered to this actor
     * @throws Throwable declared by the overridden actor callback
     */
    @Override
    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof Server.ServerSubmitJobInstanceRequest) {
            this.handleSubmitJobInstance((Server.ServerSubmitJobInstanceRequest)obj);
            return;
        }
        if (obj instanceof Server.ServerKillJobInstanceRequest) {
            this.handleKillJobInstance((Server.ServerKillJobInstanceRequest)obj);
            return;
        }
        if (obj instanceof Server.ServerRetryTasksRequest) {
            this.handleRetryTasks((Server.ServerRetryTasksRequest)obj);
            return;
        }
        if (obj instanceof Server.ServerKillTaskRequest) {
            this.handleKillTask((Server.ServerKillTaskRequest)obj);
            return;
        }
        if (obj instanceof Server.ServerCheckTaskMasterRequest) {
            this.handCheckTaskMaster((Server.ServerCheckTaskMasterRequest)obj);
            return;
        }
        if (obj instanceof Worker.MasterNotifyWorkerPullRequest) {
            this.handleInitPull((Worker.MasterNotifyWorkerPullRequest)obj);
        }
    }

    /**
     * Handles a server request to start a job instance on this worker.
     *
     * <p>If the pool already holds a master for the instance id, a {@code success=false}
     * reply is sent and nothing else happens. Otherwise a {@code success=true} reply is sent
     * first, and only then is the task master created, registered in the pool and asked to
     * submit the instance. A failure in that second phase is not reported through the reply
     * (it has already gone out) but through a FAILED status report.
     *
     * @param request2 the submit request received from the server
     */
    private void handleSubmitJobInstance(Server.ServerSubmitJobInstanceRequest request2) {
        LOGGER.info("handleSubmitJobInstance, jobInstanceId=" + request2.getJobInstanceId());

        // Guard: refuse to start an instance that is already tracked by the pool.
        if (this.masterPool.contains(request2.getJobInstanceId())) {
            String errMsg = "jobInstanceId=" + request2.getJobInstanceId() + " is still running!";
            LOGGER.debug(errMsg);
            String uniqueId = IdUtil.getUniqueIdWithoutTask(request2.getJobId(), request2.getJobInstanceId());
            this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[JobInstanceActor-handleSubmitJobInstance]server trigger client fail.", errMsg));
            Server.ServerSubmitJobInstanceResponse rejected = Server.ServerSubmitJobInstanceResponse.newBuilder()
                    .setSuccess(false)
                    .setMessage(errMsg)
                    .build();
            this.getSender().tell(rejected, this.getSelf());
            return;
        }

        // Acknowledge receipt before doing the actual work.
        Server.ServerSubmitJobInstanceResponse accepted = Server.ServerSubmitJobInstanceResponse.newBuilder()
                .setSuccess(true)
                .build();
        this.getSender().tell(accepted, this.getSelf());

        try {
            JobInstanceInfo instanceInfo = this.convet2JobInstanceInfo(request2);
            TaskMaster master = TaskMasterFactory.create(instanceInfo, this.getContext());
            this.masterPool.put(instanceInfo.getJobInstanceId(), master);
            master.submitInstance(instanceInfo);
            LOGGER.debug("submit jobInstanceId={} successfully", request2.getJobInstanceId());
            this.logCollector.collect(IdUtil.getUniqueIdWithoutTask(request2.getJobId(), request2.getJobInstanceId()), "[JobInstanceActor-handleSubmitJobInstance]server trigger client success.");
        }
        catch (Throwable e) {
            LOGGER.error("handleSubmitJobInstance error, jobInstanceId={}, ", request2.getJobInstanceId(), e);
            this.logCollector.collect(IdUtil.getUniqueIdWithoutTask(request2.getJobId(), request2.getJobInstanceId()), "[JobInstanceActor-handleSubmitJobInstance]server trigger client fail.", e);
            // The positive reply was already sent, so the failure goes out as a status report.
            Worker.WorkerReportJobInstanceStatusRequest failureReport = Worker.WorkerReportJobInstanceStatusRequest.newBuilder()
                    .setJobId(request2.getJobId())
                    .setJobInstanceId(request2.getJobInstanceId())
                    .setStatus(InstanceStatus.FAILED.getValue())
                    .setResult(ExceptionUtil.getMessage(e))
                    .setGroupId(request2.getGroupId())
                    .build();
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(failureReport, null);
        }
    }

    /**
     * Handles a server request to kill a job instance and replies to the sender.
     *
     * <p>The reply carries {@code success=false} when no task master is registered for the
     * instance id or when the kill attempt throws; otherwise {@code success=true}. The
     * outcome is also written to the log collector under the instance's unique id.
     *
     * <p>The reply is sent from a single place (the {@code finally} block) so that exactly
     * one send is attempted per request.
     *
     * @param request2 the kill request received from the server
     */
    private void handleKillJobInstance(Server.ServerKillJobInstanceRequest request2) {
        Server.ServerKillJobInstanceResponse response = null;
        String uniqueId = IdUtil.getUniqueIdWithoutTask(request2.getJobId(), request2.getJobInstanceId());
        try {
            long jobInstanceId = request2.getJobInstanceId();
            // One lookup instead of contains() followed by get(): the pool entry could be
            // removed between the two calls, which would surface as a NullPointerException.
            TaskMaster taskMaster = this.masterPool.get(jobInstanceId);
            if (taskMaster == null) {
                response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(false).setMessage(jobInstanceId + " is not exist").build();
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[JobInstanceActor-handleKillJobInstance]server kill instance start fail.", response.getMessage()));
            } else {
                taskMaster.killInstance("killed from server");
                response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(true).build();
                this.logCollector.collect(uniqueId, "[JobInstanceActor-handleKillJobInstance]server kill instance start success.");
            }
        }
        catch (Throwable e) {
            LOGGER.error("[JobInstanceActor]handleKillJobInstance error, uniqueId:{}", uniqueId, e);
            response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
        }
        finally {
            // response is only null if building the failure reply itself failed;
            // skip the send in that case so the original exception is not masked.
            if (response != null) {
                this.getSender().tell(response, this.getSelf());
            }
        }
    }

    /**
     * Handles a server request to kill a single task of a job instance and replies to the
     * sender.
     *
     * <p>The reply carries {@code success=false} when no task master is registered for the
     * instance id or when the kill attempt throws; otherwise {@code success=true}.
     *
     * <p>The reply is sent from a single place (the {@code finally} block) so that exactly
     * one send is attempted per request.
     *
     * @param request2 the kill-task request received from the server
     */
    private void handleKillTask(Server.ServerKillTaskRequest request2) {
        Server.ServerKillTaskResponse response = null;
        try {
            long jobInstanceId = request2.getJobInstanceId();
            // One lookup instead of contains() followed by get(): the pool entry could be
            // removed between the two calls, which would surface as a NullPointerException.
            TaskMaster taskMaster = this.masterPool.get(jobInstanceId);
            if (taskMaster == null) {
                response = Server.ServerKillTaskResponse.newBuilder().setSuccess(false).setMessage(jobInstanceId + " is not exist").build();
            } else {
                taskMaster.killTask(IdUtil.getUniqueId(request2.getJobId(), request2.getJobInstanceId(), request2.getTaskId()), request2.getWorkerId(), request2.getWorkerAddr());
                response = Server.ServerKillTaskResponse.newBuilder().setSuccess(true).build();
            }
        }
        catch (Throwable e) {
            LOGGER.error("", e);
            response = Server.ServerKillTaskResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
        }
        finally {
            // response is only null if building the failure reply itself failed;
            // skip the send in that case so the original exception is not masked.
            if (response != null) {
                this.getSender().tell(response, this.getSelf());
            }
        }
    }

    /**
     * Handles a server request to retry tasks of a job instance and replies to the sender.
     *
     * <p>If the pool has no task master for the instance, one is created from the request
     * and registered before the retry is delegated to it. Any exception raised while
     * converting the request, creating the master or retrying the tasks is logged and
     * reported as {@code success=false}, so the sender always receives a reply.
     *
     * @param request2 the retry request received from the server
     */
    private void handleRetryTasks(Server.ServerRetryTasksRequest request2) {
        Server.ServerRetryTasksResponse response;
        try {
            JobInstanceInfo jobInstanceInfo = this.convet2JobInstanceInfo(request2);
            TaskMaster taskMaster = this.masterPool.get(jobInstanceInfo.getJobInstanceId());
            if (taskMaster == null) {
                // No master is tracked for this instance on this worker; build one from the request.
                taskMaster = TaskMasterFactory.create(jobInstanceInfo, this.getContext());
                this.masterPool.put(jobInstanceInfo.getJobInstanceId(), taskMaster);
            }
            taskMaster.retryTasks(request2.getRetryTaskEntityList());
            response = Server.ServerRetryTasksResponse.newBuilder().setSuccess(true).build();
        }
        catch (Exception e) {
            LOGGER.error("", e);
            response = Server.ServerRetryTasksResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
        }
        this.getSender().tell(response, this.getSelf());
    }

    /**
     * Tells the sender whether the pool currently holds a task master for the requested
     * job instance.
     *
     * @param request2 the check request received from the server
     */
    private void handCheckTaskMaster(Server.ServerCheckTaskMasterRequest request2) {
        long instanceId = request2.getJobInstanceId();
        Server.ServerCheckTaskMasterResponse reply;
        if (this.masterPool.contains(instanceId)) {
            reply = Server.ServerCheckTaskMasterResponse.newBuilder()
                    .setSuccess(true)
                    .build();
        } else {
            reply = Server.ServerCheckTaskMasterResponse.newBuilder()
                    .setSuccess(false)
                    .setMessage("TaskMaster is not existed of jobInstance=" + instanceId)
                    .build();
        }
        this.getSender().tell(reply, this.getSelf());
    }

    /**
     * Handles a task master's notification to start pulling tasks: initialises the pull
     * manager with the parameters carried by the request and replies to the sender.
     *
     * <p>An {@link Exception} during initialisation is logged and reported as
     * {@code success=false}. The reply is sent from a single place (the {@code finally}
     * block) so that exactly one send is attempted per request.
     *
     * @param request2 the pull notification received from the task master
     */
    private void handleInitPull(Worker.MasterNotifyWorkerPullRequest request2) {
        Worker.MasterNotifyWorkerPullResponse response = null;
        try {
            PullManager.INSTANCE.init(request2.getJobInstanceId(), request2.getPageSize(), request2.getQueueSize(), request2.getConsumerSize(), request2.getTaskMasterAkkaPath());
            response = Worker.MasterNotifyWorkerPullResponse.newBuilder().setSuccess(true).build();
        }
        catch (Exception e) {
            LOGGER.error("", e);
            response = Worker.MasterNotifyWorkerPullResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
        }
        finally {
            // response stays null when something other than an Exception (an Error)
            // escapes; skip the send then so that throwable propagates unmasked.
            if (response != null) {
                this.getSender().tell(response, this.getSelf());
            }
        }
    }

    /**
     * Builds a {@link JobInstanceInfo} from a submit request.
     *
     * <p>The worker list is copied and shuffled into a random order. Fields the request
     * marks as optional are copied only when present; the time type falls back to
     * {@code 0} and the time expression to {@code null} when absent. Each upstream entry
     * is converted to a {@link JobInstanceData} of job name and data.
     *
     * @param request2 the submit request received from the server
     * @return the populated job instance description
     */
    private JobInstanceInfo convet2JobInstanceInfo(Server.ServerSubmitJobInstanceRequest request2) {
        // Shuffle a plain ArrayList copy and wrap it afterwards: every set() on a
        // CopyOnWriteArrayList copies its whole backing array, so shuffling one in
        // place performs a full array copy per swap step.
        ArrayList<String> shuffledWorkers = Lists.newArrayList(request2.getWorkersList());
        Collections.shuffle(shuffledWorkers);
        CopyOnWriteArrayList<String> workers = Lists.newCopyOnWriteArrayList(shuffledWorkers);

        JobInstanceInfo.JobInstanceInfoBuilder builder = JobInstanceInfo.newBuilder();
        builder.setJobId(request2.getJobId());
        builder.setJobInstanceId(request2.getJobInstanceId());
        builder.setExecuteMode(request2.getExecuteMode());
        builder.setJobType(request2.getJobType());
        builder.setContent(request2.getContent());
        builder.setUser(request2.getUser());
        builder.setScheduleTime(new DateTime(request2.getScheduleTime()));
        builder.setDataTime(new DateTime(request2.getDataTime()));
        builder.setAllWorkers(workers);
        builder.setJobConcurrency(request2.getJobConcurrency());
        builder.setRegionId(request2.getRegionId());
        builder.setAppGroupId(request2.getAppGroupId());
        builder.setTimeType(request2.hasTimeType() ? request2.getTimeType() : 0);
        builder.setTimeExpression(request2.hasTimeExpression() ? request2.getTimeExpression() : null);
        builder.setGroupId(request2.getGroupId());
        if (request2.hasParameters()) {
            builder.setParameters(request2.getParameters());
        }
        if (request2.hasXattrs()) {
            builder.setXattrs(request2.getXattrs());
        }
        if (request2.hasInstanceParameters()) {
            builder.setInstanceParameters(request2.getInstanceParameters());
        }

        // Convert each upstream entry to a (job name, data) pair.
        List<Common.UpstreamData> upstreamDataList = request2.getUpstreamDataList();
        ArrayList<JobInstanceData> upstreamDataPairList = Lists.newArrayList();
        for (Common.UpstreamData upstreamData : upstreamDataList) {
            upstreamDataPairList.add(new JobInstanceData(upstreamData.getJobName(), upstreamData.getData()));
        }
        builder.setUpstreamData(upstreamDataPairList);

        if (request2.hasMaxAttempt()) {
            builder.setMaxAttempt(request2.getMaxAttempt());
        }
        if (request2.hasAttempt()) {
            builder.setAttempt(request2.getAttempt());
        }
        if (request2.hasWfInstanceId()) {
            builder.setWfInstanceId(request2.getWfInstanceId());
        }
        if (request2.hasJobName()) {
            builder.setJobName(request2.getJobName());
        }
        return builder.build();
    }

    /**
     * Builds a {@link JobInstanceInfo} from a retry-tasks request.
     *
     * <p>The worker list is copied and shuffled into a random order. Fields the request
     * marks as optional are copied only when present; the time type falls back to
     * {@code 0} and the time expression to {@code null} when absent. Each upstream entry
     * is converted to a {@link JobInstanceData} of job name and data.
     *
     * @param request2 the retry request received from the server
     * @return the populated job instance description
     */
    private JobInstanceInfo convet2JobInstanceInfo(Server.ServerRetryTasksRequest request2) {
        // Shuffle a plain ArrayList copy and wrap it afterwards: every set() on a
        // CopyOnWriteArrayList copies its whole backing array, so shuffling one in
        // place performs a full array copy per swap step.
        ArrayList<String> shuffledWorkers = Lists.newArrayList(request2.getWorkersList());
        Collections.shuffle(shuffledWorkers);
        CopyOnWriteArrayList<String> workers = Lists.newCopyOnWriteArrayList(shuffledWorkers);

        JobInstanceInfo.JobInstanceInfoBuilder builder = JobInstanceInfo.newBuilder();
        builder.setJobId(request2.getJobId());
        builder.setJobInstanceId(request2.getJobInstanceId());
        builder.setExecuteMode(request2.getExecuteMode());
        builder.setJobType(request2.getJobType());
        builder.setContent(request2.getContent());
        builder.setUser(request2.getUser());
        builder.setScheduleTime(new DateTime(request2.getScheduleTime()));
        builder.setDataTime(new DateTime(request2.getDataTime()));
        builder.setAllWorkers(workers);
        builder.setJobConcurrency(request2.getJobConcurrency());
        builder.setRegionId(request2.getRegionId());
        builder.setAppGroupId(request2.getAppGroupId());
        builder.setTimeType(request2.hasTimeType() ? request2.getTimeType() : 0);
        builder.setTimeExpression(request2.hasTimeExpression() ? request2.getTimeExpression() : null);
        builder.setGroupId(request2.getGroupId());
        if (request2.hasParameters()) {
            builder.setParameters(request2.getParameters());
        }
        if (request2.hasXattrs()) {
            builder.setXattrs(request2.getXattrs());
        }
        if (request2.hasInstanceParameters()) {
            builder.setInstanceParameters(request2.getInstanceParameters());
        }

        // Convert each upstream entry to a (job name, data) pair.
        List<Common.UpstreamData> upstreamDataList = request2.getUpstreamDataList();
        ArrayList<JobInstanceData> upstreamDataPairList = Lists.newArrayList();
        for (Common.UpstreamData upstreamData : upstreamDataList) {
            upstreamDataPairList.add(new JobInstanceData(upstreamData.getJobName(), upstreamData.getData()));
        }
        builder.setUpstreamData(upstreamDataPairList);

        if (request2.hasMaxAttempt()) {
            builder.setMaxAttempt(request2.getMaxAttempt());
        }
        if (request2.hasAttempt()) {
            builder.setAttempt(request2.getAttempt());
        }
        if (request2.hasWfInstanceId()) {
            builder.setWfInstanceId(request2.getWfInstanceId());
        }
        if (request2.hasJobName()) {
            builder.setJobName(request2.getJobName());
        }
        return builder.build();
    }
}

