/*
 * Decompiled with CFR 0.152.
 */
package io.jboot.components.mq.zbus;

import com.google.common.collect.Maps;
import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.zbus.JbootZbusmqConfig;
import io.jboot.utils.StrUtil;
import io.zbus.mq.Broker;
import io.zbus.mq.ConsumeGroup;
import io.zbus.mq.Consumer;
import io.zbus.mq.ConsumerConfig;
import io.zbus.mq.Message;
import io.zbus.mq.MessageHandler;
import io.zbus.mq.MqClient;
import io.zbus.mq.Producer;
import java.io.IOException;
import java.util.Map;

public class JbootZbusmqImpl
extends JbootmqBase
implements Jbootmq,
MessageHandler {
    private static final Log LOG = Log.getLog(JbootZbusmqImpl.class);
    private Broker broker;
    JbootZbusmqConfig zbusmqConfig = Jboot.config(JbootZbusmqConfig.class);
    private Map<String, Producer> producerMap = Maps.newConcurrentMap();

    public JbootZbusmqImpl() {
        this.broker = new Broker(this.zbusmqConfig.getBroker());
    }

    @Override
    protected void onStartListening() {
        String[] queues;
        for (String channel : this.channels) {
            ConsumerConfig config = new ConsumerConfig(this.broker);
            config.setTopic(channel);
            config.setMessageHandler((MessageHandler)this);
            ConsumeGroup group = ConsumeGroup.createTempBroadcastGroup();
            config.setConsumeGroup(group);
            Consumer consumer = new Consumer(config);
            try {
                consumer.start();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
        String queueString = this.zbusmqConfig.getQueue();
        if (StrUtil.isBlank((String)queueString)) {
            return;
        }
        for (String channel : queues = queueString.split(",")) {
            ConsumerConfig config = new ConsumerConfig(this.broker);
            config.setTopic(channel);
            config.setMessageHandler((MessageHandler)this);
            Consumer consumer = new Consumer(config);
            try {
                consumer.start();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void enqueue(Object message, String toChannel) {
        this.publish(message, toChannel);
    }

    @Override
    public void publish(Object message, String toChannel) {
        Producer producer = this.getProducer(toChannel);
        Message msg = new Message();
        msg.setTopic(toChannel);
        msg.setBody(Jboot.getSerializer().serialize(message));
        try {
            producer.publish(msg);
        }
        catch (Exception e) {
            LOG.error(e.toString(), (Throwable)e);
        }
    }

    public Producer getProducer(String toChannel) {
        Producer producer = this.producerMap.get(toChannel);
        if (producer == null) {
            producer = new Producer(this.broker);
            try {
                producer.declareTopic(toChannel);
            }
            catch (Exception e) {
                LOG.error(e.toString(), (Throwable)e);
            }
            this.producerMap.put(toChannel, producer);
        }
        return producer;
    }

    public void handle(Message message, MqClient mqClient) throws IOException {
        this.notifyListeners(message.getTopic(), Jboot.getSerializer().deserialize(message.getBody()));
    }
}

