/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.protocol.management;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.supports.protocol.StaticProtocolSupports;
import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultProtocolSupportManager
extends StaticProtocolSupports
implements ProtocolSupportManager {
    private static final Logger log = LoggerFactory.getLogger(DefaultProtocolSupportManager.class);
    public static final String topic = "/_sys/protocol/changed";
    private final EventBus eventBus;
    private final ClusterCache<String, ProtocolSupportDefinition> cache;
    private final ProtocolSupportLoader loader;
    private final Map<String, String> configProtocolIdMapping = new ConcurrentHashMap<String, String>();
    private final Disposable.Composite disposable = Disposables.composite();
    private Duration loadTimeout = Duration.ofSeconds(30L);

    public void init() {
        this.disposable.add(this.eventBus.subscribe(Subscription.builder().topics(new String[]{topic}).subscriberId("protocol-manager").local().broker().build(), payload -> this.init((ProtocolSupportDefinition)payload.decode(ProtocolSupportDefinition.class))));
        try {
            this.loadAll().filter(de -> de.getState() == 1).flatMap(def -> this.init((ProtocolSupportDefinition)def).onErrorResume(err -> Mono.empty())).blockLast(this.loadTimeout);
        }
        catch (Throwable error) {
            log.warn("load protocol error", error);
        }
    }

    public void shutdown() {
        this.disposable.dispose();
    }

    @Override
    public Mono<Boolean> store(Flux<ProtocolSupportDefinition> all) {
        return all.collect(Collectors.toMap(ProtocolSupportDefinition::getId, Function.identity())).flatMap(arg_0 -> this.cache.putAll(arg_0));
    }

    @Override
    public Flux<ProtocolSupportDefinition> loadAll() {
        return this.cache.values();
    }

    @Override
    public Mono<Boolean> save(ProtocolSupportDefinition definition) {
        return this.cache.put((Object)definition.getId(), (Object)definition).flatMap(su -> this.eventBus.publish(topic, (Object)definition).thenReturn(su));
    }

    @Override
    public Mono<Boolean> remove(String id) {
        return this.cache.get((Object)id).doOnNext(def -> def.setState((byte)-1)).flatMap(e -> this.eventBus.publish(topic, e)).then(this.cache.remove((Object)id));
    }

    public Mono<Void> init(ProtocolSupportDefinition definition) {
        return (Mono)Mono.defer(() -> {
            String operation = definition.getState() != 1 ? "uninstall" : "install";
            try {
                String protocol;
                if (definition.getState() != 1 && (protocol = this.configProtocolIdMapping.get(definition.getId())) != null) {
                    log.debug("uninstall protocol:{}", (Object)definition);
                    this.unRegister(protocol);
                    return Mono.empty();
                }
                Consumer<ProtocolSupport> consumer = definition.getState() != 1 ? this::unRegister : this::register;
                log.debug("{} protocol:{}", (Object)operation, (Object)definition);
                return this.loader.load(definition).doOnNext(e -> {
                    e.init(definition.getConfiguration());
                    log.debug("{} protocol[{}] success: {}", new Object[]{operation, definition.getId(), e});
                    this.configProtocolIdMapping.put(definition.getId(), e.getId());
                    consumer.accept(this.afterLoaded(definition, (ProtocolSupport)e));
                }).onErrorResume(e -> {
                    log.error("{} protocol[{}] error", new Object[]{operation, definition.getId(), e});
                    this.loadError(definition, (Throwable)e);
                    return Mono.empty();
                }).then();
            }
            catch (Throwable err) {
                log.error("{} protocol error", (Object)operation, (Object)err);
                this.loadError(definition, err);
                return Mono.empty();
            }
        }).as((Function)MonoTracer.create((String)("/protocol/" + definition.getId() + "/init")));
    }

    protected void loadError(ProtocolSupportDefinition def, Throwable err) {
    }

    protected ProtocolSupport afterLoaded(ProtocolSupportDefinition def, ProtocolSupport support) {
        return support;
    }

    public DefaultProtocolSupportManager(EventBus eventBus, ClusterCache<String, ProtocolSupportDefinition> cache, ProtocolSupportLoader loader) {
        this.eventBus = eventBus;
        this.cache = cache;
        this.loader = loader;
    }

    public void setLoadTimeout(Duration loadTimeout) {
        this.loadTimeout = loadTimeout;
    }
}

