/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.NonClosingCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class StateSnapshotContextSynchronousImpl
implements StateSnapshotContext {
    private final long checkpointId;
    private final long checkpointTimestamp;
    private final CheckpointStreamFactory streamFactory;
    private final KeyGroupRange keyGroupRange;
    private final CloseableRegistry closableRegistry;
    private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream;
    private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateCheckpointClosingFuture;
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateCheckpointClosingFuture;

    @VisibleForTesting
    public StateSnapshotContextSynchronousImpl(long checkpointId, long checkpointTimestamp) {
        this.checkpointId = checkpointId;
        this.checkpointTimestamp = checkpointTimestamp;
        this.streamFactory = null;
        this.keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        this.closableRegistry = new CloseableRegistry();
    }

    public StateSnapshotContextSynchronousImpl(long checkpointId, long checkpointTimestamp, CheckpointStreamFactory streamFactory, KeyGroupRange keyGroupRange, CloseableRegistry closableRegistry) {
        this.checkpointId = checkpointId;
        this.checkpointTimestamp = checkpointTimestamp;
        this.streamFactory = (CheckpointStreamFactory)Preconditions.checkNotNull((Object)streamFactory);
        this.keyGroupRange = (KeyGroupRange)Preconditions.checkNotNull((Object)keyGroupRange);
        this.closableRegistry = (CloseableRegistry)Preconditions.checkNotNull((Object)closableRegistry);
    }

    @Override
    public long getCheckpointId() {
        return this.checkpointId;
    }

    @Override
    public long getCheckpointTimestamp() {
        return this.checkpointTimestamp;
    }

    private CheckpointStreamFactory.CheckpointStateOutputStream openAndRegisterNewStream() throws Exception {
        CheckpointStreamFactory.CheckpointStateOutputStream cout = this.streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        this.closableRegistry.registerCloseable((Closeable)((Object)cout));
        return cout;
    }

    @Override
    public KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception {
        if (null == this.keyedStateCheckpointOutputStream) {
            Preconditions.checkState((this.keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP_RANGE ? 1 : 0) != 0, (Object)"Not a keyed operator");
            this.keyedStateCheckpointOutputStream = new KeyedStateCheckpointOutputStream(this.openAndRegisterNewStream(), this.keyGroupRange);
        }
        return this.keyedStateCheckpointOutputStream;
    }

    @Override
    public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception {
        if (null == this.operatorStateCheckpointOutputStream) {
            this.operatorStateCheckpointOutputStream = new OperatorStateCheckpointOutputStream(this.openAndRegisterNewStream());
        }
        return this.operatorStateCheckpointOutputStream;
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
        if (null == this.keyedStateCheckpointClosingFuture) {
            StreamCloserCallable<KeyGroupsStateHandle> callable = new StreamCloserCallable<KeyGroupsStateHandle>(this.closableRegistry, this.keyedStateCheckpointOutputStream);
            AsyncSnapshotCallable.AsyncSnapshotTask asyncSnapshotTask = callable.toAsyncSnapshotFutureTask(this.closableRegistry);
            this.keyedStateCheckpointClosingFuture = StateSnapshotContextSynchronousImpl.castAsKeyedStateHandle(asyncSnapshotTask);
        }
        return this.keyedStateCheckpointClosingFuture;
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException {
        if (null == this.operatorStateCheckpointClosingFuture) {
            StreamCloserCallable<OperatorStateHandle> callable = new StreamCloserCallable<OperatorStateHandle>(this.closableRegistry, this.operatorStateCheckpointOutputStream);
            this.operatorStateCheckpointClosingFuture = callable.toAsyncSnapshotFutureTask(this.closableRegistry);
        }
        return this.operatorStateCheckpointClosingFuture;
    }

    private static RunnableFuture<SnapshotResult<KeyedStateHandle>> castAsKeyedStateHandle(RunnableFuture<?> asyncSnapshotTask) {
        return asyncSnapshotTask;
    }

    private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<? extends T> stream) throws IOException {
        Preconditions.checkNotNull(stream);
        CheckpointStreamFactory.CheckpointStateOutputStream delegate2 = stream.getDelegate();
        if (this.closableRegistry.unregisterCloseable((Closeable)((Object)delegate2))) {
            delegate2.close();
        }
    }

    public void closeExceptionally() throws IOException {
        IOException exception = null;
        if (this.keyedStateCheckpointOutputStream != null) {
            try {
                this.closeAndUnregisterStream(this.keyedStateCheckpointOutputStream);
            }
            catch (IOException e) {
                exception = new IOException("Could not close the raw keyed state checkpoint output stream.", e);
            }
        }
        if (this.operatorStateCheckpointOutputStream != null) {
            try {
                this.closeAndUnregisterStream(this.operatorStateCheckpointOutputStream);
            }
            catch (IOException e) {
                exception = (IOException)ExceptionUtils.firstOrSuppressed((Throwable)new IOException("Could not close the raw operator state checkpoint output stream.", e), (Throwable)exception);
            }
        }
        if (this.keyedStateCheckpointClosingFuture != null) {
            this.keyedStateCheckpointClosingFuture.cancel(true);
        }
        if (this.operatorStateCheckpointClosingFuture != null) {
            this.operatorStateCheckpointClosingFuture.cancel(true);
        }
        if (exception != null) {
            throw exception;
        }
    }

    private static final class StreamCloserCallable<T extends StreamStateHandle>
    extends AsyncSnapshotCallable<SnapshotResult<T>> {
        @Nullable
        private final NonClosingCheckpointOutputStream<T> stream;
        private final CloseableRegistry closableRegistry;

        StreamCloserCallable(CloseableRegistry closableRegistry, @Nullable NonClosingCheckpointOutputStream<T> stream) {
            this.closableRegistry = (CloseableRegistry)Preconditions.checkNotNull((Object)closableRegistry);
            this.stream = stream;
        }

        @Override
        protected SnapshotResult<T> callInternal() throws Exception {
            if (this.stream == null) {
                return SnapshotResult.of(null);
            }
            if (!this.closableRegistry.unregisterCloseable((Closeable)((Object)this.stream.getDelegate()))) {
                throw new IOException("Stream delegate appears to be closed, because it is no longer registered.");
            }
            T closed = this.stream.closeAndGetHandle();
            return SnapshotResult.of(closed);
        }

        @Override
        protected void cleanupProvidedResources() {
            try {
                if (this.stream != null && this.closableRegistry.unregisterCloseable((Closeable)((Object)this.stream.getDelegate()))) {
                    this.stream.closeAndGetHandle();
                }
            }
            catch (IOException e) {
                throw new IllegalStateException("Unable to cleanup a stream.", e);
            }
        }
    }
}

