/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.InputDependencyConstraintChecker;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyUtils;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

public class LazyFromSourcesSchedulingStrategy
implements SchedulingStrategy {
    private static final Predicate<SchedulingExecutionVertex> IS_IN_CREATED_EXECUTION_STATE = schedulingExecutionVertex -> ExecutionState.CREATED == schedulingExecutionVertex.getState();
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology schedulingTopology;
    private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions;
    private final InputDependencyConstraintChecker inputConstraintChecker;

    public LazyFromSourcesSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = (SchedulerOperations)Preconditions.checkNotNull((Object)schedulerOperations);
        this.schedulingTopology = (SchedulingTopology)Preconditions.checkNotNull((Object)schedulingTopology);
        this.deploymentOptions = new HashMap<ExecutionVertexID, DeploymentOption>();
        this.inputConstraintChecker = new InputDependencyConstraintChecker();
    }

    @Override
    public void startScheduling() {
        DeploymentOption updateOption = new DeploymentOption(true);
        DeploymentOption nonUpdateOption = new DeploymentOption(false);
        for (SchedulingExecutionVertex schedulingVertex : this.schedulingTopology.getVertices()) {
            DeploymentOption option = nonUpdateOption;
            for (SchedulingResultPartition srp : schedulingVertex.getProducedResults()) {
                if (srp.getResultType().isPipelined()) {
                    option = updateOption;
                }
                this.inputConstraintChecker.addSchedulingResultPartition(srp);
            }
            this.deploymentOptions.put((ExecutionVertexID)schedulingVertex.getId(), option);
        }
        this.allocateSlotsAndDeployExecutionVertices(this.schedulingTopology.getVertices());
    }

    @Override
    public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
        verticesToRestart.stream().map(this.schedulingTopology::getVertex).flatMap(vertex -> IterableUtils.toStream(vertex.getProducedResults())).forEach(this.inputConstraintChecker::resetSchedulingResultPartition);
        this.allocateSlotsAndDeployExecutionVertices(SchedulingStrategyUtils.getVerticesFromIds(this.schedulingTopology, verticesToRestart));
    }

    @Override
    public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
        if (!ExecutionState.FINISHED.equals((Object)executionState)) {
            return;
        }
        Set verticesToSchedule = IterableUtils.toStream(this.schedulingTopology.getVertex(executionVertexId).getProducedResults()).filter(partition -> partition.getResultType().isBlocking()).flatMap(partition -> this.inputConstraintChecker.markSchedulingResultPartitionFinished((SchedulingResultPartition)partition).stream()).flatMap(partition -> IterableUtils.toStream(partition.getConsumers())).collect(Collectors.toSet());
        this.allocateSlotsAndDeployExecutionVertices(verticesToSchedule);
    }

    @Override
    public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {
        SchedulingResultPartition resultPartition = this.schedulingTopology.getResultPartition(resultPartitionId);
        if (!resultPartition.getResultType().isPipelined()) {
            return;
        }
        this.allocateSlotsAndDeployExecutionVertices(resultPartition.getConsumers());
    }

    private void allocateSlotsAndDeployExecutionVertices(Iterable<? extends SchedulingExecutionVertex> vertices) {
        HashSet vertexSet = Sets.newHashSet(vertices);
        IterableUtils.toStream(this.schedulingTopology.getVertices()).filter(v -> vertexSet.contains(v)).filter(IS_IN_CREATED_EXECUTION_STATE.and(this.isInputConstraintSatisfied())).map(v -> new ExecutionVertexDeploymentOption((ExecutionVertexID)v.getId(), this.deploymentOptions.get(v.getId()))).forEach(d -> this.schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(d)));
    }

    private Predicate<SchedulingExecutionVertex> isInputConstraintSatisfied() {
        return this.inputConstraintChecker::check;
    }

    public static class Factory
    implements SchedulingStrategyFactory {
        @Override
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new LazyFromSourcesSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }
}

