/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.redis;

import java.util.concurrent.atomic.AtomicBoolean;
import org.jetlinks.core.cluster.ClusterTopic;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class RedisClusterTopic<T>
implements ClusterTopic<T> {
    private final String topicName;
    private final ReactiveRedisOperations<Object, T> operations;
    private final Sinks.Many<ClusterTopic.TopicMessage<T>> processor = Reactors.createMany((boolean)false);
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private Disposable disposable;

    public RedisClusterTopic(String topic, ReactiveRedisOperations<Object, T> operations) {
        this.topicName = topic;
        this.operations = operations;
    }

    private void doSubscribe() {
        if (this.subscribed.compareAndSet(false, true)) {
            this.disposable = this.operations.listenToPattern(new String[]{this.topicName}).subscribe(data -> {
                if (this.processor.currentSubscriberCount() == 0) {
                    if (this.subscribed.compareAndSet(true, false)) {
                        this.disposable.dispose();
                    }
                } else {
                    this.processor.emitNext((Object)new ClusterTopic.TopicMessage<T>((ReactiveSubscription.Message)data){
                        final /* synthetic */ ReactiveSubscription.Message val$data;
                        {
                            this.val$data = message;
                        }

                        public String getTopic() {
                            return (String)this.val$data.getChannel();
                        }

                        public T getMessage() {
                            return this.val$data.getMessage();
                        }
                    }, Reactors.emitFailureHandler());
                }
            });
        }
    }

    public Flux<ClusterTopic.TopicMessage<T>> subscribePattern() {
        return this.processor.asFlux().doOnSubscribe(r -> this.doSubscribe());
    }

    public Mono<Integer> publish(Publisher<? extends T> publisher) {
        return Flux.from(publisher).flatMap(data -> this.operations.convertAndSend(this.topicName, data)).last((Object)1L).map(Number::intValue);
    }
}

