/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.rpc;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.core.rpc.ServiceEvent;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LocalRpcManager
implements RpcManager {
    private final Map<String, RpcServiceInfo<?>> services = new ConcurrentHashMap();

    @Override
    public String currentServerId() {
        return "localhost";
    }

    @Override
    public <T> Disposable registerService(T rpcService) {
        return this.registerService(rpcService.getClass().getName(), rpcService);
    }

    @Override
    public <T> Disposable registerService(String serviceId, T rpcService) {
        RpcServiceInfo serviceInfo = new RpcServiceInfo(serviceId, rpcService.getClass().getSimpleName(), this.currentServerId(), rpcService);
        this.services.put(serviceId, serviceInfo);
        return () -> this.services.remove(serviceId, serviceInfo);
    }

    @Override
    public <I> Flux<RpcService<I>> getServices(Class<I> service) {
        return Flux.fromIterable(this.services.values()).mapNotNull(e -> {
            if (service.isInstance(((RpcServiceInfo)e).service)) {
                return e;
            }
            return null;
        });
    }

    @Override
    public <I> Mono<RpcService<I>> selectService(Class<I> service) {
        return Flux.fromIterable(this.services.values()).mapNotNull(e -> {
            if (service.isInstance(((RpcServiceInfo)e).service)) {
                return e;
            }
            return null;
        }).take(1L).singleOrEmpty();
    }

    @Override
    public <I> Flux<RpcService<I>> getServices(String serviceId, Class<I> service) {
        return Flux.fromIterable(this.services.values()).mapNotNull(e -> {
            if (Objects.equals(((RpcServiceInfo)e).id, serviceId) && service.isInstance(((RpcServiceInfo)e).service)) {
                return e;
            }
            return null;
        });
    }

    @Override
    public <I> Mono<I> getService(String serverNodeId, Class<I> service) {
        return this.selectService(service).map(RpcService::service);
    }

    @Override
    public <I> Mono<I> getService(String serverNodeId, String serviceId, Class<I> service) {
        return this.getServices(serviceId, service).take(1L).singleOrEmpty().map(RpcService::service);
    }

    @Override
    public <I> Flux<ServiceEvent> listen(Class<I> service) {
        return Flux.empty();
    }

    @Override
    public Flux<RpcService<?>> getServices() {
        return Flux.fromIterable(this.services.values());
    }

    static class RpcServiceInfo<T>
    implements RpcService<T> {
        private final String id;
        private final String name;
        private final String serverNodeId;
        private final T service;

        @Override
        public String serverNodeId() {
            return this.serverNodeId;
        }

        @Override
        public String id() {
            return this.id;
        }

        @Override
        public String name() {
            return this.name;
        }

        @Override
        public T service() {
            return this.service;
        }

        public RpcServiceInfo(String id, String name, String serverNodeId, T service) {
            this.id = id;
            this.name = name;
            this.serverNodeId = serverNodeId;
            this.service = service;
        }
    }
}

