/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateByteBuffer;
import org.apache.flink.runtime.checkpoint.channel.RecoveredChannelStateHandler;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;

/**
 * {@link RecoveredChannelStateHandler} for result subpartitions.
 *
 * <p>Buffers are requested from the subpartition addressed by a {@link ResultSubpartitionInfo};
 * once filled they are handed back to that same subpartition through
 * {@link #recover(ResultSubpartitionInfo, Tuple2)}. The context that travels with each buffer is
 * the pair of the {@link BufferBuilder} used for writing and the {@link BufferConsumer} created
 * from it.
 */
class ResultSubpartitionRecoveredStateHandler
implements RecoveredChannelStateHandler<ResultSubpartitionInfo, Tuple2<BufferBuilder, BufferConsumer>> {
    /** Partition writers, indexed by {@link ResultSubpartitionInfo#getPartitionIdx()}. */
    private final ResultPartitionWriter[] writers;
    /** Flag passed through unchanged to {@code finishReadRecoveredState} in {@link #close()}. */
    private final boolean notifyAndBlockOnCompletion;

    /**
     * Creates a handler operating on the given writers.
     *
     * @param writers partition writers; the array is stored as-is (not copied)
     * @param notifyAndBlockOnCompletion value forwarded to every checkpointed partition's
     *     {@code finishReadRecoveredState} when this handler is closed
     */
    ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion) {
        this.writers = writers;
        this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion;
    }

    /**
     * Requests a buffer builder from the addressed subpartition and wraps it for writing.
     *
     * @param subpartitionInfo identifies the partition and subpartition to request the buffer from
     * @return the wrapped buffer, carrying the builder and a consumer created from it as context
     * @throws IOException if requesting the buffer builder fails
     * @throws InterruptedException if interrupted while requesting the buffer builder
     * @throws IllegalStateException if the addressed writer is not a {@link CheckpointedResultPartition}
     */
    @Override
    public RecoveredChannelStateHandler.BufferWithContext<Tuple2<BufferBuilder, BufferConsumer>> getBuffer(ResultSubpartitionInfo subpartitionInfo) throws IOException, InterruptedException {
        BufferBuilder bufferBuilder = this.getSubpartition(subpartitionInfo).requestBufferBuilderBlocking();
        // The consumer is created right away so that it travels together with its builder.
        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
        // No (Object) casts here: the tuple's element types must be inferred as
        // BufferBuilder/BufferConsumer to match the declared context type.
        Tuple2<BufferBuilder, BufferConsumer> context = Tuple2.of(bufferBuilder, bufferConsumer);
        return new RecoveredChannelStateHandler.BufferWithContext<Tuple2<BufferBuilder, BufferConsumer>>(ChannelStateByteBuffer.wrap(bufferBuilder), context);
    }

    /**
     * Finishes the builder and hands the consumer over to the addressed subpartition.
     *
     * <p>If the consumer has no data available it is closed instead of being added.
     *
     * @param subpartitionInfo identifies the subpartition that receives the data
     * @param bufferBuilderAndConsumer the context produced by {@link #getBuffer(ResultSubpartitionInfo)}
     * @throws IOException if the subpartition reports that the consumer was not added
     * @throws IllegalStateException if the addressed writer is not a {@link CheckpointedResultPartition}
     */
    @Override
    public void recover(ResultSubpartitionInfo subpartitionInfo, Tuple2<BufferBuilder, BufferConsumer> bufferBuilderAndConsumer) throws IOException {
        BufferBuilder bufferBuilder = bufferBuilderAndConsumer.f0;
        BufferConsumer bufferConsumer = bufferBuilderAndConsumer.f1;
        bufferBuilder.finish();
        if (bufferConsumer.isDataAvailable()) {
            NetworkActionsLogger.traceRecover("ResultSubpartitionRecoveredStateHandler#recover", bufferConsumer, subpartitionInfo);
            // NOTE(review): the meaning of Integer.MIN_VALUE is defined by
            // CheckpointedResultSubpartition#add, which is not visible in this file.
            boolean added = this.getSubpartition(subpartitionInfo).add(bufferConsumer, Integer.MIN_VALUE);
            if (!added) {
                // The consumer is intentionally not closed here; whether the subpartition has
                // already released it on rejection cannot be determined from this file.
                throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
            }
        } else {
            bufferConsumer.close();
        }
    }

    /**
     * Resolves the checkpointed subpartition addressed by {@code subpartitionInfo}.
     *
     * @param subpartitionInfo supplies the writer index and the subpartition index
     * @return the checkpointed subpartition of the addressed writer
     * @throws IllegalStateException if the addressed writer is not a {@link CheckpointedResultPartition}
     */
    private CheckpointedResultSubpartition getSubpartition(ResultSubpartitionInfo subpartitionInfo) {
        ResultPartitionWriter writer = this.writers[subpartitionInfo.getPartitionIdx()];
        if (writer instanceof CheckpointedResultPartition) {
            return ((CheckpointedResultPartition)writer).getCheckpointedSubpartition(subpartitionInfo.getSubPartitionIdx());
        }
        throw new IllegalStateException("Cannot restore state to a non-checkpointable partition type: " + writer);
    }

    /**
     * Calls {@code finishReadRecoveredState} on every writer that is a
     * {@link CheckpointedResultPartition}; all other writers are skipped.
     *
     * @throws IOException if finishing the recovered-state read fails for a partition
     */
    @Override
    public void close() throws IOException {
        for (ResultPartitionWriter writer : this.writers) {
            if (writer instanceof CheckpointedResultPartition) {
                ((CheckpointedResultPartition)writer).finishReadRecoveredState(this.notifyAndBlockOnCompletion);
            }
        }
    }
}

