/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that task-level errors raised by a processor (fatal failure,
 * non-fatal failure and self-kill) are reflected in the DAG state and in the
 * vertex progress counters when Tez runs in local mode.
 *
 * <p>Every test starts its own session-mode {@link TezClient}, submits a DAG
 * with a single one-task vertex running {@link FailingProcessor}, waits for
 * the DAG to finish and then inspects the reported status.
 */
public class TestTaskErrorsUsingLocalMode {
    private static final Logger LOG = LoggerFactory.getLogger(TestTaskErrorsUsingLocalMode.class);
    private static final String VERTEX_NAME = "vertex1";
    private static final File STAGING_DIR = new File(System.getProperty("test.build.data"), TestTaskErrorsUsingLocalMode.class.getName()).getAbsoluteFile();

    /**
     * A failure reported as {@link TaskFailureType#FATAL} must fail the DAG
     * with exactly one failed task attempt.
     */
    @Test(timeout = 20000L)
    public void testFatalErrorReported() throws IOException, TezException, InterruptedException {
        TezClient tezClient = getTezClient("testFatalErrorReported");
        try {
            FailingProcessor.configureForFatalFail();
            DAG dag = createFailingProcessorDag("testFatalErrorReportedDag");
            // try-with-resources closes the DAG client without masking an
            // assertion failure and without skipping tezClient.stop() below.
            try (DAGClient dagClient = tezClient.submitDAG(dag)) {
                dagClient.waitForCompletion();
                Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
                Assert.assertEquals(1L, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getFailedTaskAttemptCount());
            }
        } finally {
            tezClient.stop();
        }
    }

    /**
     * A failure reported as {@link TaskFailureType#NON_FATAL} must still fail
     * the DAG, with four failed task attempts reported for the vertex.
     * NOTE(review): four presumably matches the default maximum number of
     * task attempts - confirm against the Tez configuration defaults.
     */
    @Test(timeout = 20000L)
    public void testNonFatalErrorReported() throws IOException, TezException, InterruptedException {
        TezClient tezClient = getTezClient("testNonFatalErrorReported");
        try {
            FailingProcessor.configureForNonFatalFail();
            DAG dag = createFailingProcessorDag("testNonFatalErrorReported");
            try (DAGClient dagClient = tezClient.submitDAG(dag)) {
                dagClient.waitForCompletion();
                Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
                Assert.assertEquals(4L, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getFailedTaskAttemptCount());
            }
        } finally {
            tezClient.stop();
        }
    }

    /**
     * The processor kills itself on every attempt whose number is not 10; the
     * DAG must succeed and ten killed task attempts must be reported.
     * NOTE(review): this presumably relies on attempt numbers starting at 0
     * and on killed attempts not counting as failures - confirm.
     */
    @Test(timeout = 20000L)
    public void testSelfKillReported() throws IOException, TezException, InterruptedException {
        TezClient tezClient = getTezClient("testSelfKillReported");
        try {
            FailingProcessor.configureForKilled(10);
            DAG dag = createFailingProcessorDag("testSelfKillReported");
            try (DAGClient dagClient = tezClient.submitDAG(dag)) {
                dagClient.waitForCompletion();
                Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
                Assert.assertEquals(10L, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getKilledTaskAttemptCount());
            }
        } finally {
            tezClient.stop();
        }
    }

    /**
     * Builds a DAG containing a single vertex named {@link #VERTEX_NAME} with
     * one task that runs {@link FailingProcessor}.
     *
     * @param dagName name given to the created DAG
     * @return the DAG, ready to be submitted
     */
    private static DAG createFailingProcessorDag(String dagName) {
        ProcessorDescriptor processor = ProcessorDescriptor.create(FailingProcessor.class.getName());
        return DAG.create(dagName).addVertex(Vertex.create(VERTEX_NAME, processor, 1));
    }

    /**
     * Creates and starts a Tez client configured for local mode on the local
     * file system, staging under {@link #STAGING_DIR}.
     *
     * @param name name passed to {@link TezClient#create}
     * @return a started client; the caller is responsible for stopping it
     * @throws IOException  if starting the client fails
     * @throws TezException if starting the client fails
     */
    private TezClient getTezClient(String name) throws IOException, TezException {
        TezConfiguration tezConf = new TezConfiguration();
        tezConf.setBoolean("tez.local.mode", true);
        tezConf.set("fs.defaultFS", "file:///");
        tezConf.set("tez.staging-dir", STAGING_DIR.getAbsolutePath());
        tezConf.setBoolean("tez.runtime.optimize.local.fetch", true);
        tezConf.setLong("tez.am.sleep.time.before.exit.millis", 500L);
        // The third argument requests a session-mode client.
        TezClient tezClient = TezClient.create(name, tezConf, true);
        tezClient.start();
        return tezClient;
    }

    /**
     * Processor whose {@link #run} either reports a failure, kills its own
     * attempt, or returns normally, depending on static configuration.
     *
     * <p>The behaviour is selected through static fields set by the
     * {@code configureFor*} methods before a DAG is submitted. The DAG refers
     * to this class only by name, so presumably the framework instantiates it
     * reflectively and there is no instance to configure directly. Each
     * {@code configureFor*} method resets all flags first, so a test never
     * inherits the configuration of a previous one.
     */
    public static class FailingProcessor
    extends AbstractLogicalIOProcessor {
        private static final String FAIL_STRING_NON_FATAL = "non-fatal-fail";
        private static final String FAIL_STRING_FATAL = "fatal-fail";
        private static final String KILL_STRING = "kill-self";
        // volatile: written by the test thread, read by the thread running the task.
        private static volatile boolean shouldFail;
        private static volatile boolean fatalError;
        private static volatile boolean shouldKill;
        private static volatile int killModeAttemptNumberToSucceed;

        /** Clears every behaviour flag so that {@link #run} does nothing special. */
        static void reset() {
            shouldFail = false;
            fatalError = false;
            shouldKill = false;
            killModeAttemptNumberToSucceed = -1;
        }

        /** Makes every attempt report a {@link TaskFailureType#NON_FATAL} failure. */
        static void configureForNonFatalFail() {
            reset();
            shouldFail = true;
        }

        /** Makes every attempt report a {@link TaskFailureType#FATAL} failure. */
        static void configureForFatalFail() {
            reset();
            shouldFail = true;
            fatalError = true;
        }

        /**
         * Makes every attempt kill itself except the one with the given
         * attempt number, which returns normally.
         *
         * @param attemptNumber the task attempt number that is not killed
         */
        static void configureForKilled(int attemptNumber) {
            reset();
            shouldKill = true;
            killModeAttemptNumberToSucceed = attemptNumber;
        }

        public FailingProcessor(ProcessorContext context) {
            super(context);
        }

        /** No initialization is required. */
        public void initialize() throws Exception {
        }

        /** Events are ignored. */
        public void handleEvents(List<Event> processorEvents) {
        }

        /** Nothing to release. */
        public void close() throws Exception {
        }

        /**
         * Applies the statically configured behaviour: reports a fatal or
         * non-fatal failure, kills the current attempt, or simply returns.
         * The inputs and outputs are not used.
         */
        public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
            LOG.info("Running Failing processor");
            if (shouldFail) {
                if (fatalError) {
                    LOG.info("Reporting fatal error");
                    getContext().reportFailure(TaskFailureType.FATAL, null, FAIL_STRING_FATAL);
                } else {
                    LOG.info("Reporting non-fatal error");
                    getContext().reportFailure(TaskFailureType.NON_FATAL, null, FAIL_STRING_NON_FATAL);
                }
            } else if (shouldKill && getContext().getTaskAttemptNumber() != killModeAttemptNumberToSucceed) {
                LOG.info("Reporting self-kill for attempt={}", getContext().getTaskAttemptNumber());
                getContext().killSelf(null, KILL_STRING);
            }
        }

        static {
            // Start from a known state even if no configureFor* method is called.
            reset();
        }
    }
}

