/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.scalecube.event;

import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.scalecube.services.annotations.Service;
import io.scalecube.services.annotations.ServiceMethod;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.jetlinks.core.NativePayload;
import org.jetlinks.core.Payload;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.event.EventConnection;
import org.jetlinks.supports.event.EventConsumer;
import org.jetlinks.supports.event.EventProducer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ClusterEventBusBroker
implements EventBroker,
Disposable {
    private static final Logger log = LoggerFactory.getLogger(ClusterEventBusBroker.class);
    private final RpcManager rpcManager;
    private final Map<String, RpcEventConnection> connections = new ConcurrentHashMap<String, RpcEventConnection>();
    private final Sinks.Many<EventConnection> acceptSink = Reactors.createMany();

    public ClusterEventBusBroker(RpcManager rpcManager) {
        this.rpcManager = rpcManager;
        rpcManager.listen(Api.class).subscribe(this::handleServiceEvent);
        rpcManager.getServices(Api.class).subscribe(this::handleService);
        rpcManager.registerService((Object)new ApiImpl());
    }

    private void handleServiceEvent(ServiceEvent event) {
        if (event.getType() == ServiceEvent.Type.removed) {
            this.disposeConnection(this.connections.remove(event.getServerNodeId()));
        } else {
            this.rpcManager.getService(event.getServerNodeId(), Api.class).subscribe(api -> this.handleService(event, (Api)api));
        }
    }

    protected void handleService(ServiceEvent event, Api api) {
        try {
            RpcEventConnection conn = new RpcEventConnection(event.getServerNodeId(), api);
            RpcEventConnection old = this.connections.put(event.getServerNodeId(), conn);
            if (old != null) {
                this.disposeConnection(old);
            } else if (this.acceptSink.currentSubscriberCount() > 0) {
                this.acceptSink.emitNext((Object)conn, Reactors.emitFailureHandler());
            }
        }
        catch (Throwable err) {
            log.warn("register service error {}", (Object)event.getServiceId(), (Object)err);
        }
    }

    protected void handleService(RpcService<Api> service) {
        try {
            RpcEventConnection conn = new RpcEventConnection(service.serverNodeId(), (Api)service.service());
            RpcEventConnection old = this.connections.put(service.serverNodeId(), conn);
            if (old != null) {
                this.disposeConnection(old);
            } else if (this.acceptSink.currentSubscriberCount() > 0) {
                this.acceptSink.emitNext((Object)conn, Reactors.emitFailureHandler());
            }
        }
        catch (Throwable err) {
            log.warn("register service error {}", (Object)service.serverNodeId(), (Object)err);
        }
    }

    private void disposeConnection(RpcEventConnection connection) {
        if (null != connection) {
            connection.dispose();
        }
    }

    @Override
    public String getId() {
        return "rpc-cluster-broker";
    }

    @Override
    public Flux<EventConnection> accept() {
        return Flux.concat((Publisher[])new Publisher[]{Flux.fromIterable(this.connections.values()), this.acceptSink.asFlux()});
    }

    public void dispose() {
        this.connections.values().forEach(this::disposeConnection);
    }

    protected ObjectOutput createOutput(ByteBuf buf) {
        return new ObjectOutputStream((OutputStream)new ByteBufOutputStream(buf));
    }

    protected ObjectInput createInput(ByteBuf buf) {
        return new ObjectInputStream((InputStream)new ByteBufInputStream(buf, true));
    }

    @Service
    public static interface Api {
        @ServiceMethod
        public Mono<Void> sub(ByteBuf var1);

        @ServiceMethod
        public Mono<Void> unsub(ByteBuf var1);

        @ServiceMethod
        public Mono<Void> pub(ByteBuf var1);
    }

    public class ApiImpl
    implements Api {
        private void handleSubs(ByteBuf buf, BiConsumer<RpcEventConnection, Subscription> consumer) {
            try (ObjectInput input = ClusterEventBusBroker.this.createInput(buf);){
                String serviceId = input.readUTF();
                RpcEventConnection connection = (RpcEventConnection)ClusterEventBusBroker.this.connections.get(serviceId);
                if (null != connection) {
                    Subscription subscription = new Subscription();
                    subscription.readExternal(input);
                    consumer.accept(connection, subscription);
                }
            }
            catch (Throwable error) {
                log.error("Error handling subscription", error);
            }
        }

        @Override
        public Mono<Void> sub(ByteBuf sub) {
            this.handleSubs(sub, (connection, subscription) -> ((RpcEventConnection)connection).subscriptions.emitNext(subscription, Reactors.emitFailureHandler()));
            return Mono.empty();
        }

        @Override
        public Mono<Void> unsub(ByteBuf sub) {
            this.handleSubs(sub, (connection, subscription) -> ((RpcEventConnection)connection).subscriptions.emitNext(subscription, Reactors.emitFailureHandler()));
            return Mono.empty();
        }

        @Override
        public Mono<Void> pub(ByteBuf buf) {
            try (ObjectInput input = ClusterEventBusBroker.this.createInput(buf);){
                String serviceId = input.readUTF();
                RpcEventConnection connection = (RpcEventConnection)ClusterEventBusBroker.this.connections.get(serviceId);
                if (null != connection) {
                    String topic = input.readUTF();
                    Map headers = SerializeUtils.readMap((ObjectInput)input, Maps::newLinkedHashMapWithExpectedSize);
                    Object payload = SerializeUtils.readObject((ObjectInput)input);
                    TopicPayload topicPayload = payload instanceof ByteBuf ? TopicPayload.of((String)topic, (Payload)Payload.of((ByteBuf)((ByteBuf)payload))) : TopicPayload.of((String)topic, (Payload)NativePayload.of((Object)payload));
                    connection.producer.emitNext((Object)topicPayload, Reactors.emitFailureHandler());
                }
            }
            catch (Throwable error) {
                log.error("Error handling subscription", error);
            }
            return Mono.empty();
        }
    }

    private class RpcEventConnection
    implements EventConnection,
    EventProducer,
    EventConsumer {
        private final String id;
        private final Api api;
        private final Disposable.Composite disposable = Disposables.composite();
        private final Sinks.Many<TopicPayload> consumer = Reactors.createMany((int)Integer.MAX_VALUE, (boolean)false);
        private final Sinks.Many<TopicPayload> producer = Reactors.createMany((int)Integer.MAX_VALUE, (boolean)false);
        private final Sinks.Many<Subscription> subscriptions = Reactors.createMany((int)Integer.MAX_VALUE, (boolean)false);
        private final Sinks.Many<Subscription> unSubscriptions = Reactors.createMany((int)Integer.MAX_VALUE, (boolean)false);
        private FluxSink<TopicPayload> sink;

        public RpcEventConnection(String id, Api api) {
            this.id = id;
            this.api = api;
            this.doOnDispose(Flux.create(sink -> {
                this.sink = sink;
            }).flatMap(payload -> {
                ByteBuf buf = this.encodePayload((TopicPayload)payload);
                if (null != buf) {
                    return this.api.pub(buf);
                }
                return Mono.empty();
            }).subscribe());
        }

        @Override
        public String getId() {
            return this.id;
        }

        @Override
        public boolean isAlive() {
            return true;
        }

        @Override
        public void doOnDispose(Disposable disposable) {
            this.disposable.add(disposable);
        }

        @Override
        public EventBroker getBroker() {
            return ClusterEventBusBroker.this;
        }

        public void dispose() {
            this.disposable.dispose();
            this.subscriptions.tryEmitComplete();
            this.unSubscriptions.tryEmitComplete();
            this.consumer.tryEmitComplete();
            this.sink.complete();
        }

        public boolean isDisposed() {
            return this.disposable.isDisposed();
        }

        @Override
        public Mono<Void> subscribe(Subscription subscription) {
            return this.api.sub(this.encodeSubscription(subscription));
        }

        @Override
        public Mono<Void> unsubscribe(Subscription subscription) {
            return this.api.unsub(this.encodeSubscription(subscription));
        }

        @Override
        public Flux<TopicPayload> subscribe() {
            return this.producer.asFlux();
        }

        @Override
        public Flux<Subscription> handleSubscribe() {
            return this.subscriptions.asFlux();
        }

        @Override
        public Flux<Subscription> handleUnSubscribe() {
            return this.unSubscriptions.asFlux();
        }

        @Override
        public FluxSink<TopicPayload> sink() {
            return this.sink;
        }

        private ByteBuf encodePayload(TopicPayload payload) {
            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
            try (ObjectOutput output = ClusterEventBusBroker.this.createOutput(buf);){
                output.writeUTF(ClusterEventBusBroker.this.rpcManager.currentServerId());
                output.writeUTF(payload.getTopic());
                SerializeUtils.writeKeyValue((Map)payload.getHeaders(), (ObjectOutput)output);
                Payload p = payload.getPayload();
                if (p instanceof NativePayload) {
                    SerializeUtils.writeObject((Object)((NativePayload)p).getNativeObject(), (ObjectOutput)output);
                } else {
                    output.write(p.getBytes());
                }
            }
            catch (Throwable e) {
                log.error(e.getMessage(), e);
            }
            return buf;
        }

        private ByteBuf encodeSubscription(Subscription payload) {
            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
            try (ObjectOutput output = ClusterEventBusBroker.this.createOutput(buf);){
                output.writeUTF(ClusterEventBusBroker.this.rpcManager.currentServerId());
                payload.writeExternal(output);
            }
            return buf;
        }

        public RpcEventConnection(String id, Api api, FluxSink<TopicPayload> sink) {
            this.id = id;
            this.api = api;
            this.sink = sink;
        }
    }
}

