/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.scalecube.rpc;

import com.fasterxml.jackson.core.JacksonException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.timeout.TimeoutException;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ThreadLocalRandom;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import io.scalecube.services.Reflect;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.ServiceInfo;
import io.scalecube.services.ServiceMethodDefinition;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.ServiceRegistration;
import io.scalecube.services.ServiceScanner;
import io.scalecube.services.api.Qualifier;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.MessageCodecException;
import io.scalecube.services.exceptions.ServiceClientErrorMapper;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.methods.MethodInfo;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.ServerTransport;
import io.scalecube.services.transport.api.ServiceMessageDataDecoder;
import io.scalecube.services.transport.api.ServiceTransport;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jctools.maps.NonBlockingHashMap;
import org.jctools.maps.NonBlockingHashSet;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.HashUtils;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.jetlinks.supports.scalecube.rpc.DetailErrorMapper;
import org.jetlinks.supports.scalecube.rpc.NoneServiceRegistry;
import org.jetlinks.supports.scalecube.rpc.RpcServiceMethodRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.codec.CodecException;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public class ScalecubeRpcManager
implements RpcManager {
    private static final Logger log = LoggerFactory.getLogger(ScalecubeRpcManager.class);
    // Random identifier of this manager instance; used as the id of the advertised ServiceEndpoint.
    private final String id = UUID.randomUUID().toString();
    // Qualifier of cluster messages/gossips that carry a ServiceEndpoint payload.
    private static final String SPREAD_ENDPOINT_QUALIFIER = "rpc_edp";
    // Header holding the cluster member id of the node that sent the endpoint.
    private static final String SPREAD_FROM_HEADER = "rpc_edp_f";
    // Per-thread scratch list reused by selectService0 to collect candidates; it is cleared after every use.
    private static final FastThreadLocal<List<RpcServiceCall<?>>> SHARED = new FastThreadLocal<List<RpcServiceCall<?>>>(){

        protected List<RpcServiceCall<?>> initialValue() {
            return new ArrayList(2);
        }
    };
    // Service id used when a service is registered without an explicit id.
    static final String DEFAULT_SERVICE_ID = "_default";
    // Registration tag keys: service id, service name and registration timestamp (epoch millis as text).
    static final String SERVICE_ID_TAG = "_sid";
    static final String SERVICE_NAME_TAG = "_sname";
    static final String REGISTER_TIME_TAG = "_regtime";
    private ExtendedCluster cluster;
    // Client-side call template; created in start0 and specialised per node in ClusterNode.createApiCall.
    private ServiceCall serviceCall;
    private Scheduler requestScheduler = Schedulers.parallel();
    private final DetailErrorMapper errorMapper = new DetailErrorMapper();
    // Default retry policy: exponential backoff, at most 12 attempts starting at 50ms.
    // An error is retried only when its cause/suppressed graph contains no codec/serialization
    // failure AND does contain a timeout, socket or I/O failure.
    // A warning is logged once more than 3 retries happened in a row.
    // NOTE(review): the first log placeholder receives the method name and the second the retry
    // count, which reads reversed against the message text - confirm whether that is intended.
    private static final RetryBackoffSpec DEFAULT_RETRY = Retry.backoff((long)12L, (Duration)Duration.ofMillis(50L)).filter(err -> !ScalecubeRpcManager.hasException(err, JacksonException.class, MessageCodecException.class, CodecException.class) && ScalecubeRpcManager.hasException(err, java.util.concurrent.TimeoutException.class, SocketException.class, SocketTimeoutException.class, TimeoutException.class, IOException.class)).doBeforeRetry(retrySignal -> {
        if (retrySignal.totalRetriesInARow() > 3L) {
            log.warn("rpc retries {} : [{}]", new Object[]{retrySignal.retryContextView().getOrEmpty(Method.class).map(m -> m.getDeclaringClass().getName() + "." + m.getName()).orElse("unknown"), retrySignal.totalRetriesInARow(), retrySignal.failure()});
        }
    });
    // Retry policy applied to outgoing calls; replaceable through setRetry.
    private Retry retry = DEFAULT_RETRY;
    private Supplier<ServiceTransport> transportSupplier;
    // Known remote nodes keyed by member alias, or member id when no alias is set.
    private final Map<String, ClusterNode> serverServiceRef = new NonBlockingHashMap();
    // Event sinks keyed by service name, created lazily by listen(...).
    private final Map<String, Sinks.Many<ServiceEvent>> listener = new NonBlockingHashMap();
    // Registrations of the services exposed by this node; advertised inside the ServiceEndpoint.
    private final List<ServiceRegistration> localRegistrations = new CopyOnWriteArrayList<ServiceRegistration>();
    private final RpcServiceMethodRegistry methodRegistry = new RpcServiceMethodRegistry();
    // Started transport and bound server transport; both are assigned in startAsync and reset in stopAsync.
    private ServiceTransport transport;
    private ServerTransport serverTransport;
    // Optional externally visible host/port advertised instead of the bound address (see resolveAddress).
    private String externalHost;
    private Integer externalPort;
    // Data format applied to outgoing requests that do not already carry one.
    private String contentType = "application/json";
    // Holds the pending delayed gossip scheduled by syncRegistration(); a new schedule replaces the old one.
    private final Disposable.Swap syncJob = Disposables.swap();
    // Long-lived subscriptions (periodic sync) that are disposed on stop.
    private final Disposable.Composite disposable = Disposables.composite();
    // In-flight direct endpoint transfers per target member.
    private final Map<Member, Disposable> syncMembers = new ConcurrentHashMap<Member, Disposable>();

    /**
     * Creates a manager without a cluster or transport supplier.
     * Both must be provided (see {@link #cluster(ExtendedCluster)} and {@link #transport(Supplier)})
     * before {@link #startAsync()} is called, because that method rejects {@code null} values.
     */
    public ScalecubeRpcManager() {
        this(null, null);
    }

    /**
     * Creates a manager bound to the given cluster and transport supplier.
     *
     * @param cluster   cluster used for membership events and endpoint exchange
     * @param transport supplier invoked once in {@link #startAsync()} to obtain the service transport
     */
    public ScalecubeRpcManager(ExtendedCluster cluster, Supplier<ServiceTransport> transport) {
        this.cluster = cluster;
        this.transportSupplier = transport;
    }

    /**
     * Copy constructor used by the wither-style configuration methods.
     * <p>
     * Copies the configuration (cluster, transport supplier, external address, content type and
     * retry policy) but none of the runtime state such as started transports or known nodes.
     * The retry policy is copied so that a policy installed through {@link #setRetry(Retry)} is
     * not silently reset to the default when a configuration method is called afterwards.
     *
     * @param another manager whose configuration is copied
     */
    public ScalecubeRpcManager(ScalecubeRpcManager another) {
        this.cluster = another.cluster;
        this.transportSupplier = another.transportSupplier;
        this.externalHost = another.externalHost;
        this.externalPort = another.externalPort;
        this.contentType = another.contentType;
        this.retry = another.retry;
    }

    /**
     * Returns the identifier of the local node: the member alias when one is set,
     * otherwise the member id.
     */
    public String currentServerId() {
        Member self = this.cluster.member();
        String alias = self.alias();
        if (alias != null) {
            return alias;
        }
        return self.id();
    }

    /**
     * Returns a configuration copy of this manager that uses the given external host
     * when building the advertised address.
     *
     * @param host externally reachable host name or address
     * @return a new manager; this instance is left untouched
     */
    public ScalecubeRpcManager externalHost(String host) {
        ScalecubeRpcManager copy = new ScalecubeRpcManager(this);
        copy.externalHost = host;
        return copy;
    }

    /**
     * Returns a configuration copy of this manager that uses the given external port
     * (together with the external host) when building the advertised address.
     *
     * @param port externally reachable port, or {@code null} to fall back to the bound port
     * @return a new manager; this instance is left untouched
     */
    public ScalecubeRpcManager externalPort(Integer port) {
        ScalecubeRpcManager copy = new ScalecubeRpcManager(this);
        copy.externalPort = port;
        return copy;
    }

    /**
     * Returns a configuration copy of this manager that obtains its transport from the given supplier.
     *
     * @param transportSupplier supplier of the service transport to start
     * @return a new manager; this instance is left untouched
     */
    public ScalecubeRpcManager transport(Supplier<ServiceTransport> transportSupplier) {
        ScalecubeRpcManager copy = new ScalecubeRpcManager(this);
        copy.transportSupplier = transportSupplier;
        return copy;
    }

    /**
     * Returns a configuration copy of this manager that is bound to the given cluster.
     *
     * @param cluster cluster to use for membership and endpoint exchange
     * @return a new manager; this instance is left untouched
     */
    public ScalecubeRpcManager cluster(ExtendedCluster cluster) {
        ScalecubeRpcManager copy = new ScalecubeRpcManager(this);
        copy.cluster = cluster;
        return copy;
    }

    /**
     * Returns a configuration copy of this manager that uses the given data format
     * for outgoing requests that do not carry one already.
     *
     * @param contentType data format, e.g. {@code application/json}
     * @return a new manager; this instance is left untouched
     */
    public ScalecubeRpcManager contentType(String contentType) {
        ScalecubeRpcManager copy = new ScalecubeRpcManager(this);
        copy.contentType = contentType;
        return copy;
    }

    /**
     * Starts the manager and blocks the calling thread until {@link #startAsync()} completes.
     */
    public void startAwait() {
        this.startAsync().block();
    }

    /**
     * Starts the manager: installs the cluster message handler, starts the transport,
     * binds the server transport and finally runs {@code start0}.
     *
     * @return a Mono that completes when startup has finished
     * @throws NullPointerException if the transport supplier or the cluster has not been configured
     */
    public Mono<Void> startAsync() {
        Objects.requireNonNull(this.transportSupplier);
        Objects.requireNonNull(this.cluster);
        this.cluster.handler(ignore -> new ClusterMessageHandler(){

            // Accepts only messages that name their sender and carry the endpoint qualifier;
            // the payload is handed over only when the sender is a currently known member.
            public void onMessage(Message message) {
                String from = message.header(ScalecubeRpcManager.SPREAD_FROM_HEADER);
                if (StringUtils.hasText((String)from) && ScalecubeRpcManager.SPREAD_ENDPOINT_QUALIFIER.equals(message.qualifier())) {
                    ScalecubeRpcManager.this.cluster.member(from).ifPresent(member -> ScalecubeRpcManager.this.handleServiceEndpoint(member, (ServiceEndpoint)message.data()));
                }
            }

            // Gossips are processed exactly like direct messages.
            public void onGossip(Message gossip) {
                this.onMessage(gossip);
            }

            public void onMembershipEvent(MembershipEvent event) {
                Disposable _old;
                if (event.isLeaving() || event.isRemoved()) {
                    // Forget the services of the departing member right away.
                    ScalecubeRpcManager.this.memberLeave(event.member());
                    if (event.isLeaving()) {
                        // A leaving member may still be present later on: re-check after 10 seconds
                        // and, if it is still known to the cluster, send it our endpoint again.
                        Schedulers.parallel().schedule(() -> {
                            if (ScalecubeRpcManager.this.cluster.member(event.member().id()).isPresent()) {
                                ScalecubeRpcManager.this.syncRegistration(event.member());
                            }
                        }, 10L, TimeUnit.SECONDS);
                    } else {
                        // Removed: cancel any endpoint transfer still in flight for that member.
                        _old = (Disposable)ScalecubeRpcManager.this.syncMembers.remove(event.member());
                        if (_old != null) {
                            _old.dispose();
                        }
                    }
                }
                if (event.isAdded() || event.isUpdated()) {
                    // Cancel a previous transfer (if any) and send the current endpoint to the member.
                    _old = (Disposable)ScalecubeRpcManager.this.syncMembers.remove(event.member());
                    if (_old != null) {
                        _old.dispose();
                    }
                    ScalecubeRpcManager.this.syncRegistration(event.member());
                }
            }
        });
        // Start transport -> bind server transport -> remember both -> run start0.
        return this.initTransport(this.transportSupplier.get()).start().doOnNext(trans -> {
            this.transport = trans;
        }).flatMap(trans -> trans.serverTransport((ServiceMethodRegistry)this.methodRegistry).bind()).doOnNext(trans -> {
            this.serverTransport = trans;
        }).then(Mono.fromRunnable(this::start0));
    }

    /**
     * Hook applied to the supplied transport before it is started; currently returns it unchanged.
     */
    private ServiceTransport initTransport(ServiceTransport transport) {
        return transport;
    }

    /**
     * Emits every service call that has been created so far for any known remote node.
     */
    public Flux<RpcService<?>> getServices() {
        return Flux
            .fromIterable(this.serverServiceRef.values())
            .flatMapIterable(clusterNode -> clusterNode.serviceInstances.values())
            .flatMapIterable(ServiceInstances::getAllCalls);
    }

    /**
     * Tells whether this manager has been disposed or its cluster (if any) has been shut down.
     */
    public boolean isShutdown() {
        if (this.disposable.isDisposed()) {
            return true;
        }
        return this.cluster != null && this.cluster.isShutdown();
    }

    /**
     * Completes startup once the transports are available: configures the error mapper,
     * creates the client call template, announces the local endpoint and schedules a
     * re-announcement every 60 seconds (failures of a single round are ignored).
     */
    private void start0() {
        this.errorMapper.setTopTrace(new StackTraceElement("RpcCallFailed", this.currentServerId(), null, 1));
        this.serviceCall = new ServiceCall().transport(this.transport.clientTransport());
        this.syncRegistration();
        Disposable periodicSync = Flux
            .interval(Duration.ofSeconds(60L))
            .onBackpressureDrop()
            .concatMap(tick -> this.doSyncRegistration().onErrorResume(error -> Mono.empty()))
            .subscribe();
        this.disposable.add(periodicSync);
    }

    /**
     * Stops the manager and blocks the calling thread until {@link #stopAsync()} completes.
     */
    public void stopAwait() {
        this.stopAsync().block();
    }

    /**
     * Stops the manager. Does nothing when the transports were never started.
     * Otherwise pending jobs are cancelled, known nodes and local registrations are cleared,
     * the (now empty) endpoint is announced and both transports are stopped one after another;
     * an error of one step does not prevent the following steps from running.
     *
     * @return a Mono that completes when shutdown has finished
     */
    public Mono<Void> stopAsync() {
        if (this.serverTransport != null && this.transport != null) {
            this.syncJob.dispose();
            this.disposable.dispose();
            this.serverServiceRef.clear();
            this.localRegistrations.clear();
            return Flux
                .concatDelayError(this.doSyncRegistration(), this.serverTransport.stop(), this.transport.stop())
                .doOnComplete(() -> {
                    this.disposable.dispose();
                    this.serverTransport = null;
                    this.transport = null;
                })
                .then();
        }
        return Mono.empty();
    }

    /**
     * Determines the address advertised to other nodes. A configured external host wins
     * (combined with the external port, or the bound port when none is configured);
     * otherwise the bound server address is normalised via {@code prepareAddress}.
     */
    private Address resolveAddress() {
        if (!StringUtils.hasText(this.externalHost)) {
            return ScalecubeRpcManager.prepareAddress(this.serverTransport.address());
        }
        int port = this.externalPort != null ? this.externalPort : this.serverTransport.address().port();
        return Address.create(this.externalHost, port);
    }

    /**
     * Converts a bound address into one that can be advertised: the host is resolved, and a
     * wildcard ("any local") address is replaced by the local IP address. The port is kept.
     *
     * @param address bound address
     * @return address with a concrete host
     * @throws RuntimeException wrapping {@link UnknownHostException} when the host cannot be resolved
     */
    private static Address prepareAddress(Address address) {
        final InetAddress resolved;
        try {
            resolved = InetAddress.getByName(address.host());
        }
        catch (UnknownHostException unknownHost) {
            throw Exceptions.propagate(unknownHost);
        }
        String host = resolved.isAnyLocalAddress()
            ? Address.getLocalIpAddress().getHostAddress()
            : resolved.getHostAddress();
        return Address.create(host, address.port());
    }

    /**
     * Builds the endpoint description of this node: instance id, advertised address,
     * all supported content types and the current local service registrations.
     */
    private ServiceEndpoint createEndpoint() {
        return ServiceEndpoint
            .builder()
            .id(this.id)
            .address(this.resolveAddress())
            .contentTypes(DataCodec.getAllContentTypes())
            .serviceRegistrations(this.localRegistrations)
            .build();
    }

    /**
     * Sends the local endpoint directly to one member, unless a transfer to that member is
     * already in flight or the cluster has been shut down.
     * <p>
     * Failed sends are retried up to 30 times with a one second delay while the error has no
     * message, reports "Connection refused", or the member is still part of the cluster.
     * A final failure is only logged when the member is still known.
     * <p>
     * The tracking handle is stored in {@code syncMembers} <em>before</em> the send is subscribed.
     * Otherwise a send that terminates synchronously would run its {@code doFinally} clean-up
     * first and the handle stored afterwards would never be removed, blocking every later
     * synchronisation with that member.
     *
     * @param member target member
     */
    private synchronized void syncRegistration(Member member) {
        if (this.syncMembers.containsKey(member) || this.cluster.isShutdown()) {
            return;
        }
        // Build the message first so that a failure here leaves no tracking entry behind.
        Message message = Message
            .withData(this.createEndpoint())
            .header(SPREAD_FROM_HEADER, this.cluster.member().id())
            .qualifier(SPREAD_ENDPOINT_QUALIFIER)
            .build();
        Disposable.Swap task = Disposables.swap();
        this.syncMembers.put(member, task);
        try {
            task.update(this.cluster
                .send(member, message)
                .retryWhen(Retry
                    .fixedDelay(30L, Duration.ofSeconds(1L))
                    .filter(err -> err.getMessage() == null
                        || err.getMessage().contains("Connection refused")
                        || this.cluster.member(member.id()).isPresent()))
                // Remove only our own handle; a newer transfer may have replaced it meanwhile.
                .doFinally(ignore -> this.syncMembers.remove(member, task))
                .subscribe(ignore -> {}, error -> {
                    if (this.cluster.member(member.id()).isPresent()) {
                        log.error("Synchronization registration [{}] error", (Object)member, error);
                    }
                }));
        }
        catch (RuntimeException assemblyFailure) {
            // The pipeline could not even be subscribed: do not leave a stale handle behind.
            this.syncMembers.remove(member, task);
            throw assemblyFailure;
        }
    }

    /**
     * Spreads the current local endpoint to the cluster as a gossip.
     * Returns an empty Mono when the cluster has already been shut down; errors are logged
     * and still propagated to the subscriber.
     */
    private Mono<Void> doSyncRegistration() {
        if (!this.cluster.isShutdown()) {
            ServiceEndpoint endpoint = this.createEndpoint();
            log.debug("Synchronization registration : {}", (Object)endpoint);
            Message gossip = Message
                .withData(endpoint)
                .header(SPREAD_FROM_HEADER, this.cluster.member().id())
                .qualifier(SPREAD_ENDPOINT_QUALIFIER)
                .build();
            return this.cluster
                .spreadGossip(gossip)
                .doOnError(error -> log.error("Synchronization registration error", error))
                .then();
        }
        return Mono.empty();
    }

    /**
     * Schedules an announcement of the local endpoint 200ms from now. A schedule that is
     * still pending is replaced, so bursts of registrations result in a single gossip.
     * Does nothing when no cluster is configured.
     */
    private synchronized void syncRegistration() {
        if (this.cluster != null) {
            Disposable pending = Mono
                .delay(Duration.ofMillis(200L))
                .flatMap(tick -> this.doSyncRegistration())
                .subscribe();
            this.syncJob.update(pending);
        }
    }

    /**
     * Exposes a local service instance under the given service id.
     * The service is added to the method registry, its registrations (tagged with service id,
     * service name and registration time) are added to the local endpoint, and the endpoint is
     * re-announced.
     *
     * @param service    service id used to qualify the service
     * @param rpcService service implementation
     * @return handle that, when disposed, unregisters the service and re-announces the endpoint
     */
    public <T> Disposable registerService(String service, T rpcService) {
        Disposable.Composite handle = Disposables.composite();
        ServiceInfo info = ServiceInfo
            .fromServiceInstance(rpcService)
            .errorMapper((ServiceProviderErrorMapper)this.errorMapper)
            .dataDecoder((message, dataType) -> {
                // Requests that ask for a raw buffer and already carry one are passed through undecoded.
                boolean rawBuffer = dataType.isAssignableFrom(ByteBuf.class) && message.hasData(ByteBuf.class);
                if (!rawBuffer) {
                    return (ServiceMessage)ServiceMessageDataDecoder.INSTANCE.apply(message, dataType);
                }
                return ServiceMessage.from(message).data(message.data()).build();
            })
            .tag(SERVICE_ID_TAG, service)
            .build();
        handle.add(this.methodRegistry.registerService0(info));

        List<ServiceRegistration> added = new ArrayList<ServiceRegistration>();
        for (ServiceRegistration scanned : ServiceScanner.scanServiceInfo(info)) {
            Map<String, String> tags = new HashMap<String, String>(scanned.tags());
            tags.put(SERVICE_ID_TAG, service);
            tags.put(SERVICE_NAME_TAG, scanned.namespace());
            tags.put(REGISTER_TIME_TAG, String.valueOf(System.currentTimeMillis()));
            String qualifier = ScalecubeRpcManager.createMethodQualifier(service, scanned.namespace());
            added.add(new ServiceRegistration(qualifier, tags, scanned.methods()));
        }
        this.localRegistrations.addAll(added);
        this.syncRegistration();
        log.debug("register rpc service {}", (Object)info);

        handle.add(() -> {
            this.localRegistrations.removeAll(added);
            this.syncRegistration();
        });
        return handle;
    }

    /**
     * Exposes a local service instance under the default service id ({@code "_default"}).
     *
     * @param rpcService service implementation
     * @return handle that unregisters the service when disposed
     */
    public <T> Disposable registerService(T rpcService) {
        return this.registerService(DEFAULT_SERVICE_ID, rpcService);
    }

    /**
     * Emits the calls for the given service interface on every known remote node,
     * regardless of service id. Evaluated lazily on each subscription.
     *
     * @param service service interface
     */
    public <I> Flux<RpcService<I>> getServices(Class<I> service) {
        return Flux.defer(() -> Flux
            .fromIterable(this.serverServiceRef.values())
            .flatMapIterable(node -> node.getApiCalls(service)));
    }

    /**
     * Selects one remote provider of the given service without a route key.
     *
     * @param service service interface
     * @return the selected service, or empty when no node provides it
     */
    public <I> Mono<RpcService<I>> selectService(Class<I> service) {
        return this.selectService(service, null);
    }

    /**
     * Selects one remote provider of the given service; the selection is made lazily
     * on each subscription.
     *
     * @param service  service interface
     * @param routeKey key used to pick a provider when several exist; may be {@code null}
     * @return the selected service, or empty when no node provides it
     */
    public <I> Mono<RpcService<I>> selectService(Class<I> service, Object routeKey) {
        return Mono.defer(() -> this.selectService0(service, routeKey));
    }

    /**
     * Picks one provider of the given service among all known remote nodes.
     * <ul>
     *   <li>no provider: empty</li>
     *   <li>one provider: that provider</li>
     *   <li>several providers, no route key: a random one</li>
     *   <li>several providers, route key given: the provider whose (node id, route key) hash is
     *       the smallest; on equal hashes the first collected provider wins</li>
     * </ul>
     * Candidates are collected into a thread-local scratch list that is always cleared before
     * returning. The minimum is found in one pass, hashing each candidate exactly once,
     * instead of sorting the whole list.
     *
     * @param service  service interface
     * @param routeKey routing key, may be {@code null}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <I> Mono<RpcService<I>> selectService0(Class<I> service, Object routeKey) {
        // The shared list is declared with a wildcard element type; it only ever holds calls
        // of the requested interface for the duration of this method.
        List<RpcServiceCall<I>> calls = (List)SHARED.get();
        try {
            for (ClusterNode node : this.serverServiceRef.values()) {
                node.getApiCalls(null, service, calls);
            }
            int size = calls.size();
            if (size == 0) {
                return Mono.empty();
            }
            if (size == 1) {
                return Mono.<RpcService<I>>just(calls.get(0));
            }
            if (routeKey == null) {
                return Mono.<RpcService<I>>just(calls.get(ThreadLocalRandom.current().nextInt(size)));
            }
            RpcServiceCall<I> selected = calls.get(0);
            long selectedHash = ScalecubeRpcManager.hash(selected.serverNodeId, routeKey);
            for (int i = 1; i < size; i++) {
                RpcServiceCall<I> candidate = calls.get(i);
                long candidateHash = ScalecubeRpcManager.hash(candidate.serverNodeId, routeKey);
                // Strictly smaller only, so the earliest candidate wins on ties.
                if (candidateHash < selectedHash) {
                    selected = candidate;
                    selectedHash = candidateHash;
                }
            }
            return Mono.<RpcService<I>>just(selected);
        }
        finally {
            calls.clear();
        }
    }

    /**
     * Emits the calls for the given service interface and service id on every known remote node.
     * Evaluated lazily on each subscription.
     *
     * @param id      service id to match
     * @param service service interface
     */
    public <I> Flux<RpcService<I>> getServices(String id, Class<I> service) {
        return Flux.defer(() -> Flux
            .fromIterable(this.serverServiceRef.values())
            .flatMapIterable(node -> node.getApiCalls(id, service)));
    }

    /**
     * Returns the proxy of a service registered under the default service id on a specific node.
     *
     * @param serverNodeId id of the target node
     * @param service      service interface
     * @return the service proxy, or empty when the node is unknown or does not provide the service
     */
    public <I> Mono<I> getService(String serverNodeId, Class<I> service) {
        return this.getService(serverNodeId, DEFAULT_SERVICE_ID, service);
    }

    /**
     * Returns the proxy of a service on a specific node.
     *
     * @param serverNodeId id of the target node
     * @param serviceId    service id the service was registered with
     * @param service      service interface
     * @return the service proxy, or empty when the node is unknown or does not provide the service
     */
    public <I> Mono<I> getService(String serverNodeId, String serviceId, Class<I> service) {
        return Mono.fromSupplier(() -> {
            ClusterNode node = this.serverServiceRef.get(serverNodeId);
            if (node != null) {
                RpcServiceCall<I> call = node.getApiCall(serviceId, service);
                if (node.isSupported(call)) {
                    return call.service();
                }
            }
            // A null result makes the Mono complete empty.
            return null;
        });
    }

    /**
     * Returns a stream of added/removed events for the given service interface.
     * <p>
     * One sink per service name is created lazily and kept for the lifetime of the manager.
     * It is created with {@code autoCancel = false}: with the default ({@code true}) the sink
     * terminates as soon as its last subscriber cancels, so every later call of this method
     * would hand out a Flux of an already terminated sink. The buffer size is the same as the
     * one used by the no-argument {@code onBackpressureBuffer()}.
     *
     * @param service service interface to observe
     * @return events of that service
     */
    public <I> Flux<ServiceEvent> listen(Class<I> service) {
        String name = Reflect.serviceName(service);
        return this.listener
            .computeIfAbsent(name, ignore -> Sinks
                .many()
                .multicast()
                .<ServiceEvent>onBackpressureBuffer(reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE, false))
            .asFlux();
    }

    /**
     * Forgets a member that left the cluster: its node entry is removed, a "removed" event is
     * fired for each of its services and the node is disposed.
     *
     * @param member member that left
     */
    private void memberLeave(Member member) {
        String alias = member.alias();
        String nodeId = alias != null ? alias : member.id();
        ClusterNode departed = this.serverServiceRef.remove(nodeId);
        if (departed != null) {
            this.fireEvent(departed.services.values(), nodeId, ServiceEvent.Type.removed);
            departed.dispose();
        }
        log.debug("remove service endpoint [{}] ", (Object)member);
    }

    /**
     * Publishes one event per registration to the sink of the corresponding service name.
     * Registrations whose service has no sink, or whose sink has no subscriber, are skipped.
     *
     * @param services registrations the event applies to
     * @param memberId id of the node that owns the registrations
     * @param type     event type
     */
    private void fireEvent(Collection<ServiceRegistration> services, String memberId, ServiceEvent.Type type) {
        for (ServiceRegistration registration : services) {
            Map<String, String> tags = registration.tags();
            String serviceName = tags.getOrDefault(SERVICE_NAME_TAG, registration.namespace());
            Sinks.Many<ServiceEvent> sink = this.listener.get(serviceName);
            if (sink != null && sink.currentSubscriberCount() > 0) {
                String serviceId = tags.getOrDefault(SERVICE_ID_TAG, DEFAULT_SERVICE_ID);
                sink.emitNext(new ServiceEvent(serviceId, serviceName, memberId, type), Reactors.emitFailureHandler());
            }
        }
    }

    /**
     * Applies an endpoint received from another member: the node entry (keyed by alias, or id
     * when no alias is set) is created on first contact, refreshed with the member and its RPC
     * address, and its registrations are updated. Endpoints sent by the local member are ignored.
     *
     * @param member   sender of the endpoint
     * @param endpoint endpoint description received
     */
    private void handleServiceEndpoint(Member member, ServiceEndpoint endpoint) {
        String selfId = this.cluster.member().id();
        if (selfId.equals(member.id())) {
            return;
        }
        String alias = member.alias();
        String nodeId = alias != null ? alias : member.id();
        ClusterNode node = this.serverServiceRef.computeIfAbsent(nodeId, key -> new ClusterNode());
        node.id = nodeId;
        node.member = member;
        node.rpcAddress = endpoint.address();
        node.register(endpoint);
    }

    /**
     * Combines a service id and a qualifier into one qualifier string by delegating to
     * {@code Qualifier.asString(serviceId, qualifier)}.
     */
    static String createMethodQualifier(String serviceId, String qualifier) {
        return Qualifier.asString((String)serviceId, (String)qualifier);
    }

    /**
     * Tells whether the given throwable, any throwable in its cause chain, or any suppressed
     * throwable reachable from them is an instance of one of the target types.
     * <p>
     * The exception graph is walked iteratively with an identity-based visited set, so cyclic
     * cause or suppressed relations terminate instead of looping or overflowing the stack, and
     * every throwable is examined only once.
     *
     * @param e      root throwable; {@code null} yields {@code false}
     * @param target exception types to look for; none yields {@code false}
     * @return {@code true} when a matching throwable is reachable from {@code e}
     */
    @SafeVarargs
    private static boolean hasException(Throwable e, Class<? extends Throwable> ... target) {
        if (e == null || target == null || target.length == 0) {
            return false;
        }
        Set<Throwable> visited = java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<Throwable, Boolean>());
        // Used as a stack of throwables that still have to be examined.
        List<Throwable> pending = new ArrayList<Throwable>();
        pending.add(e);
        while (!pending.isEmpty()) {
            Throwable current = pending.remove(pending.size() - 1);
            if (!visited.add(current)) {
                continue;
            }
            for (Class<? extends Throwable> type : target) {
                if (type.isInstance(current)) {
                    return true;
                }
            }
            Throwable cause = current.getCause();
            if (cause != null) {
                pending.add(cause);
            }
            for (Throwable suppressed : current.getSuppressed()) {
                pending.add(suppressed);
            }
        }
        return false;
    }

    /**
     * Computes the routing hash of a (server node id, route key) pair by delegating to
     * {@code HashUtils.murmur3_128}.
     */
    private static long hash(String server, Object key) {
        return HashUtils.murmur3_128((Object)server, (Object[])new Object[]{key});
    }

    /**
     * Replaces the retry policy applied to outgoing calls.
     *
     * @param retry retry policy to use from now on
     */
    public void setRetry(Retry retry) {
        this.retry = retry;
    }

    /**
     * Returns the retry policy currently applied to outgoing calls.
     */
    public Retry getRetry() {
        return this.retry;
    }

    /**
     * Pairs a service id with the reference used to reach that service.
     * Equality and hash code are based on the service id only.
     */
    static class ServiceReferenceInfo {
        private String id;
        private ServiceReference reference;

        public ServiceReferenceInfo(String id, ServiceReference reference) {
            this.id = id;
            this.reference = reference;
        }

        /** Two instances are equal when their ids are equal; the reference is not compared. */
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o instanceof ServiceReferenceInfo) {
                ServiceReferenceInfo that = (ServiceReferenceInfo)o;
                return that.canEqual(this) && Objects.equals(this.id, that.id);
            }
            return false;
        }

        /** Hook that lets subclasses opt out of equality with this type. */
        protected boolean canEqual(Object other) {
            return other instanceof ServiceReferenceInfo;
        }

        /** Hash of the id only, consistent with {@link #equals(Object)}. */
        public int hashCode() {
            int idHash = this.id == null ? 43 : this.id.hashCode();
            return 59 + idHash;
        }
    }

    /**
     * Immutable description of one remote service: the node that hosts it, its service id and
     * name, the proxy used to invoke it, and the namespace it is registered under on that node.
     *
     * @param <T> service interface type
     */
    static class RpcServiceCall<T>
    implements RpcService<T> {
        private final String serverNodeId;
        private final String id;
        private final String name;
        private final T service;
        private final String namespace;

        public RpcServiceCall(String serverNodeId, String id, String name, T service, String namespace) {
            this.serverNodeId = serverNodeId;
            this.id = id;
            this.name = name;
            this.service = service;
            this.namespace = namespace;
        }

        /** Id of the node hosting the service. */
        public String serverNodeId() {
            return this.serverNodeId;
        }

        /** Service id the service was registered with. */
        public String id() {
            return this.id;
        }

        /** Service name. */
        public String name() {
            return this.name;
        }

        /** Proxy used to invoke the remote service. */
        public T service() {
            return this.service;
        }

        /** Casts the proxy to the given type. */
        public <R> R cast(Class<R> type) {
            return type.cast(this.service);
        }

        /** Renders as {@code name@serverNodeId}. */
        public String toString() {
            return this.name + "@" + this.serverNodeId;
        }
    }

    static class ServiceInstances {
        private final Map<String, RpcServiceCall<?>> calls = new NonBlockingHashMap();
        private final String name;
        private final Class<?> type;

        ServiceInstances(Class<?> type) {
            this.type = type;
            this.name = Reflect.serviceName(type);
        }

        public RpcServiceCall<?> computeIfAbsent(String id, BiFunction<String, Class<?>, RpcServiceCall<?>> supplier) {
            RpcServiceCall<?> fastPath = this.calls.get(id);
            if (fastPath != null) {
                return fastPath;
            }
            return this.calls.computeIfAbsent(id, (? super K _id) -> (RpcServiceCall)supplier.apply((String)_id, this.type));
        }

        public RpcServiceCall<?> get(String callId) {
            return this.calls.get(callId);
        }

        private Collection<RpcServiceCall<?>> getAllCalls() {
            return this.calls.values();
        }

        public String getName() {
            return this.name;
        }
    }

    class ClusterNode
    implements Disposable {
        // Node key (member alias, or member id when no alias is set); assigned by handleServiceEndpoint.
        private String id;
        // Cluster member and RPC address last reported for this node.
        private Member member;
        private Address rpcAddress;
        // Service references reachable on this node, grouped by method qualifier.
        private final Map<String, Set<ServiceReferenceInfo>> serviceReferencesByQualifier = new NonBlockingHashMap();
        // Registrations currently advertised by this node, keyed by registration namespace.
        private final Map<String, ServiceRegistration> services = new NonBlockingHashMap();
        // Calls created so far for this node, grouped by service interface.
        private final Map<Class<?>, ServiceInstances> serviceInstances = new NonBlockingHashMap();

        /** Creates an empty node; id, member and address are filled in by handleServiceEndpoint. */
        ClusterNode() {
        }

        /**
         * Tells whether this node currently advertises a registration for the namespace of the call.
         */
        public boolean isSupported(RpcServiceCall<?> call) {
            String namespace = call.namespace;
            return this.services.containsKey(namespace);
        }

        /**
         * Replaces the knowledge about this node with the content of a freshly received endpoint.
         * Method references are (re)populated for both the current and the old qualifier form,
         * qualifiers that are no longer advertised are dropped, and "added" / "removed" events
         * are fired for registrations that appeared or disappeared (in that order).
         *
         * @param endpoint endpoint description received from the node
         */
        public synchronized void register(ServiceEndpoint endpoint) {
            // Start by assuming everything known so far is gone; entries are un-marked as they are seen.
            List<String> staleQualifiers = new ArrayList<String>(this.serviceReferencesByQualifier.keySet());
            Map<String, ServiceRegistration> vanished = new HashMap<String, ServiceRegistration>(this.services);
            Map<String, ServiceRegistration> appeared = new HashMap<String, ServiceRegistration>();
            log.debug("update service endpoint from [{}] : {} ", (Object)this.member, (Object)endpoint);

            for (ServiceRegistration registration : endpoint.serviceRegistrations()) {
                String namespace = registration.namespace();
                if (vanished.remove(namespace) == null) {
                    appeared.put(namespace, registration);
                }
                for (ServiceMethodDefinition method : registration.methods()) {
                    ServiceReference reference = new ServiceReference(method, registration, endpoint);
                    String qualifier = reference.qualifier();
                    String oldQualifier = reference.oldQualifier();
                    staleQualifiers.remove(qualifier);
                    staleQualifiers.remove(oldQualifier);
                    this.populateServiceReferences(qualifier, reference);
                    this.populateServiceReferences(oldQualifier, reference);
                }
            }

            for (String stale : staleQualifiers) {
                this.serviceReferencesByQualifier.remove(stale);
            }
            for (String namespace : vanished.keySet()) {
                this.services.remove(namespace);
            }
            this.services.putAll(appeared);

            ScalecubeRpcManager.this.fireEvent(appeared.values(), this.id, ServiceEvent.Type.added);
            ScalecubeRpcManager.this.fireEvent(vanished.values(), this.id, ServiceEvent.Type.removed);
        }

        /**
         * Records a service reference under the given qualifier.
         * References are identified by service id, so a reference whose id is already present
         * for that qualifier is not added again.
         *
         * @return {@code true} when the reference was added, {@code false} when one with the same id existed
         */
        private boolean populateServiceReferences(String qualifier, ServiceReference serviceReference) {
            String serviceId = serviceReference
                .tags()
                .getOrDefault(ScalecubeRpcManager.SERVICE_ID_TAG, ScalecubeRpcManager.DEFAULT_SERVICE_ID);
            Set<ServiceReferenceInfo> references = this.serviceReferencesByQualifier
                .computeIfAbsent(qualifier, key -> new NonBlockingHashSet<>());
            return references.add(new ServiceReferenceInfo(serviceId, serviceReference));
        }

        /**
         * Creates the call object for one service interface on this node. The underlying
         * service call is routed exclusively to references known for this node: the first
         * reference registered for the request qualifier is used, and no route is returned
         * when none is known.
         *
         * @param serviceId service id of the target service
         * @param clazz     service interface
         */
        private <I> RpcServiceCall<I> createApiCall(String serviceId, Class<I> clazz) {
            String name = Reflect.serviceName(clazz);
            ServiceCall routed = ScalecubeRpcManager.this.serviceCall
                .router((registry, message) -> {
                    Set<ServiceReferenceInfo> candidates = this.serviceReferencesByQualifier.get(message.qualifier());
                    if (candidates == null) {
                        return Optional.empty();
                    }
                    Iterator<ServiceReferenceInfo> cursor = candidates.iterator();
                    return cursor.hasNext() ? Optional.of(cursor.next().reference) : Optional.empty();
                })
                .serviceRegistry(NoneServiceRegistry.INSTANCE)
                .errorMapper((ServiceClientErrorMapper)ScalecubeRpcManager.this.errorMapper);
            I proxy = this.api(routed, serviceId, clazz);
            return new RpcServiceCall<I>(this.id, serviceId, name, proxy, serviceId + "/" + name);
        }

        /**
         * Returns the calls for the given service interface on this node, regardless of service id.
         */
        private <I> List<RpcServiceCall<I>> getApiCalls(Class<I> clazz) {
            return this.getApiCalls(null, clazz);
        }

        /**
         * Returns the service name of the interface, using the name cached in its
         * {@code ServiceInstances} entry when one exists and resolving it otherwise.
         */
        private String getServiceName(Class<?> clazz) {
            ServiceInstances cached = this.serviceInstances.get(clazz);
            return cached != null ? cached.name : Reflect.serviceName(clazz);
        }

        /**
         * Appends to the given list one call per registration of this node that matches the
         * service name of the interface and, when an id is given, the service id.
         *
         * @param id            service id to match, or {@code null} to accept any id
         * @param clazz         service interface
         * @param registrations list that receives the matching calls
         * @return the list passed in
         */
        private <I> List<RpcServiceCall<I>> getApiCalls(String id, Class<I> clazz, List<RpcServiceCall<I>> registrations) {
            String wantedName = this.getServiceName(clazz);
            for (ServiceRegistration registration : this.services.values()) {
                Map<String, String> tags = registration.tags();
                String name = tags.getOrDefault(ScalecubeRpcManager.SERVICE_NAME_TAG, registration.namespace());
                String sid = tags.getOrDefault(ScalecubeRpcManager.SERVICE_ID_TAG, ScalecubeRpcManager.DEFAULT_SERVICE_ID);
                boolean nameMatches = Objects.equals(name, wantedName);
                boolean idMatches = id == null || Objects.equals(sid, id);
                if (nameMatches && idMatches) {
                    registrations.add(this.getApiCall(sid, clazz, registration));
                }
            }
            return registrations;
        }

        /**
         * Returns, in a new list, the calls of this node that match the interface and
         * (when not {@code null}) the service id.
         */
        private <I> List<RpcServiceCall<I>> getApiCalls(String id, Class<I> clazz) {
            return this.getApiCalls(id, clazz, new ArrayList<RpcServiceCall<I>>(2));
        }

        /**
         * Returns the cached call for the given service id and interface, creating it on first use.
         * <p>
         * The cache stores calls with a wildcard type argument, so the result is narrowed with an
         * unchecked cast. The cast is safe because the entry is looked up by {@code clazz}, and
         * entries for that key are only ever created for that same interface.
         *
         * @param id           service id
         * @param clazz        service interface
         * @param registration registration the call is requested for; not used by the lookup
         */
        @SuppressWarnings("unchecked")
        private <I> RpcServiceCall<I> getApiCall(String id, Class<I> clazz, ServiceRegistration registration) {
            return (RpcServiceCall<I>)this.serviceInstances
                .computeIfAbsent(clazz, ServiceInstances::new)
                .computeIfAbsent(id, this::createApiCall);
        }

        /**
         * Returns the cached call for the given service id and interface, creating it on first use.
         */
        private <I> RpcServiceCall<I> getApiCall(String id, Class<I> clazz) {
            return this.getApiCall(id, clazz, null);
        }

        /**
         * Starts building the request message for a method invocation. A request that already is
         * a {@code ServiceMessage} is copied and re-qualified; any other value becomes the data
         * of a new message whose data format defaults to the configured content type.
         *
         * @param methodInfo method being invoked
         * @param request    request argument, possibly {@code null}
         */
        private ServiceMessage.Builder toServiceMessageBuilder(MethodInfo methodInfo, Object request) {
            String qualifier = methodInfo.qualifier();
            if (request instanceof ServiceMessage) {
                return ServiceMessage.from((ServiceMessage)request).qualifier(qualifier);
            }
            return ServiceMessage
                .builder()
                .qualifier(qualifier)
                .data(request)
                .dataFormatIfAbsent(ScalecubeRpcManager.this.contentType);
        }

        /**
         * Builds the request message for a method invocation. The builder is passed through
         * {@code TraceHolder.writeContextTo} with the builder's header setter before the message
         * is built (presumably to copy the trace context into message headers - confirm against
         * TraceHolder).
         */
        private Mono<ServiceMessage> toServiceMessage(MethodInfo methodInfo, Object request) {
            ServiceMessage.Builder pending = this.toServiceMessageBuilder(methodInfo, request);
            return TraceHolder
                .writeContextTo(pending, ServiceMessage.Builder::header)
                .map(ServiceMessage.Builder::build);
        }

        /**
         * Returns the retry strategy to apply to a call of the given method.
         * <p>
         * When the manager's configured retry is a {@link RetryBackoffSpec}, a variant carrying a retry
         * context with the entry {@code Method.class -> method} is returned; any other retry is
         * returned as is.
         *
         * @param method the service interface method being invoked
         * @return the retry strategy for this invocation
         */
        private Retry getRetry(Method method) {
            if (!(ScalecubeRpcManager.this.retry instanceof RetryBackoffSpec)) {
                return ScalecubeRpcManager.this.retry;
            }
            RetryBackoffSpec backoff = (RetryBackoffSpec)ScalecubeRpcManager.this.retry;
            ContextView methodContext = (ContextView)Context.of(Method.class, (Object)method);
            return backoff.withRetryContext(methodContext);
        }

        /**
         * Creates a {@link Proxy} instance of {@code serviceInterface} whose method invocations are
         * converted into {@link ServiceMessage} requests and sent through {@code serviceCall}.
         * <p>
         * The per-method metadata from {@code Reflect.methodsInfo} is copied and every
         * {@link MethodInfo} is rebuilt with {@code Qualifier.asString(id, old.serviceName())} as its
         * first constructor argument (presumably the service name; confirm against {@code MethodInfo});
         * all remaining attributes are taken over from the original entry.
         * <p>
         * Every branch of the handler subscribes on the manager's {@code requestScheduler} and applies
         * {@code retryWhen(getRetry(method))} as the last operator.
         *
         * @param serviceCall      the call object used to send the requests
         * @param id               value combined with each method's service name via {@code Qualifier.asString}
         * @param serviceInterface the interface the proxy implements
         * @return the proxy instance, cast to {@code T}
         */
        private <T> T api(final ServiceCall serviceCall, String id, final Class<T> serviceInterface) {
            // Mutable copy of the method metadata so each entry can be replaced in the loop below.
            final HashMap genericReturnTypes = new HashMap(Reflect.methodsInfo(serviceInterface));
            for (Map.Entry entry : genericReturnTypes.entrySet()) {
                MethodInfo old = (MethodInfo)entry.getValue();
                // Same metadata as before, except for the first argument which now incorporates id.
                entry.setValue(new MethodInfo(Qualifier.asString((String)id, (String)old.serviceName()), old.methodName(), old.parameterizedReturnType(), old.isReturnTypeServiceMessage(), old.communicationMode(), old.parameterCount(), old.requestType(), old.isRequestTypeServiceMessage(), old.isSecured()));
            }
            return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler(){

                @Override
                public Object invoke(Object proxy, Method method, Object[] params) {
                    // toString/equals/hashCode are matched by method name only and answered locally.
                    Optional check = ClusterNode.this.toStringOrEqualsOrHashCode(method.getName(), serviceInterface, params);
                    if (check.isPresent()) {
                        return check.get();
                    }
                    // NOTE(review): get(...) returns null when method is not a key of the map; the
                    // dereference on the next line would then throw a NullPointerException.
                    MethodInfo methodInfo = (MethodInfo)genericReturnTypes.get(method);
                    Type returnType = methodInfo.parameterizedReturnType();
                    boolean isServiceMessage = methodInfo.isReturnTypeServiceMessage();
                    // A void request type means a null request; otherwise the first argument is the request.
                    Object request = methodInfo.requestType() == Void.TYPE ? null : params[0];
                    switch (methodInfo.communicationMode()) {
                        case FIRE_AND_FORGET: {
                            // Build the message, then send it with oneWay.
                            return ClusterNode.this.toServiceMessage(methodInfo, request).flatMap(arg_0 -> ((ServiceCall)serviceCall).oneWay(arg_0)).subscribeOn(ScalecubeRpcManager.this.requestScheduler).retryWhen(ClusterNode.this.getRetry(method));
                        }
                        case REQUEST_RESPONSE: {
                            // Single response; asMono either keeps the ServiceMessage or unwraps its data.
                            return ClusterNode.this.toServiceMessage(methodInfo, request).flatMap(msg -> serviceCall.requestOne(msg, returnType)).subscribeOn(ScalecubeRpcManager.this.requestScheduler).transform(ClusterNode.this.asMono(isServiceMessage)).retryWhen(ClusterNode.this.getRetry(method));
                        }
                        case REQUEST_STREAM: {
                            // Stream of responses; asFlux either keeps the ServiceMessages or unwraps their data.
                            return ClusterNode.this.toServiceMessage(methodInfo, request).flatMapMany(msg -> serviceCall.requestMany(msg, returnType)).subscribeOn(ScalecubeRpcManager.this.requestScheduler).transform(ClusterNode.this.asFlux(isServiceMessage)).retryWhen(ClusterNode.this.getRetry(method));
                        }
                        case REQUEST_CHANNEL: {
                            // The request is treated as a Publisher and every element becomes a ServiceMessage.
                            // Only the element at index 0 is passed through TraceHolder.writeContextTo together
                            // with the subscriber context; later elements are built without that step.
                            return serviceCall.requestBidirectional((Publisher)Flux.deferContextual(ctx -> Flux.from((Publisher)((Publisher)request)).index((o, data) -> {
                                if (o == 0L) {
                                    return ((ServiceMessage.Builder)TraceHolder.writeContextTo((ContextView)ctx, (Object)ClusterNode.this.toServiceMessageBuilder(methodInfo, data), ServiceMessage.Builder::header)).build();
                                }
                                return ClusterNode.this.toServiceMessageBuilder(methodInfo, data).build();
                            })), returnType).subscribeOn(ScalecubeRpcManager.this.requestScheduler).transform(ClusterNode.this.asFlux(isServiceMessage)).retryWhen(ClusterNode.this.getRetry(method));
                        }
                    }
                    // Reached only when the communication mode matched none of the cases above.
                    throw new IllegalArgumentException("Communication mode is not supported: " + method);
                }
            });
        }

        /**
         * Selects the transformation applied to a stream of response messages.
         *
         * @param isReturnTypeServiceMessage {@code true} to hand the {@link ServiceMessage}s through
         *                                   unchanged (cast to {@code Object}); {@code false} to emit
         *                                   each message's {@code data()} instead
         * @return the function to use with {@code Flux.transform}
         */
        private Function<Flux<ServiceMessage>, Flux<Object>> asFlux(boolean isReturnTypeServiceMessage) {
            if (!isReturnTypeServiceMessage) {
                // Caller wants the payload only.
                return messages -> messages.map(ServiceMessage::data);
            }
            return messages -> messages.cast(Object.class);
        }

        /**
         * Selects the transformation applied to a single response message.
         *
         * @param isReturnTypeServiceMessage {@code true} to hand the {@link ServiceMessage} through
         *                                   unchanged (cast to {@code Object}); {@code false} to emit
         *                                   the message's {@code data()} instead
         * @return the function to use with {@code Mono.transform}
         */
        private Function<Mono<ServiceMessage>, Mono<Object>> asMono(boolean isReturnTypeServiceMessage) {
            if (!isReturnTypeServiceMessage) {
                // Caller wants the payload only.
                return message -> message.map(ServiceMessage::data);
            }
            return message -> message.cast(Object.class);
        }

        /**
         * Answers {@code toString}, {@code equals} and {@code hashCode} on behalf of a service proxy,
         * delegating each of them to the {@code serviceInterface} class object.
         * <p>
         * Matching is done on the method name alone. For {@code equals} the first element of
         * {@code args} is used as the comparison target.
         *
         * @param method           name of the invoked method
         * @param serviceInterface interface the proxy implements
         * @param args             arguments of the invocation
         * @return the result wrapped in an {@link Optional}, or an empty {@link Optional} when the
         *         name is none of the three handled methods
         */
        private Optional<Object> toStringOrEqualsOrHashCode(String method, Class<?> serviceInterface, Object ... args) {
            // method.equals(..) rather than the constant-first form: a null name must still fail
            // with a NullPointerException, exactly as a switch on a null String does.
            if (method.equals("toString")) {
                return Optional.of(serviceInterface.toString());
            }
            if (method.equals("equals")) {
                return Optional.of(serviceInterface.equals(args[0]));
            }
            if (method.equals("hashCode")) {
                return Optional.of(serviceInterface.hashCode());
            }
            return Optional.empty();
        }

        /**
         * No-op: the body is empty, so calling this method has no effect.
         */
        public void dispose() {
        }
    }
}

