/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.message.codec.context;

import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.jetlinks.core.message.codec.context.SerialContext;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class QueueSerialContext<IN, OUT>
implements SerialContext<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(QueueSerialContext.class);
    int maxSize = 256;
    Sinks.Many<IN> inputSinksMany = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    Queue<Tuple2<IN, Consumer<OUT>>> inputQueue = new ConcurrentLinkedQueue<Tuple2<IN, Consumer<OUT>>>();
    AtomicReference<Consumer<OUT>> output = new AtomicReference();
    AtomicBoolean awaiting = new AtomicBoolean();

    QueueSerialContext() {
    }

    @Override
    public Mono<OUT> inputAndAwait(IN in, Duration timeout) {
        return Mono.create(sink -> {
            if (this.inputQueue.size() >= this.maxSize) {
                sink.error((Throwable)new UnsupportedOperationException("out of serial queue"));
                this.drain();
                return;
            }
            Tuple2 tp2 = Tuples.of((Object)in, arg_0 -> ((MonoSink)sink).success(arg_0));
            this.inputQueue.add(tp2);
            this.drain();
            sink.onDispose(() -> {
                this.inputQueue.remove(tp2);
                this.drain();
            });
        }).timeout(timeout, Mono.error(TimeoutException::new));
    }

    private void drain() {
        Tuple2<IN, Consumer<OUT>> input;
        if (this.awaiting.compareAndSet(false, true) && (input = this.inputQueue.poll()) != null) {
            this.output.set((Consumer<OUT>)input.getT2());
            this.inputSinksMany.emitNext(input.getT1(), Reactors.emitFailureHandler());
        }
    }

    @Override
    public void output(OUT out) {
        Consumer consumer = this.output.getAndSet(null);
        if (consumer != null) {
            consumer.accept(out);
            this.awaiting.set(false);
        }
        this.drain();
    }

    @Override
    public Flux<IN> listen() {
        return this.inputSinksMany.asFlux();
    }
}

