/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.source;

import java.security.Provider;
import java.security.Security;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.source.PulsarSourceConsumerConfig;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.io.core.Source;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

public abstract class PulsarSource<T>
implements Source<T> {
    protected final PulsarClient pulsarClient;
    protected final PulsarSourceConfig pulsarSourceConfig;
    protected final Map<String, String> properties;
    protected final ClassLoader functionClassLoader;
    protected final TopicSchema topicSchema;

    protected PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarSourceConfig, Map<String, String> properties, ClassLoader functionClassLoader) {
        this.pulsarClient = pulsarClient;
        this.pulsarSourceConfig = pulsarSourceConfig;
        this.topicSchema = new TopicSchema(pulsarClient);
        this.properties = properties;
        this.functionClassLoader = functionClassLoader;
    }

    public abstract List<Consumer<T>> getInputConsumers();

    protected ConsumerBuilder<T> createConsumeBuilder(String topic, PulsarSourceConsumerConfig conf) {
        ConsumerBuilder cb = this.pulsarClient.newConsumer(conf.getSchema()).subscriptionName(this.pulsarSourceConfig.getSubscriptionName()).subscriptionInitialPosition(this.pulsarSourceConfig.getSubscriptionPosition()).subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
        if (conf.getConsumerProperties() != null && !conf.getConsumerProperties().isEmpty()) {
            cb.loadConf(new HashMap<String, String>(conf.getConsumerProperties()));
        }
        cb = conf.isRegexPattern() ? cb.topicsPattern(topic) : cb.topics(Collections.singletonList(topic));
        if (conf.getReceiverQueueSize() != null) {
            cb = cb.receiverQueueSize(conf.getReceiverQueueSize().intValue());
        }
        if (conf.getCryptoKeyReader() != null) {
            cb = cb.cryptoKeyReader(conf.getCryptoKeyReader());
        }
        if (conf.getConsumerCryptoFailureAction() != null) {
            cb = cb.cryptoFailureAction(conf.getConsumerCryptoFailureAction());
        }
        cb = cb.properties(this.properties);
        if (this.pulsarSourceConfig.getNegativeAckRedeliveryDelayMs() != null && this.pulsarSourceConfig.getNegativeAckRedeliveryDelayMs() > 0L) {
            cb.negativeAckRedeliveryDelay(this.pulsarSourceConfig.getNegativeAckRedeliveryDelayMs().longValue(), TimeUnit.MILLISECONDS);
        }
        if (this.pulsarSourceConfig.getTimeoutMs() != null) {
            cb = cb.ackTimeout(this.pulsarSourceConfig.getTimeoutMs().longValue(), TimeUnit.MILLISECONDS);
        }
        if (this.pulsarSourceConfig.getMaxMessageRetries() != null && this.pulsarSourceConfig.getMaxMessageRetries() >= 0) {
            DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder();
            deadLetterPolicyBuilder.maxRedeliverCount(this.pulsarSourceConfig.getMaxMessageRetries().intValue());
            if (this.pulsarSourceConfig.getDeadLetterTopic() != null && !this.pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
                deadLetterPolicyBuilder.deadLetterTopic(this.pulsarSourceConfig.getDeadLetterTopic());
            }
            cb = cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
        }
        if (conf.isPoolMessages()) {
            cb.poolMessages(true);
        }
        return cb;
    }

    protected Record<T> buildRecord(Consumer<T> consumer, Message<T> message) {
        MessageImpl impl;
        Schema schema = null;
        if (message instanceof MessageImpl) {
            impl = (MessageImpl)message;
            schema = impl.getSchemaInternal();
        } else if (message instanceof TopicMessageImpl) {
            impl = (TopicMessageImpl)message;
            schema = impl.getSchemaInternal();
        }
        if (schema instanceof AutoConsumeSchema) {
            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema)schema;
            schema = autoConsumeSchema.unwrapInternalSchema(message.getSchemaVersion());
        }
        return PulsarRecord.builder().message(message).schema(schema).topicName(message.getTopicName()).ackFunction(() -> {
            try {
                if (this.pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                    consumer.acknowledgeCumulativeAsync(message);
                } else {
                    consumer.acknowledgeAsync(message);
                }
            }
            finally {
                message.release();
            }
        }).failFunction(() -> {
            try {
                if (this.pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                    throw new RuntimeException("Failed to process message: " + message.getMessageId());
                }
                consumer.negativeAcknowledge(message);
            }
            finally {
                message.release();
            }
        }).build();
    }

    protected PulsarSourceConsumerConfig<T> buildPulsarSourceConsumerConfig(String topic, ConsumerConfig conf, Class<?> typeArg) {
        PulsarSourceConsumerConfig.PulsarSourceConsumerConfigBuilder<?> consumerConfBuilder = PulsarSourceConsumerConfig.builder().isRegexPattern(conf.isRegexPattern()).receiverQueueSize(conf.getReceiverQueueSize()).consumerProperties(conf.getConsumerProperties());
        Schema<?> schema = conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty() ? this.topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true) : this.topicSchema.getSchema(topic, typeArg, conf, true);
        consumerConfBuilder.schema(schema);
        if (conf.getCryptoConfig() != null) {
            if (Security.getProvider("BC") == null) {
                Security.addProvider((Provider)new BouncyCastleProvider());
            }
            consumerConfBuilder.consumerCryptoFailureAction(conf.getCryptoConfig().getConsumerCryptoFailureAction());
            consumerConfBuilder.cryptoKeyReader(CryptoUtils.getCryptoKeyReaderInstance((String)conf.getCryptoConfig().getCryptoKeyReaderClassName(), (Map)conf.getCryptoConfig().getCryptoKeyReaderConfig(), (ClassLoader)this.functionClassLoader));
        }
        consumerConfBuilder.poolMessages(conf.isPoolMessages());
        return consumerConfBuilder.build();
    }
}

