/*
 * Decompiled with CFR 0.152.
 */
package org.hswebframework.web.cache.supports;

import java.util.Arrays;
import java.util.Collections;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import org.hswebframework.web.cache.ReactiveCache;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Two-level {@link ReactiveCache}: reads consult {@code localCache} first and fall back to a
 * Redis hash stored under {@code redisKey}; writes go to the Redis hash and the local cache and
 * are then announced on the Redis channel {@code "_cache_changed:" + redisKey}.
 *
 * <p>Every instance listens on that channel and evicts the announced key from its local cache
 * (or clears the local cache entirely when the message is {@code "___all"}).
 *
 * <p>Failures in the public operations are logged through {@link #handleError(Throwable)} and
 * converted into an empty completion, so callers observe an error as "no value".
 *
 * @param <E> type of the cached elements
 */
public class RedisReactiveCache<E> implements ReactiveCache<E> {
    private static final Logger log = LoggerFactory.getLogger(RedisReactiveCache.class);

    /** Prefix of the channel on which cache changes are announced. */
    private static final String TOPIC_PREFIX = "_cache_changed:";

    /** Channel message meaning "the whole cache was cleared". */
    private static final String CLEAR_ALL_MESSAGE = "___all";

    private final ReactiveRedisOperations<Object, Object> operations;
    private final String redisKey;
    private final ReactiveCache<E> localCache;
    private final String topicName;

    /**
     * Creates the cache and immediately subscribes to its change channel.
     *
     * @param redisKey   key of the Redis hash that holds the entries
     * @param operations Redis operations used for hash access and pub/sub
     * @param localCache cache consulted before Redis and invalidated by channel messages
     */
    public RedisReactiveCache(String redisKey, ReactiveRedisOperations<Object, Object> operations, ReactiveCache<E> localCache) {
        this.operations = operations;
        this.localCache = localCache;
        this.redisKey = redisKey;
        this.topicName = TOPIC_PREFIX + redisKey;
        // The lambdas below deliberately capture only constructor parameters, not `this`.
        operations.listenToChannel(this.topicName)
                .map(ReactiveSubscription.Message::getMessage)
                // Keys are Objects, so the message is not forced to be a String: a failed cast
                // would terminate the listener for good.
                .flatMap(message -> Mono
                        .defer(() -> CLEAR_ALL_MESSAGE.equals(message) ? localCache.clear() : localCache.evict(message))
                        // A failed invalidation must not cancel the listener itself.
                        .onErrorResume(err -> {
                            log.error("failed to invalidate local cache [{}] for message [{}]", redisKey, message, err);
                            return Mono.empty();
                        }))
                .subscribe(
                        ignored -> { },
                        err -> log.error("cache change listener for [{}] terminated", redisKey, err));
    }

    /**
     * Returns the elements cached under {@code key}: from the local cache if it emits anything,
     * otherwise from the Redis hash. A Redis value that is an {@link Iterable} is emitted element
     * by element; any other value is emitted as a single element.
     *
     * @param key hash key to look up
     * @return the cached elements, or an empty flux when nothing is found or an error occurs
     */
    @Override
    @SuppressWarnings("unchecked")
    public Flux<E> getFlux(Object key) {
        return localCache.getFlux(key)
                .switchIfEmpty(Flux.defer(() -> operations.opsForHash()
                        .get(redisKey, key)
                        .<Object>flatMapIterable(RedisReactiveCache::asIterable)
                        .map(element -> (E) element)))
                .onErrorResume(this::handleError);
    }

    /**
     * Logs {@code error} and replaces it with an empty completion.
     *
     * @param error the failure to report
     * @param <T>   element type expected by the calling chain
     * @return an empty mono
     */
    protected <T> Mono<T> handleError(Throwable error) {
        log.error(error.getMessage(), error);
        return Mono.empty();
    }

    /**
     * Returns the value cached under {@code key}: from the local cache if present, otherwise from
     * the Redis hash, in which case the value is also written to the local cache.
     *
     * @param key hash key to look up
     * @return the cached value, or an empty mono when nothing is found or an error occurs
     */
    @Override
    @SuppressWarnings("unchecked")
    public Mono<E> getMono(Object key) {
        return localCache.getMono(key)
                // Deferred so the Redis lookup is only built when the local cache misses and any
                // failure while building it goes through the error handler below.
                .switchIfEmpty(Mono.defer(() -> operations.opsForHash()
                        .get(redisKey, key)
                        .map(value -> (E) value)
                        .flatMap(value -> localCache.put(key, Mono.just(value)).thenReturn(value))))
                .onErrorResume(this::handleError);
    }

    /**
     * Stores {@code data} under {@code key} in Redis and in the local cache, then announces the
     * change. A {@link Mono} is stored as its single value, a {@link Flux} as the list of its
     * elements. The source publisher is subscribed to exactly once.
     *
     * @param key  hash key to store under
     * @param data source of the value(s); must be a {@link Mono} or a {@link Flux}
     * @return completion signal; fails with {@link UnsupportedOperationException} for any other
     *         publisher type
     */
    @Override
    public Mono<Void> put(Object key, Publisher<E> data) {
        if (data instanceof Mono) {
            return ((Mono<E>) data)
                    .flatMap(value -> store(key, value, Mono.just(value)))
                    .then()
                    .onErrorResume(this::handleError);
        }
        if (data instanceof Flux) {
            return ((Flux<E>) data)
                    .collectList()
                    .flatMap(values -> store(key, values, Flux.fromIterable(values)))
                    .then()
                    .onErrorResume(this::handleError);
        }
        return Mono.error(new UnsupportedOperationException("unsupport publisher:" + data));
    }

    /**
     * Removes all given keys from the Redis hash and the local cache and announces each removed
     * key on the change channel.
     *
     * @param key the hash keys to remove
     * @return completion signal; completes immediately when {@code key} has no elements
     */
    @Override
    public Mono<Void> evictAll(Iterable<?> key) {
        Object[] hashKeys = StreamSupport.stream(key.spliterator(), false).toArray();
        if (hashKeys.length == 0) {
            // Nothing to remove; avoids issuing a hash delete with no fields.
            return Mono.empty();
        }
        return operations.opsForHash()
                .remove(redisKey, hashKeys)
                .then(localCache.evictAll(Arrays.asList(hashKeys)))
                // thenMany, not flatMap: the preceding Mono<Void> never emits a value, so a
                // flatMap here would silently skip the notifications.
                .thenMany(Flux.fromArray(hashKeys)
                        .flatMap(hashKey -> operations.convertAndSend(topicName, hashKey)))
                .then()
                .onErrorResume(this::handleError);
    }

    /**
     * Reads values directly from the Redis hash, bypassing the local cache.
     *
     * @param keys hash keys to read; when empty, all values of the hash are returned
     * @return the values found (keys without a value are skipped), or an empty flux on error
     */
    @Override
    @SuppressWarnings("unchecked")
    public Flux<E> getAll(Object... keys) {
        Flux<Object> values;
        if (keys.length == 0) {
            values = operations.opsForHash().values(redisKey);
        } else {
            values = operations.opsForHash()
                    .multiGet(redisKey, Arrays.asList(keys))
                    // Drop null placeholders before they reach Reactor, which rejects null elements.
                    .<Object>flatMapMany(found -> Flux.fromStream(found.stream().filter(value -> value != null)));
        }
        return values
                .map(value -> (E) value)
                .onErrorResume(this::handleError);
    }

    /**
     * Removes {@code key} from the Redis hash and the local cache and announces the removal.
     *
     * @param key hash key to remove
     * @return completion signal
     */
    @Override
    public Mono<Void> evict(Object key) {
        return operations.opsForHash()
                .remove(redisKey, new Object[]{key})
                .then(localCache.evict(key))
                .then(operations.convertAndSend(topicName, key))
                .onErrorResume(this::handleError)
                .then();
    }

    /**
     * Deletes the whole Redis hash, clears the local cache and announces {@code "___all"} on the
     * change channel.
     *
     * @return completion signal
     */
    @Override
    public Mono<Void> clear() {
        return operations.opsForHash()
                .delete(redisKey)
                .then(localCache.clear())
                .then(operations.convertAndSend(topicName, CLEAR_ALL_MESSAGE))
                .onErrorResume(this::handleError)
                .then();
    }

    /** Writes one entry to Redis, mirrors it into the local cache and announces the key. */
    private Mono<Long> store(Object key, Object redisValue, Publisher<E> localValue) {
        return operations.opsForHash()
                .put(redisKey, key, redisValue)
                .then(localCache.put(key, localValue))
                .then(operations.convertAndSend(topicName, key));
    }

    /** Views a Redis hash value as a sequence: iterables as-is, anything else as one element. */
    private static Iterable<?> asIterable(Object value) {
        if (value instanceof Iterable) {
            return (Iterable<?>) value;
        }
        return Collections.singletonList(value);
    }
}

