/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.utils;

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

public class DistinctDurationFlux<T>
extends FluxOperator<T, T> {
    private final Function<T, ?> keySelector;
    private final Duration duration;

    protected DistinctDurationFlux(Flux<? extends T> source, Function<T, ?> keySelector, Duration duration) {
        super(source);
        this.keySelector = keySelector;
        this.duration = duration;
    }

    public static <T> Flux<T> create(Flux<? extends T> source, Function<T, ?> keySelector, Duration duration) {
        return new DistinctDurationFlux<T>(source, keySelector, duration);
    }

    public void subscribe(@Nonnull CoreSubscriber<? super T> actual) {
        this.source.subscribe(new DistinctDurationSubscriber<T>(actual, this.keySelector, this.duration.toMillis()));
    }

    static class DistinctDurationSubscriber<T>
    extends ConcurrentHashMap<Object, Long>
    implements CoreSubscriber<T>,
    Subscription,
    Runnable,
    Scannable {
        private final CoreSubscriber<? super T> actual;
        private final Function<T, ?> keySelector;
        private final long expires;
        private final Disposable disposable;
        private volatile Subscription subscription;
        static final AtomicReferenceFieldUpdater<DistinctDurationSubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(DistinctDurationSubscriber.class, Subscription.class, "subscription");

        DistinctDurationSubscriber(CoreSubscriber<? super T> actual, Function<T, ?> keySelector, long expires) {
            this.actual = actual;
            this.keySelector = keySelector;
            this.expires = expires;
            int interval = (int)((double)expires * 1.1);
            this.disposable = Schedulers.parallel().schedulePeriodically((Runnable)this, (long)interval, (long)interval, TimeUnit.MILLISECONDS);
        }

        @Nonnull
        public Context currentContext() {
            return this.actual.currentContext();
        }

        @Override
        public void run() {
            this.cleanup();
        }

        private void cleanup() {
            long now = System.currentTimeMillis();
            this.forEach((key, last) -> {
                if (now - last > this.expires) {
                    this.remove(key);
                }
            });
        }

        public void request(long n) {
            Subscription s;
            if (Operators.validate((long)n) && (s = this.subscription) != null) {
                s.request(n);
            }
        }

        public void cancel() {
            if (Operators.terminate(S, (Object)this)) {
                this.complete();
            }
        }

        public void onSubscribe(@Nonnull Subscription s) {
            if (Operators.setOnce(S, (Object)this, (Subscription)s)) {
                this.actual.onSubscribe((Subscription)this);
            }
        }

        public void onNext(T t) {
            try {
                long now = System.currentTimeMillis();
                Object key = this.keySelector.apply(t);
                if (key == null) {
                    this.actual.onNext(t);
                    return;
                }
                Long last = this.putIfAbsent(key, now);
                if (last == null || now - last > this.expires) {
                    this.actual.onNext(t);
                } else {
                    Operators.onDiscard(t, (Context)this.actual.currentContext());
                    this.request(1L);
                }
            }
            catch (Throwable e) {
                this.onError(e);
            }
        }

        public void onError(Throwable t) {
            try {
                if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators.cancelledSubscription()) {
                    Operators.onErrorDropped((Throwable)t, (Context)this.currentContext());
                    return;
                }
                this.actual.onError(t);
            }
            catch (Throwable e) {
                Operators.onErrorDropped((Throwable)Exceptions.addSuppressed((Throwable)t, (Throwable)e), (Context)this.currentContext());
            }
            finally {
                this.complete();
            }
        }

        public void onComplete() {
            if (S.getAndSet(this, Operators.cancelledSubscription()) != Operators.cancelledSubscription()) {
                try {
                    this.actual.onComplete();
                }
                catch (Throwable e) {
                    Operators.onErrorDropped((Throwable)e, (Context)this.currentContext());
                }
                finally {
                    this.complete();
                }
            }
        }

        private void complete() {
            this.clear();
            this.disposable.dispose();
        }

        public Object scanUnsafe(@Nonnull Scannable.Attr key) {
            if (key == Scannable.Attr.RUN_STYLE) {
                return Scannable.Attr.RunStyle.SYNC;
            }
            if (key == Scannable.Attr.ACTUAL) {
                return this.actual;
            }
            return null;
        }
    }
}

