/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.scheduler.ClusterRemoteScheduler;
import org.jetlinks.rule.engine.cluster.scheduler.SchedulerRpcService;
import org.jetlinks.rule.engine.cluster.scheduler.SchedulerRpcServiceImpl;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ClusterRpcSchedulerRegistry
implements SchedulerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ClusterRpcSchedulerRegistry.class);
    private final RpcManager rpcManager;
    private final Map<String, Scheduler> locals = new NonBlockingHashMap();
    private final Map<String, Scheduler> remotes = new NonBlockingHashMap();
    private final Sinks.Many<Scheduler> joinListener = Reactors.createMany();
    private final Sinks.Many<Scheduler> leaveListener = Reactors.createMany();

    public ClusterRpcSchedulerRegistry(RpcManager rpcManager) {
        this.rpcManager = rpcManager;
        this.init();
    }

    void init() {
        this.rpcManager.getServices(SchedulerRpcService.class).subscribe(rpc -> this.remotes.put(rpc.id(), new ClusterRemoteScheduler(rpc.id(), (SchedulerRpcService)rpc.service())));
        this.rpcManager.listen(SchedulerRpcService.class).flatMap(event -> this.handleEvent((ServiceEvent)event).onErrorResume(err -> Mono.empty())).subscribe(e -> {});
    }

    private Mono<Void> handleEvent(ServiceEvent event) {
        Scheduler scheduler2;
        if (event.getType() == ServiceEvent.Type.added) {
            return this.rpcManager.getService(event.getServerNodeId(), event.getServiceId(), SchedulerRpcService.class).map(rpcService -> new ClusterRemoteScheduler(event.getServiceId(), (SchedulerRpcService)rpcService)).doOnNext(scheduler -> {
                if (this.remotes.put(event.getServiceId(), (Scheduler)scheduler) == null && this.joinListener.currentSubscriberCount() > 0) {
                    this.joinListener.emitNext(scheduler, Reactors.emitFailureHandler());
                }
            }).then();
        }
        if (event.getType() == ServiceEvent.Type.removed && null != (scheduler2 = this.remotes.remove(event.getServiceId())) && this.leaveListener.currentSubscriberCount() > 0) {
            this.leaveListener.emitNext((Object)scheduler2, Reactors.emitFailureHandler());
        }
        return Mono.empty();
    }

    @Override
    public List<Scheduler> getLocalSchedulers() {
        return new ArrayList<Scheduler>(this.locals.values());
    }

    @Override
    public Flux<Scheduler> getSchedulers() {
        return Flux.concat((Publisher[])new Publisher[]{Flux.fromIterable(this.getLocalSchedulers()), Flux.fromIterable(this.remotes.values())});
    }

    @Override
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.joinListener.asFlux();
    }

    @Override
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.leaveListener.asFlux();
    }

    @Override
    public void register(Scheduler scheduler) {
        if (this.locals.containsKey(scheduler.getId())) {
            throw new IllegalStateException("scheduler " + scheduler.getId() + " already registered");
        }
        this.locals.put(scheduler.getId(), scheduler);
        this.rpcManager.registerService(scheduler.getId(), (Object)new SchedulerRpcServiceImpl(scheduler));
    }
}

