package com.ifourthwall.kafka;

import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.util.CollectionUtils;

/**
 * Kafka record listener that dispatches each received record to the registered
 * {@link DataHandler}s.
 *
 * <p>For every record, each handler whose {@code judge(topic)} returns {@code true} is given
 * the record value converted to a string. Offsets are committed synchronously through the
 * supplied {@link Consumer} only after all matching handlers have returned without throwing.
 */
@KafkaListener(containerFactory = "MessageListenerContainer")
public class KafkaDataConsumer implements AcknowledgingConsumerAwareMessageListener {
    private static final Logger log = LoggerFactory.getLogger(KafkaDataConsumer.class);

    /** Handlers consulted, in list order, for every incoming record; may be null or empty. */
    private final List<DataHandler> dataHandlerList;

    /**
     * Creates a listener that dispatches to the given handlers.
     *
     * @param list handlers to consult for each record; the reference is stored as-is (no copy)
     */
    public KafkaDataConsumer(List<DataHandler> list) {
        this.dataHandlerList = list;
    }

    /**
     * Handles one record: logs its value, passes it to every handler that accepts the record's
     * topic, then commits offsets on {@code consumer}.
     *
     * <p>Any exception raised while handling or committing is logged and not rethrown. When a
     * handler throws, the remaining handlers are skipped and this invocation does not commit.
     *
     * @param consumerRecord the received record; its topic selects the handlers and its value is
     *     the payload handed to them
     * @param acknowledgment not used by this implementation; offsets are committed directly on
     *     {@code consumer}
     * @param consumer the consumer on which {@code commitSync()} is invoked after successful
     *     handling
     */
    @Override
    public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer consumer) {
        log.info("kafka消费接受到数据:{}", consumerRecord.value());
        try {
            if (!CollectionUtils.isEmpty(this.dataHandlerList)) {
                for (DataHandler dataHandler : this.dataHandlerList) {
                    if (dataHandler.judge(consumerRecord.topic())) {
                        // A null record value raises NullPointerException here; it is caught
                        // and logged by the catch block below like any other handling failure.
                        dataHandler.handle(consumerRecord.value().toString());
                    }
                }
            }
            // Commit only after handling succeeded. Committing before the handlers run would
            // mark the record as consumed even if handling then fails or the process stops
            // part-way through, so the record could never be redelivered.
            consumer.commitSync();
            log.info("处理数据成功");
        } catch (Exception e) {
            log.error("kafka消费数据失败，失败原因:", e);
        }
    }

    /**
     * Single-argument listener callback. Does nothing; record handling is implemented in
     * {@link #onMessage(ConsumerRecord, Acknowledgment, Consumer)}.
     *
     * @param obj ignored
     */
    @Override
    public void onMessage(Object obj) {
    }
}
