/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
import org.apache.flink.runtime.state.BackendWritableBroadcastState;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.HeapBroadcastState;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.PartitionableListState;
import org.apache.flink.runtime.state.RegisteredBroadcastStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator state backend that keeps its list states and broadcast states in
 * the maps handed to the constructor and delegates snapshotting to an
 * {@link AbstractSnapshotStrategy}.
 *
 * <p>Two pairs of maps are maintained:
 * <ul>
 *   <li>the <em>registered</em> maps hold every state known to this backend
 *       under its name (they may already contain entries when the backend is
 *       constructed);</li>
 *   <li>the <em>accessed</em> maps cache the states that were already handed
 *       out through one of the {@code get*State} methods, so repeated lookups
 *       skip the serializer compatibility check.</li>
 * </ul>
 *
 * <p>The maps are stored by reference, not copied.
 */
@Internal
public class DefaultOperatorStateBackend
implements OperatorStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);

    /** Name constant for a default operator state. */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /** All list states known to this backend, keyed by state name. */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    /** All broadcast states known to this backend, keyed by state name. */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    /** Registry that is closed by {@link #close()} and {@link #dispose()}. */
    private final CloseableRegistry closeStreamOnCancelRegistry;

    /** Serializer used by the deprecated {@link #getSerializableListState(String)}. */
    private final JavaSerializer<Serializable> deprecatedDefaultJavaSerializer = new JavaSerializer<>();

    /** Execution config used to initialize serializers of state descriptors. */
    private final ExecutionConfig executionConfig;

    /** List states that were already handed out, keyed by state name. */
    private final Map<String, PartitionableListState<?>> accessedStatesByName;

    /** Broadcast states that were already handed out, keyed by state name. */
    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

    /** Strategy that performs the actual snapshot. */
    private final AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy;

    /**
     * Creates a backend around the given collaborators. All arguments are
     * stored as-is; no defensive copies are made and no null checks are done.
     *
     * @param executionConfig config passed to state descriptors when their serializers are initialized
     * @param closeStreamOnCancelRegistry registry closed on {@link #close()} / {@link #dispose()}
     * @param registeredOperatorStates map of list states by name (may be pre-populated)
     * @param registeredBroadcastStates map of broadcast states by name (may be pre-populated)
     * @param accessedStatesByName cache of list states already handed out
     * @param accessedBroadcastStatesByName cache of broadcast states already handed out
     * @param snapshotStrategy strategy that {@link #snapshot} delegates to
     */
    public DefaultOperatorStateBackend(ExecutionConfig executionConfig, CloseableRegistry closeStreamOnCancelRegistry, Map<String, PartitionableListState<?>> registeredOperatorStates, Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates, Map<String, PartitionableListState<?>> accessedStatesByName, Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName, AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy) {
        this.closeStreamOnCancelRegistry = closeStreamOnCancelRegistry;
        this.executionConfig = executionConfig;
        this.registeredOperatorStates = registeredOperatorStates;
        this.registeredBroadcastStates = registeredBroadcastStates;
        this.accessedStatesByName = accessedStatesByName;
        this.accessedBroadcastStatesByName = accessedBroadcastStatesByName;
        this.snapshotStrategy = snapshotStrategy;
    }

    /**
     * @return the execution config this backend was created with
     */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /**
     * @return the key set of the registered list-state map; this is a live
     *         view of the internal map, not a copy
     */
    public Set<String> getRegisteredStateNames() {
        return this.registeredOperatorStates.keySet();
    }

    /**
     * @return the key set of the registered broadcast-state map; this is a
     *         live view of the internal map, not a copy
     */
    public Set<String> getRegisteredBroadcastStateNames() {
        return this.registeredBroadcastStates.keySet();
    }

    /**
     * Closes the stream registry.
     *
     * @throws IOException if closing the registry fails
     */
    @Override
    public void close() throws IOException {
        this.closeStreamOnCancelRegistry.close();
    }

    /**
     * Closes the stream registry, ignoring any failure, and clears both
     * registered-state maps. The accessed-state caches are left untouched.
     */
    @Override
    public void dispose() {
        IOUtils.closeQuietly((Closeable)this.closeStreamOnCancelRegistry);
        this.registeredOperatorStates.clear();
        this.registeredBroadcastStates.clear();
    }

    /**
     * Returns the broadcast state for the given descriptor, creating and
     * registering a new {@link HeapBroadcastState} if none is registered
     * under the descriptor's name.
     *
     * <p>If a state with that name is already registered but has not been
     * accessed yet, its name and assignment mode are verified and its key and
     * value serializers are updated with the descriptor's serializers.
     *
     * @param stateDescriptor descriptor naming the state and providing its serializers
     * @param <K> key type of the broadcast state
     * @param <V> value type of the broadcast state
     * @return the cached, registered, or newly created broadcast state
     * @throws StateMigrationException if the descriptor's key or value
     *         serializer is reported as incompatible with the registered one
     * @throws NullPointerException if the descriptor, its name, or one of its serializers is null
     * @throws IllegalStateException if the registered state's name or assignment mode does not match
     */
    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
        Preconditions.checkNotNull(stateDescriptor);
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        // The maps are keyed by name only, so the element types are trusted
        // to match the caller's descriptor.
        @SuppressWarnings("unchecked")
        BackendWritableBroadcastState<K, V> previous =
                (BackendWritableBroadcastState<K, V>) this.accessedBroadcastStatesByName.get(name);
        if (previous != null) {
            DefaultOperatorStateBackend.checkStateNameAndMode(previous.getStateMetaInfo().getName(), name, previous.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST);
            return previous;
        }

        // First access under this name: resolve the serializers and then
        // either create the state or validate the already registered one.
        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
        TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());

        @SuppressWarnings("unchecked")
        BackendWritableBroadcastState<K, V> broadcastState =
                (BackendWritableBroadcastState<K, V>) this.registeredBroadcastStates.get(name);
        if (broadcastState == null) {
            // Nothing registered under this name yet: create a fresh state holder.
            broadcastState = new HeapBroadcastState<>(new RegisteredBroadcastStateBackendMetaInfo<>(name, OperatorStateHandle.Mode.BROADCAST, broadcastStateKeySerializer, broadcastStateValueSerializer));
            this.registeredBroadcastStates.put(name, broadcastState);
        } else {
            // Already registered: verify it matches and update its serializers.
            DefaultOperatorStateBackend.checkStateNameAndMode(broadcastState.getStateMetaInfo().getName(), name, broadcastState.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST);
            RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();
            TypeSerializerSchemaCompatibility<K> keyCompatibility = restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
            if (keyCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new key typeSerializer for broadcast state must not be incompatible.");
            }
            TypeSerializerSchemaCompatibility<V> valueCompatibility = restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);
            if (valueCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new value typeSerializer for broadcast state must not be incompatible.");
            }
            broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
        }

        this.accessedBroadcastStatesByName.put(name, broadcastState);
        return broadcastState;
    }

    /**
     * Returns the list state for the given descriptor using the
     * {@link OperatorStateHandle.Mode#SPLIT_DISTRIBUTE} assignment mode.
     *
     * @param stateDescriptor descriptor naming the state and providing its element serializer
     * @param <S> element type of the list state
     * @return the cached, registered, or newly created list state
     * @throws Exception if the state cannot be obtained, e.g. because the
     *         descriptor's serializer is incompatible with the registered one
     */
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    /**
     * Returns the list state for the given descriptor using the
     * {@link OperatorStateHandle.Mode#UNION} assignment mode.
     *
     * @param stateDescriptor descriptor naming the state and providing its element serializer
     * @param <S> element type of the list state
     * @return the cached, registered, or newly created list state
     * @throws Exception if the state cannot be obtained, e.g. because the
     *         descriptor's serializer is incompatible with the registered one
     */
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    /**
     * Equivalent to {@link #getListState(ListStateDescriptor)}.
     *
     * @param stateDescriptor descriptor naming the state and providing its element serializer
     * @param <S> element type of the list state
     * @return the list state for the descriptor
     * @throws Exception see {@link #getListState(ListStateDescriptor)}
     * @deprecated use {@link #getListState(ListStateDescriptor)} instead
     */
    @Deprecated
    public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getListState(stateDescriptor);
    }

    /**
     * Returns a list state with the given name whose elements are serialized
     * with this backend's {@link JavaSerializer} instance.
     *
     * @param stateName name of the list state
     * @param <T> element type of the list state
     * @return the list state registered under {@code stateName}
     * @throws Exception see {@link #getListState(ListStateDescriptor)}
     * @deprecated use {@link #getListState(ListStateDescriptor)} with an explicit descriptor instead
     */
    @Deprecated
    public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
        ListState<Serializable> state =
                this.getListState(new ListStateDescriptor<>(stateName, this.deprecatedDefaultJavaSerializer));
        return DefaultOperatorStateBackend.asTypedListState(state);
    }

    /**
     * Delegates the snapshot to the configured strategy and afterwards reports
     * to the strategy that the synchronous part has completed, passing the
     * wall-clock time taken before the strategy was invoked.
     *
     * @param checkpointId id of the checkpoint, passed through to the strategy
     * @param timestamp checkpoint timestamp, passed through to the strategy
     * @param streamFactory stream factory, passed through to the strategy
     * @param checkpointOptions checkpoint options, passed through to the strategy
     * @return the runnable future produced by the snapshot strategy
     * @throws Exception if the snapshot strategy fails
     */
    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        long syncStartTime = System.currentTimeMillis();
        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner = this.snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
        this.snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
        return snapshotRunner;
    }

    /**
     * Shared implementation behind the public list-state getters.
     *
     * <p>Returns the cached state if the name was accessed before. Otherwise
     * the descriptor's serializer is initialized and the state is either
     * created and registered, or — if a state with that name is already
     * registered — verified against the requested name and mode and updated
     * with a duplicate of the descriptor's serializer.
     *
     * @param stateDescriptor descriptor naming the state and providing its element serializer
     * @param mode assignment mode the state must have / is created with
     * @param <S> element type of the list state
     * @return the cached, registered, or newly created list state
     * @throws StateMigrationException if the descriptor's serializer is
     *         reported as incompatible with the registered one
     * @throws NullPointerException if the descriptor, its name, or its element serializer is null
     * @throws IllegalStateException if the registered state's name or assignment mode does not match
     */
    private <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode) throws StateMigrationException {
        Preconditions.checkNotNull(stateDescriptor);
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        // The maps are keyed by name only, so the element type is trusted to
        // match the caller's descriptor.
        @SuppressWarnings("unchecked")
        PartitionableListState<S> previous = (PartitionableListState<S>) this.accessedStatesByName.get(name);
        if (previous != null) {
            DefaultOperatorStateBackend.checkStateNameAndMode(previous.getStateMetaInfo().getName(), name, previous.getStateMetaInfo().getAssignmentMode(), mode);
            return previous;
        }

        // First access under this name: resolve the serializer and then
        // either create the state or validate the already registered one.
        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> partitionableListState = (PartitionableListState<S>) this.registeredOperatorStates.get(name);
        if (null == partitionableListState) {
            // Nothing registered under this name yet: create a fresh state holder.
            partitionableListState = new PartitionableListState<>(new RegisteredOperatorStateBackendMetaInfo<>(name, partitionStateSerializer, mode));
            this.registeredOperatorStates.put(name, partitionableListState);
        } else {
            // Already registered: verify it matches and update its serializer.
            DefaultOperatorStateBackend.checkStateNameAndMode(partitionableListState.getStateMetaInfo().getName(), name, partitionableListState.getStateMetaInfo().getAssignmentMode(), mode);
            RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo = partitionableListState.getStateMetaInfo();
            // The meta info receives its own copy of the serializer.
            TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
            TypeSerializerSchemaCompatibility<S> stateCompatibility = restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new state typeSerializer for operator state must not be incompatible.");
            }
            partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
        }

        this.accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }

    /**
     * Re-types a {@code ListState<Serializable>} to the caller's element type.
     * The cast is unchecked; it exists only so the suppression is confined to
     * this one conversion.
     */
    @SuppressWarnings("unchecked")
    private static <T extends Serializable> ListState<T> asTypedListState(ListState<Serializable> state) {
        return (ListState<T>) state;
    }

    /**
     * Verifies that an existing state matches the requested name and mode.
     *
     * @param actualName name stored in the existing state's meta info
     * @param expectedName name requested by the caller
     * @param actualMode assignment mode stored in the existing state's meta info
     * @param expectedMode assignment mode requested by the caller
     * @throws IllegalStateException if the names or the modes differ
     */
    private static void checkStateNameAndMode(String actualName, String expectedName, OperatorStateHandle.Mode actualMode, OperatorStateHandle.Mode expectedMode) {
        Preconditions.checkState(actualName.equals(expectedName), "Incompatible state names. Was [" + actualName + "], registered with [" + expectedName + "].");
        Preconditions.checkState(actualMode.equals(expectedMode), "Incompatible state assignment modes. Was [" + actualMode + "], registered with [" + expectedMode + "].");
    }
}

