/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.config;

import com.google.common.collect.Collections2;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.lang.SharedPathString;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.core.utils.CompositeMap;
import org.jetlinks.supports.config.CacheNotify;
import org.jetlinks.supports.config.LocalCacheClusterConfigStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class EventBusStorageManager
implements ConfigStorageManager,
Disposable {
    private static final Logger log = LoggerFactory.getLogger(EventBusStorageManager.class);
    static final String NOTIFY_TOPIC = "/_sys/cluster_cache";
    private final AtomicBoolean CLUSTER_SUBSCRIBER = new AtomicBoolean();
    final ConcurrentMap<String, LocalCacheClusterConfigStorage> cache;
    private final Disposable.Composite disposable = Disposables.composite();
    private final EventBus eventBus;
    private final Function<String, LocalCacheClusterConfigStorage> storageBuilder;
    private Function<String, StorageFilter> filterFactory;

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
        this(clusterManager, eventBus, -1L);
    }

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus, long ttl) {
        this.cache = new ConcurrentHashMap<String, LocalCacheClusterConfigStorage>();
        this.eventBus = eventBus;
        Consumer<LocalCacheClusterConfigStorage> removeHandler = store -> this.cache.remove(store.id, store);
        this.storageBuilder = id -> new LocalCacheClusterConfigStorage((String)id, eventBus, (ClusterCache<String, Object>)clusterManager.createCache(id), ttl, removeHandler);
        if (ttl > 0L) {
            Scheduler scheduler = Schedulers.newSingle((String)"configs-storage-cleaner");
            this.disposable.add((Disposable)scheduler);
            this.disposable.add(scheduler.schedulePeriodically(this::cleanup, 120L, 60L, TimeUnit.SECONDS));
        }
    }

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus, Supplier<ConcurrentMap<String, Object>> cacheSupplier) {
        this.eventBus = eventBus;
        this.cache = cacheSupplier.get();
        Consumer<LocalCacheClusterConfigStorage> removeHandler = store -> this.cache.remove(store.id, store);
        this.storageBuilder = id -> new LocalCacheClusterConfigStorage((String)id, eventBus, (ClusterCache<String, Object>)clusterManager.createCache(id), -1L, removeHandler, (Map)cacheSupplier.get());
    }

    private Disposable subscribeCluster() {
        SharedPathString pathString = SharedPathString.of((String)"/_sys/storage-manager/notify");
        return this.eventBus.subscribe(Subscription.builder().subscriberId("event-bus-storage-listener").topics(new String[]{NOTIFY_TOPIC}).justBroker().build(), topicPayload -> {
            CacheNotify notify = (CacheNotify)topicPayload.decode(CacheNotify.class);
            return (Mono)Mono.fromRunnable(() -> this.handleNotify(notify)).as((Function)MonoTracer.create((CharSequence)pathString.append((CharSequence)notify.getName())));
        });
    }

    private void handleNotify(CacheNotify cacheNotify) {
        try {
            LocalCacheClusterConfigStorage storage = (LocalCacheClusterConfigStorage)this.cache.get(cacheNotify.getName());
            if (storage != null) {
                log.trace("clear local cache :{}", (Object)cacheNotify);
                storage.clearLocalCache(cacheNotify);
            } else {
                log.trace("ignore clear local cache :{}", (Object)cacheNotify);
            }
        }
        catch (Throwable error) {
            log.warn("clear local cache error", error);
        }
    }

    public void refreshAll() {
        for (Map.Entry entry : this.cache.entrySet()) {
            ((LocalCacheClusterConfigStorage)entry.getValue()).clearLocalCache(CacheNotify.expiresAll((String)entry.getKey()));
        }
    }

    public void dispose() {
        this.disposable.dispose();
    }

    void cleanup() {
        this.cache.forEach((key, value) -> {
            value.cleanup();
            if (value.isEmpty()) {
                this.cache.remove(key, value);
            }
        });
    }

    public Mono<ConfigStorage> getStorage(String id) {
        if (this.CLUSTER_SUBSCRIBER.compareAndSet(false, true)) {
            this.disposable.add(this.subscribeCluster());
        }
        return Mono.fromSupplier(() -> this.cache.computeIfAbsent(id, this.storageBuilder));
    }

    public void registerMbean() {
        try {
            MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
            ObjectName objectName = new ObjectName("org.jetlinks:type=ConfigStorage,name=LocalCacheClusterConfigStorage");
            mBeanServer.registerMBean(new StandardMBean(new ConfigStorageManagerMbeanImpl(), ConfigStorageManagerMbean.class), objectName);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
    }

    public void setFilterFactory(Function<String, StorageFilter> filterFactory) {
        this.filterFactory = filterFactory;
    }

    public static interface StorageFilter {
        public Set<String> getKeys();

        public Mono<Boolean> test(Map<String, Object> var1);
    }

    public static interface ConfigStorageManagerMbean {
        public long getCacheSize();

        public void cleanup();

        public void clean();

        public List<ConfigStorageInfo> search(String var1, int var2);
    }

    public static class ConfigStorageInfo {
        private String id;
        private String lastError;
        private Map<String, Object> values;

        public String getId() {
            return this.id;
        }

        public String getLastError() {
            return this.lastError;
        }

        public Map<String, Object> getValues() {
            return this.values;
        }

        public void setId(String id) {
            this.id = id;
        }

        public void setLastError(String lastError) {
            this.lastError = lastError;
        }

        public void setValues(Map<String, Object> values) {
            this.values = values;
        }
    }

    class ConfigStorageManagerMbeanImpl
    implements ConfigStorageManagerMbean {
        ConfigStorageManagerMbeanImpl() {
        }

        @Override
        public long getCacheSize() {
            return EventBusStorageManager.this.cache.size();
        }

        @Override
        public void cleanup() {
            EventBusStorageManager.this.cleanup();
        }

        @Override
        public void clean() {
            EventBusStorageManager.this.cache.clear();
        }

        @Override
        public List<ConfigStorageInfo> search(String expr, int maxSize) {
            StorageFilter filter = EventBusStorageManager.this.filterFactory == null ? null : (StringUtils.hasText((String)expr) ? (StorageFilter)EventBusStorageManager.this.filterFactory.apply(expr) : null);
            maxSize = Math.max(1, maxSize);
            if (filter == null) {
                return EventBusStorageManager.this.cache.entrySet().stream().limit(maxSize).map(e -> {
                    ConfigStorageInfo info = new ConfigStorageInfo();
                    info.setId((String)e.getKey());
                    info.setValues(((LocalCacheClusterConfigStorage)e.getValue()).getAll());
                    return info;
                }).collect(Collectors.toList());
            }
            return (List)Flux.fromIterable(EventBusStorageManager.this.cache.entrySet()).flatMap(e -> ((LocalCacheClusterConfigStorage)e.getValue()).getConfigs(Collections2.filter(filter.getKeys(), id -> !id.equals("id"))).thenReturn(e), 32).publishOn(Schedulers.boundedElastic()).filterWhen(e -> filter.test((Map<String, Object>)new CompositeMap(Collections.singletonMap("id", e.getKey()), ((LocalCacheClusterConfigStorage)e.getValue()).getAll())), 32).take((long)maxSize).map(e -> {
                ConfigStorageInfo info = new ConfigStorageInfo();
                info.setId((String)e.getKey());
                Throwable err = ((LocalCacheClusterConfigStorage)e.getValue()).lastError;
                if (err != null) {
                    StringWriter writer = new StringWriter();
                    err.printStackTrace(new PrintWriter(writer));
                    info.lastError = writer.toString();
                }
                info.setValues(((LocalCacheClusterConfigStorage)e.getValue()).getAll());
                return info;
            }).collectList().block(Duration.ofSeconds(30L));
        }
    }
}

