/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.processor;

import com.alibaba.schedulerx.common.domain.StreamType;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.processor.ShellStreamProcessor;
import com.alibaba.schedulerx.worker.util.ShellUtil;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * {@link JobProcessor} that runs a job's script content through {@code /bin/sh -c} and forwards
 * the lines the script prints to the {@link LogCollector}.
 *
 * <p>The instance keeps the started {@link Process} and the job's unique id as mutable fields, so
 * one instance handles one running script at a time.
 */
public class ShellProcessor
implements JobProcessor {
    /** Process started by {@link #process(JobContext)}; {@code null} until a script has been launched. */
    private Process shellProcess = null;
    private final LogCollector logCollector = LogCollectorFactory.get();
    protected static final Logger LOGGER = LogFactory.getLogger(ShellProcessor.class);
    /** Key passed to every {@link LogCollector} call; set in {@link #preProcess(JobContext)}. */
    private String uniqueId = null;

    /**
     * Launches the script, waits for its output stream(s) to be drained and for the process to
     * exit, and reports success only when the exit code is {@code 0}.
     *
     * <p>Any failure is logged, collected, and stored as the result message; the method does not
     * propagate it.
     *
     * @param context job context supplying the script content and its parameters
     * @return a result whose status is set to {@code true} only when the script exited with code 0
     */
    @Override
    public ProcessResult process(JobContext context) {
        ProcessResult result = new ProcessResult(false);
        boolean interrupted = false;
        this.logCollector.collect(this.uniqueId, "Script process start to run, taskName=" + context.getTaskName());
        try {
            // Evaluate once so the builder configuration and the reader setup cannot disagree.
            boolean mergeStreams = this.redirectStream();
            ProcessBuilder processBuilder = ShellUtil.createProcessBuilder(this.getContent(context));
            if (mergeStreams) {
                processBuilder.redirectErrorStream(true);
            }
            this.shellProcess = processBuilder.start();

            // NOTE(review): ShellStreamProcessor presumably runs asynchronously, calls back into
            // processStdOutputStream/processStdErrorStream and counts the latch down - confirm.
            CountDownLatch countDownLatch;
            if (mergeStreams) {
                // stderr is merged into stdout, so a single reader on the input stream is enough.
                countDownLatch = new CountDownLatch(1);
                ShellStreamProcessor mergedStreamProcessor = new ShellStreamProcessor(this, this.shellProcess.getInputStream(), StreamType.STD_ERR, countDownLatch);
                mergedStreamProcessor.start();
            } else {
                countDownLatch = new CountDownLatch(2);
                ShellStreamProcessor stdoutStreamProcessor = new ShellStreamProcessor(this, this.shellProcess.getInputStream(), StreamType.STD_OUT, countDownLatch);
                stdoutStreamProcessor.start();
                ShellStreamProcessor stderrStreamProcessor = new ShellStreamProcessor(this, this.shellProcess.getErrorStream(), StreamType.STD_ERR, countDownLatch);
                stderrStreamProcessor.start();
            }
            countDownLatch.await();
            if (this.shellProcess.waitFor() == 0) {
                result.setStatus(true);
            }
        }
        catch (Throwable e) {
            // Remember the interruption; the flag is restored after the last log call below.
            interrupted = e instanceof InterruptedException;
            LOGGER.error("", e);
            this.logCollector.collect(this.uniqueId, "script process errors", e);
            result.setResult(ExceptionUtil.getMessage(e));
        }
        finally {
            try {
                this.logCollector.collect(this.uniqueId, "Script process end, run status=" + result.getStatus().getEnDesc());
            }
            finally {
                if (interrupted) {
                    // await()/waitFor() cleared the flag when they threw; hand it back to the caller.
                    Thread.currentThread().interrupt();
                }
            }
            // NOTE(review): returning from finally discards anything thrown by the catch handler
            // above. Kept on purpose so callers keep receiving a result in that case.
            return result;
        }
    }

    /**
     * Builds the command line {@code /bin/sh -c <script> <parameters...>}.
     *
     * <p>Parameters are chosen in this order: sharding id plus sharding parameter when a sharding
     * parameter is present; otherwise the instance parameters split on single spaces; otherwise
     * the job parameters split on single spaces. Null job parameters are treated as an empty
     * string. Because the parameters follow the {@code -c} command string, the shell exposes them
     * to the script as {@code $0}, {@code $1}, and so on.
     *
     * @param context job context supplying the script content and parameters
     * @return the command array to hand to the process builder
     */
    protected String[] getContent(JobContext context) {
        String shardingParameter = context.getShardingParameter();
        String instanceParameters = context.getInstanceParameters();
        String[] parameters;
        if (StringUtils.isNotEmpty(shardingParameter)) {
            parameters = new String[]{String.valueOf(context.getShardingId()), shardingParameter};
        } else if (StringUtils.isNotEmpty(instanceParameters)) {
            parameters = instanceParameters.trim().split(" ");
        } else {
            // Guard against null job parameters; an empty string yields the same single empty
            // argument that empty job parameters already produced.
            String jobParameters = context.getJobParameters();
            parameters = (jobParameters == null ? "" : jobParameters).trim().split(" ");
        }
        String[] contents = new String[3 + parameters.length];
        contents[0] = "/bin/sh";
        contents[1] = "-c";
        contents[2] = context.getContent();
        System.arraycopy(parameters, 0, contents, 3, parameters.length);
        return contents;
    }

    /**
     * Forwards every line of the script's standard output to the log collector, then sends the
     * end-of-stream call.
     *
     * @param inputStream stream to read; it is closed when reading finishes
     */
    protected void processStdOutputStream(InputStream inputStream) {
        this.drainStream(inputStream, "error ShellJobProcessor stdout stream", "error process stdout stream");
    }

    /**
     * Forwards every line of the script's standard error to the log collector, then sends the
     * end-of-stream call.
     *
     * @param inputStream stream to read; it is closed when reading finishes
     */
    protected void processStdErrorStream(InputStream inputStream) {
        this.drainStream(inputStream, "error ShellJobProcessor stderr stream", "error process stderr stream");
    }

    /**
     * Reads {@code inputStream} line by line as UTF-8, collecting each line, and always finishes
     * with the four-argument collect call, whether or not reading failed.
     *
     * @param inputStream stream to drain; closed on exit
     * @param logMessage message written to the logger when reading fails
     * @param collectMessage message sent to the log collector when reading fails
     */
    private void drainStream(InputStream inputStream, String logMessage, String collectMessage) {
        String line = null;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            while ((line = reader.readLine()) != null) {
                this.logCollector.collect(this.uniqueId, line);
            }
        }
        catch (Throwable e) {
            LOGGER.error(logMessage, e);
            this.logCollector.collect(this.uniqueId, collectMessage, e);
        }
        finally {
            // line is null after a clean end of stream, or the last line read before a failure.
            // NOTE(review): STD_OUT is used here for the stderr path as well, as in the original
            // code - confirm whether STD_ERR was intended for processStdErrorStream.
            this.logCollector.collect(this.uniqueId, line, StreamType.STD_OUT, true);
        }
    }

    /**
     * Tells {@link #process(JobContext)} whether stderr should be merged into stdout.
     *
     * @return {@code true} in this implementation; subclasses may override
     */
    protected boolean redirectStream() {
        return true;
    }

    /**
     * No post-processing is performed.
     *
     * @param context job context (unused)
     * @return always {@code null}
     */
    @Override
    public ProcessResult postProcess(JobContext context) {
        return null;
    }

    /**
     * Best-effort termination of the running script: first kills by pid through
     * {@link ShellUtil}, then calls {@link Process#destroy()}. Failures of either step are logged
     * and not propagated. Does nothing when no process has been started.
     *
     * @param context job context, used for the job instance id in error logs
     */
    @Override
    public void kill(JobContext context) {
        Process process = this.shellProcess;
        if (process == null) {
            // Nothing was launched, so there is nothing to kill.
            return;
        }
        try {
            long pid = ShellUtil.getPidOfProcess(process);
            if (pid > 0L) {
                ShellUtil.killProcess(pid);
            }
        }
        catch (Throwable e) {
            LOGGER.error("kill shell job jobInstanceId={} failed, {}", context.getJobInstanceId(), e);
        }
        try {
            process.destroy();
        }
        catch (Throwable th) {
            LOGGER.error("kill shell job jobInstanceId={} failed, {}", context.getJobInstanceId(), th);
        }
    }

    /**
     * Captures the job's unique id so later log collection calls can use it.
     *
     * @param context job context supplying the unique id
     */
    @Override
    public void preProcess(JobContext context) throws Exception {
        this.uniqueId = context.getUniqueId();
    }
}

