/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.util;

import java.time.Duration;
import org.reactivestreams.Publisher;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AckUtils;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public final class IntegrationReactiveUtils {
    public static final String DELAY_WHEN_EMPTY_KEY = "DELAY_WHEN_EMPTY_KEY";
    public static final Duration DEFAULT_DELAY_WHEN_EMPTY = Duration.ofSeconds(1L);

    private IntegrationReactiveUtils() {
    }

    public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource) {
        return Mono.create(monoSink -> monoSink.onRequest(value -> monoSink.success(messageSource.receive()))).doOnSuccess(message -> AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message))).doOnError(MessagingException.class, ex -> {
            Message failedMessage = ex.getFailedMessage();
            if (failedMessage != null) {
                AckUtils.autoNack(StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMessage));
            }
        }).subscribeOn(Schedulers.boundedElastic()).repeatWhenEmpty(repeat -> repeat.flatMap(increment -> Mono.subscriberContext().flatMap(ctx -> Mono.delay((Duration)((Duration)ctx.getOrDefault((Object)DELAY_WHEN_EMPTY_KEY, (Object)DEFAULT_DELAY_WHEN_EMPTY)))))).repeat().retry();
    }

    public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {
        if (messageChannel instanceof Publisher) {
            return Flux.from((Publisher)((Publisher)messageChannel));
        }
        if (messageChannel instanceof SubscribableChannel) {
            return IntegrationReactiveUtils.adaptSubscribableChannelToPublisher((SubscribableChannel)messageChannel);
        }
        if (messageChannel instanceof PollableChannel) {
            return IntegrationReactiveUtils.messageSourceToFlux(() -> ((PollableChannel)messageChannel).receive(0L));
        }
        throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, SubscribableChannel or PollableChannel, not: " + messageChannel);
    }

    private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
        return Flux.defer(() -> {
            EmitterProcessor publisher = EmitterProcessor.create((int)1);
            MessageHandler messageHandler = message -> publisher.onNext((Object)message);
            inputChannel.subscribe(messageHandler);
            return publisher.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
        });
    }
}

