/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.actor;

import akka.actor.ActorContext;
import akka.actor.UntypedActor;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceData;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.protocol.Common;
import com.alibaba.schedulerx.protocol.Server;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMasterFactory;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import com.alibaba.schedulerx.worker.pull.PullManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.joda.time.DateTime;

public class JobInstanceActor
extends UntypedActor {
    private TaskMasterPool masterPool = TaskMasterPool.INSTANCE;
    private LogCollector logCollector = LogCollectorFactory.get();
    private static final Logger LOGGER = LogFactory.getLogger(JobInstanceActor.class);

    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof Server.ServerSubmitJobInstanceRequest) {
            this.handleSubmitJobInstance((Server.ServerSubmitJobInstanceRequest)obj);
        } else if (obj instanceof Server.ServerKillJobInstanceRequest) {
            this.handleKillJobInstance((Server.ServerKillJobInstanceRequest)obj);
        } else if (obj instanceof Server.ServerRetryTasksRequest) {
            this.handleRetryTasks((Server.ServerRetryTasksRequest)obj);
        } else if (obj instanceof Server.ServerKillTaskRequest) {
            this.handleKillTask((Server.ServerKillTaskRequest)obj);
        } else if (obj instanceof Server.ServerCheckTaskMasterRequest) {
            this.handCheckTaskMaster((Server.ServerCheckTaskMasterRequest)obj);
        } else if (obj instanceof Worker.MasterNotifyWorkerPullRequest) {
            this.handleInitPull((Worker.MasterNotifyWorkerPullRequest)obj);
        }
    }

    private void handleSubmitJobInstance(Server.ServerSubmitJobInstanceRequest request) {
        LOGGER.info("handleSubmitJobInstance, jobInstanceId=" + request.getJobInstanceId());
        Server.ServerSubmitJobInstanceResponse response = null;
        if (this.masterPool.contains(request.getJobInstanceId())) {
            String errMsg = "jobInstanceId=" + request.getJobInstanceId() + " is still running!";
            LOGGER.debug(errMsg);
            this.logCollector.collect(IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), ClientLoggerMessage.appendMessage("[JobInstanceActor-handleSubmitJobInstance]server trigger client fail.", errMsg));
            response = Server.ServerSubmitJobInstanceResponse.newBuilder().setSuccess(false).setMessage(errMsg).build();
            this.getSender().tell((Object)response, this.getSelf());
        } else {
            response = Server.ServerSubmitJobInstanceResponse.newBuilder().setSuccess(true).build();
            this.getSender().tell((Object)response, this.getSelf());
            try {
                JobInstanceInfo jobInstanceInfo = this.convet2JobInstanceInfo(request);
                TaskMaster taskMaster = TaskMasterFactory.create(jobInstanceInfo, (ActorContext)this.getContext());
                this.masterPool.put(jobInstanceInfo.getJobInstanceId(), taskMaster);
                taskMaster.submitInstance(jobInstanceInfo);
                LOGGER.debug("submit jobInstanceId={} successfully", request.getJobInstanceId());
                this.logCollector.collect(IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), "[JobInstanceActor-handleSubmitJobInstance]server trigger client success.");
            }
            catch (Throwable e) {
                LOGGER.error("handleSubmitJobInstance error, jobInstanceId={}, ", request.getJobInstanceId(), e);
                this.logCollector.collect(IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), "[JobInstanceActor-handleSubmitJobInstance]server trigger client fail.", e);
                Worker.WorkerReportJobInstanceStatusRequest req = Worker.WorkerReportJobInstanceStatusRequest.newBuilder().setJobId(request.getJobId()).setJobInstanceId(request.getJobInstanceId()).setStatus(InstanceStatus.FAILED.getValue()).setResult(ExceptionUtil.getMessage(e)).setGroupId(request.getGroupId()).build();
                SchedulerxWorker.AtLeastDeliveryRoutingActor.tell((Object)req, null);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleKillJobInstance(Server.ServerKillJobInstanceRequest request) {
        Server.ServerKillJobInstanceResponse response;
        block5: {
            response = null;
            String uniqueId = IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId());
            try {
                long jobInstanceId = request.getJobInstanceId();
                if (!this.masterPool.contains(jobInstanceId)) {
                    response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(false).setMessage(jobInstanceId + " is not exist").build();
                    this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[JobInstanceActor-handleKillJobInstance]server kill instance start fail.", response.getMessage()));
                    break block5;
                }
                this.masterPool.get(jobInstanceId).killInstance("killed from server");
                response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(true).build();
                this.logCollector.collect(uniqueId, "[JobInstanceActor-handleKillJobInstance]server kill instance start success.");
            }
            catch (Throwable e) {
                try {
                    LOGGER.error("[JobInstanceActor]handleKillJobInstance error, uniqueId:{}", uniqueId, e);
                    response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                }
                catch (Throwable throwable) {
                    this.getSender().tell(response, this.getSelf());
                    throw throwable;
                }
                this.getSender().tell((Object)response, this.getSelf());
            }
        }
        this.getSender().tell((Object)response, this.getSelf());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleKillTask(Server.ServerKillTaskRequest request) {
        Server.ServerKillTaskResponse response;
        block5: {
            response = null;
            try {
                long jobInstanceId = request.getJobInstanceId();
                if (!this.masterPool.contains(jobInstanceId)) {
                    response = Server.ServerKillTaskResponse.newBuilder().setSuccess(false).setMessage(jobInstanceId + " is not exist").build();
                    break block5;
                }
                this.masterPool.get(jobInstanceId).killTask(IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId()), request.getWorkerId(), request.getWorkerAddr());
                response = Server.ServerKillTaskResponse.newBuilder().setSuccess(true).build();
            }
            catch (Throwable e) {
                try {
                    LOGGER.error("", e);
                    response = Server.ServerKillTaskResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                }
                catch (Throwable throwable) {
                    this.getSender().tell(response, this.getSelf());
                    throw throwable;
                }
                this.getSender().tell((Object)response, this.getSelf());
            }
        }
        this.getSender().tell((Object)response, this.getSelf());
    }

    private void handleRetryTasks(Server.ServerRetryTasksRequest request) {
        JobInstanceInfo jobInstanceInfo = this.convet2JobInstanceInfo(request);
        Server.ServerRetryTasksResponse response = null;
        TaskMaster taskMaster = this.masterPool.get(jobInstanceInfo.getJobInstanceId());
        if (taskMaster == null) {
            try {
                taskMaster = TaskMasterFactory.create(jobInstanceInfo, (ActorContext)this.getContext());
                this.masterPool.put(jobInstanceInfo.getJobInstanceId(), taskMaster);
            }
            catch (Exception e) {
                LOGGER.error("", e);
                response = Server.ServerRetryTasksResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
            }
        }
        if (taskMaster != null) {
            taskMaster.retryTasks(request.getRetryTaskEntityList());
            response = Server.ServerRetryTasksResponse.newBuilder().setSuccess(true).build();
        }
        this.getSender().tell(response, this.getSelf());
    }

    private void handCheckTaskMaster(Server.ServerCheckTaskMasterRequest request) {
        long jobInstanceId = request.getJobInstanceId();
        Server.ServerCheckTaskMasterResponse response = null;
        response = !this.masterPool.contains(jobInstanceId) ? Server.ServerCheckTaskMasterResponse.newBuilder().setSuccess(false).setMessage("TaskMaster is not existed of jobInstance=" + jobInstanceId).build() : Server.ServerCheckTaskMasterResponse.newBuilder().setSuccess(true).build();
        this.getSender().tell((Object)response, this.getSelf());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleInitPull(Worker.MasterNotifyWorkerPullRequest request) {
        Worker.MasterNotifyWorkerPullResponse response = null;
        try {
            PullManager.INSTANCE.init(request.getJobInstanceId(), request.getPageSize(), request.getQueueSize(), request.getConsumerSize(), request.getTaskMasterAkkaPath());
            response = Worker.MasterNotifyWorkerPullResponse.newBuilder().setSuccess(true).build();
        }
        catch (Exception e) {
            try {
                LOGGER.error("", e);
                response = Worker.MasterNotifyWorkerPullResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
            this.getSender().tell((Object)response, this.getSelf());
        }
        this.getSender().tell((Object)response, this.getSelf());
    }

    private JobInstanceInfo convet2JobInstanceInfo(Server.ServerSubmitJobInstanceRequest request) {
        CopyOnWriteArrayList<String> workers = Lists.newCopyOnWriteArrayList(request.getWorkersList());
        Collections.shuffle(workers);
        JobInstanceInfo.JobInstanceInfoBuilder builder = JobInstanceInfo.newBuilder();
        builder.setJobId(request.getJobId());
        builder.setJobInstanceId(request.getJobInstanceId());
        builder.setExecuteMode(request.getExecuteMode());
        builder.setJobType(request.getJobType());
        builder.setContent(request.getContent());
        builder.setUser(request.getUser());
        builder.setScheduleTime(new DateTime(request.getScheduleTime()));
        builder.setDataTime(new DateTime(request.getDataTime()));
        builder.setAllWorkers(workers);
        builder.setJobConcurrency(request.getJobConcurrency());
        builder.setRegionId(request.getRegionId());
        builder.setAppGroupId(request.getAppGroupId());
        builder.setTimeType(request.hasTimeType() ? request.getTimeType() : 0);
        builder.setTimeExpression(request.hasTimeExpression() ? request.getTimeExpression() : null);
        builder.setGroupId(request.getGroupId());
        if (request.hasParameters()) {
            builder.setParameters(request.getParameters());
        }
        if (request.hasXattrs()) {
            builder.setXattrs(request.getXattrs());
        }
        if (request.hasInstanceParameters()) {
            builder.setInstanceParameters(request.getInstanceParameters());
        }
        List upstreamDataList = request.getUpstreamDataList();
        ArrayList<JobInstanceData> upstreamDataPairList = Lists.newArrayList();
        for (Common.UpstreamData upstreamData : upstreamDataList) {
            upstreamDataPairList.add(new JobInstanceData(upstreamData.getJobName(), upstreamData.getData()));
        }
        builder.setUpstreamData(upstreamDataPairList);
        if (request.hasMaxAttempt()) {
            builder.setMaxAttempt(request.getMaxAttempt());
        }
        if (request.hasAttempt()) {
            builder.setAttempt(request.getAttempt());
        }
        if (request.hasWfInstanceId()) {
            builder.setWfInstanceId(request.getWfInstanceId());
        }
        if (request.hasJobName()) {
            builder.setJobName(request.getJobName());
        }
        return builder.build();
    }

    private JobInstanceInfo convet2JobInstanceInfo(Server.ServerRetryTasksRequest request) {
        CopyOnWriteArrayList<String> workers = Lists.newCopyOnWriteArrayList(request.getWorkersList());
        Collections.shuffle(workers);
        JobInstanceInfo.JobInstanceInfoBuilder builder = JobInstanceInfo.newBuilder();
        builder.setJobId(request.getJobId());
        builder.setJobInstanceId(request.getJobInstanceId());
        builder.setExecuteMode(request.getExecuteMode());
        builder.setJobType(request.getJobType());
        builder.setContent(request.getContent());
        builder.setUser(request.getUser());
        builder.setScheduleTime(new DateTime(request.getScheduleTime()));
        builder.setDataTime(new DateTime(request.getDataTime()));
        builder.setAllWorkers(workers);
        builder.setJobConcurrency(request.getJobConcurrency());
        builder.setRegionId(request.getRegionId());
        builder.setAppGroupId(request.getAppGroupId());
        builder.setTimeType(request.hasTimeType() ? request.getTimeType() : 0);
        builder.setTimeExpression(request.hasTimeExpression() ? request.getTimeExpression() : null);
        builder.setGroupId(request.getGroupId());
        if (request.hasParameters()) {
            builder.setParameters(request.getParameters());
        }
        if (request.hasXattrs()) {
            builder.setXattrs(request.getXattrs());
        }
        if (request.hasInstanceParameters()) {
            builder.setInstanceParameters(request.getInstanceParameters());
        }
        List upstreamDataList = request.getUpstreamDataList();
        ArrayList<JobInstanceData> upstreamDataPairList = Lists.newArrayList();
        for (Common.UpstreamData upstreamData : upstreamDataList) {
            upstreamDataPairList.add(new JobInstanceData(upstreamData.getJobName(), upstreamData.getData()));
        }
        builder.setUpstreamData(upstreamDataPairList);
        if (request.hasMaxAttempt()) {
            builder.setMaxAttempt(request.getMaxAttempt());
        }
        if (request.hasAttempt()) {
            builder.setAttempt(request.getAttempt());
        }
        if (request.hasWfInstanceId()) {
            builder.setWfInstanceId(request.getWfInstanceId());
        }
        if (request.hasJobName()) {
            builder.setJobName(request.getJobName());
        }
        return builder.build();
    }
}

