/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.event;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ThreadLocalRandom;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Decoder;
import org.jetlinks.core.codec.Encoder;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.function.Function4;
import reactor.util.context.ContextView;

public class InternalEventBus
implements EventBus {
    private static final Logger log = LoggerFactory.getLogger(InternalEventBus.class);
    protected final Topic<SubscriptionInfo> subscriptionTable = Topic.createRoot();
    private static final FastThreadLocal<Set<SubscriptionInfo>> PUB_HANDLERS = new FastThreadLocal<Set<SubscriptionInfo>>(){

        protected Set<SubscriptionInfo> initialValue() {
            return new HashSet<SubscriptionInfo>();
        }
    };
    private static final FastThreadLocal<Set<Object>> DISTINCT_HANDLERS = new FastThreadLocal<Set<Object>>(){

        protected Set<Object> initialValue() {
            return new HashSet<Object>();
        }
    };
    private static final FastThreadLocal<Map<String, List<SubscriptionInfo>>> SHARED = new FastThreadLocal<Map<String, List<SubscriptionInfo>>>(){

        protected Map<String, List<SubscriptionInfo>> initialValue() {
            return new HashMap<String, List<SubscriptionInfo>>();
        }
    };

    public Flux<TopicPayload> subscribe(Subscription subscription) {
        return Flux.create(sink -> sink.onDispose(this.subscribe(subscription, (TopicPayload e) -> {
            sink.next(e);
            return Mono.empty();
        })));
    }

    public <T> Flux<T> subscribe(Subscription subscription, Decoder<T> decoder) {
        return this.subscribe(subscription).mapNotNull(payload -> {
            try {
                Object object = payload.decode(decoder, false);
                return object;
            }
            catch (Throwable e) {
                log.error("decode message [{}] error", (Object)payload.getTopic(), (Object)e);
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)payload);
            }
            return null;
        });
    }

    public <T> Flux<T> subscribe(Subscription subscription, Class<T> type) {
        return this.subscribe(subscription).mapNotNull(payload -> {
            try {
                return payload.decode(type);
            }
            catch (Throwable e) {
                log.error("decode message [{}] error", (Object)payload.getTopic(), (Object)e);
                return null;
            }
        });
    }

    public Disposable subscribe(Subscription subscription, Function<TopicPayload, Mono<Void>> handler) {
        Disposable.Composite disposable = Disposables.composite();
        LocalHandler lazyHandler = new LocalHandler(handler);
        for (String topic : subscription.getTopics()) {
            SubscriptionInfo subscriptionInfo = SubscriptionInfo.of(subscription, topic, lazyHandler, false);
            log.debug("subscribe: {}", (Object)subscriptionInfo);
            Topic tTopic = this.subscriptionTable.append(topic);
            tTopic.subscribe((Object[])new SubscriptionInfo[]{subscriptionInfo});
            disposable.add(() -> {
                log.debug("unsubscribe: {}", (Object)subscriptionInfo);
                tTopic.unsubscribe((Object[])new SubscriptionInfo[]{subscriptionInfo});
                subscriptionInfo.dispose();
            });
            if (!subscriptionInfo.hasFeature(Subscription.Feature.broker)) continue;
            disposable.add(this.subscribeToCluster(subscriptionInfo));
        }
        return disposable;
    }

    protected Disposable subscribeToCluster(SubscriptionInfo info) {
        return Disposables.disposed();
    }

    public <T> Mono<Long> publish(String topic, Publisher<T> event) {
        return this.doPublish(topic, event, (t, e, p, f) -> Flux.from((Publisher)e).flatMap(val -> this.publishFromLocal((String)t, (Object)val, (List<SubscriptionInfo>)p, (ContextView)f)).then(), sub -> sub.isCluster() || sub.hasFeature(Subscription.Feature.local));
    }

    public <T> Mono<Long> publish(String topic, T event, Scheduler scheduler) {
        return this.publish(topic, event).subscribeOn(scheduler);
    }

    public <T> Mono<Long> publish(String topic, Encoder<T> encoder, T event) {
        return this.publish(topic, event);
    }

    public <T> Mono<Long> publish(String topic, T event) {
        return this.doPublish(topic, event, this::publishFromLocal, sub -> sub.isCluster() || sub.hasFeature(Subscription.Feature.local));
    }

    protected Mono<Void> doPublish0(String topic, TopicPayload payload, List<SubscriptionInfo> subs, ContextView ctx) {
        int subSize = subs.size();
        TreeMap<Integer, ArrayList> priority = subSize == 1 ? null : new TreeMap<Integer, ArrayList>();
        Mono task = null;
        for (SubscriptionInfo sub : subs) {
            log.trace("publish {} to {}", (Object)topic, (Object)sub);
            Function handler = sub.handler;
            task = (Mono)handler.apply(payload);
            if (subSize <= 1) continue;
            priority.computeIfAbsent(sub.priority, ignore -> new ArrayList(subSize)).add(task);
        }
        if (task == null) {
            return Mono.empty();
        }
        if (subSize == 1) {
            return task;
        }
        if (priority.size() == 1) {
            return Flux.merge((Iterable)((Iterable)priority.get(subs.get(0).priority))).then();
        }
        return Flux.fromIterable(priority.values()).concatMap(Flux::merge).then();
    }

    private <T> Mono<Void> publishFromLocal(String topic, T value, List<SubscriptionInfo> subs, ContextView ctx) {
        TopicPayload payload = TopicPayload.of((String)topic, (Payload)Payload.of(value, null));
        TraceHolder.writeContextTo((ContextView)ctx, (Object)payload, TopicPayload::addHeader);
        return this.doPublish0(topic, payload, subs, ctx);
    }

    public <T> Mono<Long> publish(String topic, Encoder<T> encoder, Publisher<? extends T> eventStream) {
        return this.publish(topic, (T)eventStream);
    }

    public <T> Mono<Long> publish(String topic, Encoder<T> encoder, Publisher<? extends T> eventStream, Scheduler scheduler) {
        return this.publish(topic, (T)eventStream);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> Mono<Long> doPublish(String topic, T arg, Function4<String, T, List<SubscriptionInfo>, ContextView, Mono<Void>> handler, Predicate<SubscriptionInfo> predicate) {
        Mono task;
        Map sharedMap = (Map)SHARED.get();
        Set distinct = (Set)DISTINCT_HANDLERS.get();
        Set readyToPub = (Set)PUB_HANDLERS.get();
        try {
            this.subscriptionTable.findTopic(topic, predicate, (Object)sharedMap, (Object)distinct, (Object)readyToPub, (_predicate, _sharedMap, _distinct, _readyToPub, subs) -> {
                Set subscriptions = subs.getSubscribers();
                if (subscriptions.isEmpty()) {
                    return;
                }
                for (SubscriptionInfo sub : subscriptions) {
                    if (!_predicate.test(sub) || !_distinct.add(sub.handler)) continue;
                    if (sub.hasFeature(Subscription.Feature.shared)) {
                        _sharedMap.computeIfAbsent(sub.subscriber, ignore -> new ArrayList(8)).add(sub);
                        continue;
                    }
                    _readyToPub.add(sub);
                }
            }, (_predicate, _sharedMap, _distinct, _readyToPub) -> {
                if (_sharedMap.isEmpty()) {
                    return;
                }
                block0: for (List value : _sharedMap.values()) {
                    int size = value.size();
                    if (size == 0) continue;
                    SubscriptionInfo first = (SubscriptionInfo)value.get(0);
                    if (size == 1) {
                        _readyToPub.add(first);
                        continue;
                    }
                    if (first.hasFeature(Subscription.Feature.sharedLocalFirst)) {
                        for (SubscriptionInfo subscriptionInfo : value) {
                            if (subscriptionInfo.isCluster()) continue;
                            _readyToPub.add(subscriptionInfo);
                            continue block0;
                        }
                    }
                    if (first.hasFeature(Subscription.Feature.sharedOldest)) {
                        value.sort(SubscriptionInfo.comparatorByTime);
                        _readyToPub.add(value.get(0));
                        continue;
                    }
                    _readyToPub.add(value.get(ThreadLocalRandom.current().nextInt(0, size)));
                }
            });
            if (readyToPub.isEmpty()) {
                Mono mono = Reactors.ALWAYS_ZERO_LONG;
                return mono;
            }
            ArrayList pub = new ArrayList(readyToPub);
            task = Mono.deferContextual(ctx -> (Mono)handler.apply((Object)topic, arg, (Object)pub, ctx)).thenReturn((Object)pub.size());
        }
        finally {
            sharedMap.clear();
            distinct.clear();
            readyToPub.clear();
        }
        return task;
    }

    public static class SubscriptionInfo
    implements Disposable,
    Externalizable {
        private transient int $hashCodeCache;
        static Comparator<SubscriptionInfo> comparatorByTime = Comparator.comparingLong(SubscriptionInfo::getTime);
        static Comparator<SubscriptionInfo> comparatorPriority = Comparator.comparingLong(SubscriptionInfo::getPriority);
        private long time;
        private String subscriber;
        private String topic;
        private boolean cluster;
        private String clusterServerId;
        private long features;
        private transient Function<TopicPayload, Mono<Void>> handler;
        private int priority;

        public static SubscriptionInfo of(Subscription subscription, String topic, Function<TopicPayload, Mono<Void>> handler, boolean cluster) {
            SubscriptionInfo info = new SubscriptionInfo();
            info.time = System.currentTimeMillis();
            info.topic = topic;
            info.subscriber = subscription.getSubscriber();
            info.handler = handler;
            info.features = EnumDict.toMask((EnumDict[])subscription.getFeatures());
            info.cluster = cluster;
            info.priority = subscription.getPriority();
            return info;
        }

        public void dispose() {
        }

        @JsonIgnore
        public boolean isDisposed() {
            return super.isDisposed();
        }

        boolean hasFeature(Subscription.Feature feature) {
            return feature.in(this.features);
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeLong(this.time);
            out.writeUTF(this.subscriber);
            out.writeUTF(this.topic);
            out.writeLong(this.features);
            out.writeBoolean(this.cluster);
            SerializeUtils.writeObject((Object)this.clusterServerId, (ObjectOutput)out);
            out.writeInt(this.priority);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.time = in.readLong();
            this.subscriber = in.readUTF();
            this.topic = in.readUTF();
            this.features = in.readLong();
            this.cluster = in.readBoolean();
            this.clusterServerId = (String)SerializeUtils.readObject((ObjectInput)in);
            this.priority = in.readInt();
        }

        public String toString() {
            return this.subscriber + "@" + (this.clusterServerId == null ? "local" : this.clusterServerId) + "::" + this.topic;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SubscriptionInfo)) {
                return false;
            }
            SubscriptionInfo other = (SubscriptionInfo)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getTime() != other.getTime()) {
                return false;
            }
            if (this.isCluster() != other.isCluster()) {
                return false;
            }
            String this$subscriber = this.getSubscriber();
            String other$subscriber = other.getSubscriber();
            if (this$subscriber == null ? other$subscriber != null : !this$subscriber.equals(other$subscriber)) {
                return false;
            }
            String this$topic = this.getTopic();
            String other$topic = other.getTopic();
            if (this$topic == null ? other$topic != null : !this$topic.equals(other$topic)) {
                return false;
            }
            String this$clusterServerId = this.getClusterServerId();
            String other$clusterServerId = other.getClusterServerId();
            return !(this$clusterServerId == null ? other$clusterServerId != null : !this$clusterServerId.equals(other$clusterServerId));
        }

        protected boolean canEqual(Object other) {
            return other instanceof SubscriptionInfo;
        }

        public int hashCode() {
            if (this.$hashCodeCache != 0) {
                return this.$hashCodeCache;
            }
            int PRIME = 59;
            int result = 1;
            long $time = this.getTime();
            result = result * 59 + (int)($time >>> 32 ^ $time);
            result = result * 59 + (this.isCluster() ? 79 : 97);
            String $subscriber = this.getSubscriber();
            result = result * 59 + ($subscriber == null ? 43 : $subscriber.hashCode());
            String $topic = this.getTopic();
            result = result * 59 + ($topic == null ? 43 : $topic.hashCode());
            String $clusterServerId = this.getClusterServerId();
            if ((result = result * 59 + ($clusterServerId == null ? 43 : $clusterServerId.hashCode())) == 0) {
                result = Integer.MIN_VALUE;
            }
            this.$hashCodeCache = result;
            return result;
        }

        public long getTime() {
            return this.time;
        }

        public String getSubscriber() {
            return this.subscriber;
        }

        public String getTopic() {
            return this.topic;
        }

        public boolean isCluster() {
            return this.cluster;
        }

        public String getClusterServerId() {
            return this.clusterServerId;
        }

        public long getFeatures() {
            return this.features;
        }

        public Function<TopicPayload, Mono<Void>> getHandler() {
            return this.handler;
        }

        public int getPriority() {
            return this.priority;
        }

        public SubscriptionInfo(long time, String subscriber, String topic, boolean cluster, String clusterServerId, long features, Function<TopicPayload, Mono<Void>> handler, int priority) {
            this.time = time;
            this.subscriber = subscriber;
            this.topic = topic;
            this.cluster = cluster;
            this.clusterServerId = clusterServerId;
            this.features = features;
            this.handler = handler;
            this.priority = priority;
        }

        public SubscriptionInfo() {
        }
    }

    private static class LocalHandler
    implements Function<TopicPayload, Mono<Void>> {
        private final int hashCode = Objects.hashCode(this);
        private final Function<TopicPayload, Mono<Void>> handler;

        private LocalHandler(Function<TopicPayload, Mono<Void>> handler) {
            this.handler = handler;
        }

        public Mono<Void> apply0(TopicPayload payload) {
            String topic = payload.getTopic();
            try {
                return this.handler.apply(payload).onErrorResume(err -> {
                    log.warn("handle publish [{}] error", (Object)topic, err);
                    return Mono.empty();
                });
            }
            catch (Throwable err2) {
                log.warn("handle publish [{}] error", (Object)topic, (Object)err2);
                return Mono.empty();
            }
        }

        @Override
        public Mono<Void> apply(TopicPayload payload) {
            return Mono.defer(() -> this.apply0(payload));
        }

        public int hashCode() {
            return this.hashCode;
        }
    }
}

