/*
 * Decompiled with CFR 0.152.
 */
package io.jboot.components.mq.rabbitmq;

import com.google.common.collect.Maps;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.rabbitmq.JbootmqRabbitmqConfig;
import io.jboot.exception.JbootException;
import io.jboot.utils.StrUtil;
import java.io.IOException;
import java.util.Map;

/**
 * RabbitMQ-backed {@link Jbootmq} implementation.
 *
 * <p>One AMQP {@link Channel} is kept per logical channel name. For every name
 * the class declares a queue of that name (used by {@link #enqueue}), a fanout
 * exchange of that name (used by {@link #publish}) and a server-named queue
 * bound to the exchange (used to receive broadcasts).
 */
public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq {
    /** Connection shared by every channel opened by this instance. */
    private final Connection connection;

    /** Logical channel name -> AMQP channel that has its queue/exchange declared. */
    private final Map<String, Channel> channelMap = Maps.newConcurrentMap();

    /**
     * Reads {@link JbootmqRabbitmqConfig} and opens the connection to the broker.
     * Virtual host, username and password are applied only when they are not blank.
     *
     * @throws JbootException if the connection cannot be established
     */
    public JbootRabbitmqImpl() {
        JbootmqRabbitmqConfig rabbitmqConfig = Jboot.config(JbootmqRabbitmqConfig.class);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(rabbitmqConfig.getHost());
        factory.setPort(rabbitmqConfig.getPort());
        if (StrUtil.isNotBlank(rabbitmqConfig.getVirtualHost())) {
            factory.setVirtualHost(rabbitmqConfig.getVirtualHost());
        }
        if (StrUtil.isNotBlank(rabbitmqConfig.getUsername())) {
            factory.setUsername(rabbitmqConfig.getUsername());
        }
        if (StrUtil.isNotBlank(rabbitmqConfig.getPassword())) {
            factory.setPassword(rabbitmqConfig.getPassword());
        }
        try {
            this.connection = factory.newConnection();
        }
        catch (Exception e) {
            throw new JbootException("can not connection rabbitmq server", e);
        }
    }

    /**
     * Registers the broadcast and queue consumers for every configured channel name.
     */
    @Override
    protected void onStartListening() {
        for (String toChannel : this.channels) {
            this.registerListner(this.getChannel(toChannel), toChannel);
        }
    }

    /**
     * Attaches two auto-ack consumers to {@code channel}: one for broadcast
     * messages and one for messages enqueued on the queue named {@code toChannel}.
     * A failure to register is printed and otherwise ignored.
     *
     * @param channel   the AMQP channel to consume on; ignored when {@code null}
     * @param toChannel the logical channel (queue) name
     */
    private void registerListner(Channel channel, String toChannel) {
        if (channel == null) {
            return;
        }
        try {
            // An empty queue name asks the broker for the queue most recently
            // declared on this channel (AMQP 0-9-1 basic.consume); getChannel()
            // declares the server-named broadcast queue last, so this is it.
            channel.basicConsume("", true, this.newConsumer(channel, true));
            channel.basicConsume(toChannel, true, this.newConsumer(channel, false));
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds a consumer that deserializes each delivery and hands it to the
     * registered listeners.
     *
     * @param channel   the channel the consumer is attached to
     * @param broadcast {@code true} to report the delivery under the exchange it
     *                  came from, {@code false} to report it under its routing key
     * @return the consumer
     */
    private Consumer newConsumer(Channel channel, final boolean broadcast) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                Object message = JbootRabbitmqImpl.this.getSerializer().deserialize(body);
                String source = broadcast ? envelope.getExchange() : envelope.getRoutingKey();
                JbootRabbitmqImpl.this.notifyListeners(source, message);
            }
        };
    }

    /**
     * Returns the AMQP channel for {@code toChannel}, creating and caching it on
     * first use. Creation is serialized so that concurrent callers asking for the
     * same name share one channel instead of each opening (and leaking) their own.
     *
     * @param toChannel the logical channel name
     * @return the cached or newly opened channel, never {@code null}
     * @throws JbootException if the channel cannot be opened or declared
     */
    private Channel getChannel(String toChannel) {
        Channel channel = this.channelMap.get(toChannel);
        if (channel != null) {
            return channel;
        }
        synchronized (this.channelMap) {
            // Re-check: another thread may have created it while we waited.
            channel = this.channelMap.get(toChannel);
            if (channel == null) {
                channel = this.openChannel(toChannel);
                this.channelMap.put(toChannel, channel);
            }
            return channel;
        }
    }

    /**
     * Opens a new AMQP channel and declares the queue, the fanout exchange and
     * the server-named broadcast queue for {@code toChannel}. The channel is
     * closed again if any of these steps fails.
     *
     * @param toChannel the logical channel name
     * @return the fully declared channel
     * @throws JbootException if the channel cannot be opened or declared
     */
    private Channel openChannel(String toChannel) {
        Channel channel = null;
        boolean ready = false;
        try {
            channel = this.connection.createChannel();
            if (channel == null) {
                // createChannel() returns null when the connection has no channel left.
                throw new IOException("no channel available for " + toChannel);
            }
            channel.queueDeclare(toChannel, false, false, false, null);
            channel.exchangeDeclare(toChannel, BuiltinExchangeType.FANOUT);
            // Must stay the last declared queue: registerListner consumes from "".
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, toChannel, toChannel);
            ready = true;
            return channel;
        }
        catch (IOException e) {
            throw new JbootException("can not createChannel", e);
        }
        finally {
            if (!ready) {
                JbootRabbitmqImpl.closeQuietly(channel);
            }
        }
    }

    /**
     * Closes {@code channel}, ignoring any failure; does nothing for {@code null}.
     *
     * @param channel the channel to close, may be {@code null}
     */
    private static void closeQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        }
        catch (Exception ignored) {
            // Best effort: the channel is being discarded because setup already failed.
        }
    }

    /**
     * Sends {@code message} to the queue named {@code toChannel} through the
     * default exchange. An {@link IOException} while publishing is printed and
     * otherwise ignored.
     *
     * @param message   the message to serialize and send
     * @param toChannel the destination queue name
     */
    @Override
    public void enqueue(Object message, String toChannel) {
        this.send("", toChannel, toChannel, message);
    }

    /**
     * Broadcasts {@code message} through the fanout exchange named
     * {@code toChannel}. An {@link IOException} while publishing is printed and
     * otherwise ignored.
     *
     * @param message   the message to serialize and send
     * @param toChannel the destination exchange name
     */
    @Override
    public void publish(Object message, String toChannel) {
        this.send(toChannel, "", toChannel, message);
    }

    /**
     * Serializes {@code message} and publishes it on the channel for
     * {@code toChannel} with {@link MessageProperties#BASIC}.
     *
     * @param exchange   the exchange to publish to ({@code ""} for the default exchange)
     * @param routingKey the routing key
     * @param toChannel  the logical channel name used to look up the AMQP channel
     * @param message    the message to serialize and send
     */
    private void send(String exchange, String routingKey, String toChannel, Object message) {
        Channel channel = this.getChannel(toChannel);
        try {
            byte[] bytes = this.getSerializer().serialize(message);
            channel.basicPublish(exchange, routingKey, MessageProperties.BASIC, bytes);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
}

