/*
 * Decompiled with CFR 0.152.
 */
package io.jboot.components.mq.redismq;

import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.redismq.JbootmqRedisConfig;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.support.redis.JbootRedis;
import io.jboot.support.redis.JbootRedisManager;
import redis.clients.jedis.BinaryJedisPubSub;

/**
 * Redis-backed {@link Jbootmq} implementation.
 *
 * <p>Two delivery paths are wired up when listening starts:
 * <ul>
 *   <li>pub/sub: messages sent with {@link #publish(Object, String)} are received through a
 *       {@link BinaryJedisPubSub} subscription and handed to the registered listeners;</li>
 *   <li>list polling: messages stored with {@link #enqueue(Object, String)} are pulled by a
 *       background thread that polls every channel at a fixed interval.</li>
 * </ul>
 */
public class JbootRedismqImpl extends JbootmqBase implements Jbootmq, Runnable {
    private static final Log LOG = Log.getLog(JbootRedismqImpl.class);

    /** Pause between two polling passes over the channels, in milliseconds. */
    private static final long DEQUEUE_INTERVAL_MILLIS = 100L;

    private final JbootRedis redis;
    private Thread dequeueThread;

    /**
     * Resolves the Redis client used by this queue: the client built from
     * {@link JbootmqRedisConfig} when that configuration reports itself as usable,
     * otherwise the application-wide client returned by {@link Jboot#getRedis()}.
     *
     * @throws JbootIllegalConfigException if neither source yields a Redis client
     */
    public JbootRedismqImpl() {
        JbootmqRedisConfig redisConfig = Jboot.config(JbootmqRedisConfig.class);
        this.redis = redisConfig.isConfigOk() ? JbootRedisManager.me().getRedis(redisConfig) : Jboot.getRedis();
        if (this.redis == null) {
            throw new JbootIllegalConfigException("can not use redis mq (redis mq is default), please config jboot.redis.host=yourhost or use other mq component. ");
        }
    }

    /**
     * Subscribes to every configured channel for pub/sub delivery and then starts the
     * thread that polls the same channels for enqueued messages.
     */
    @Override
    protected void onStartListening() {
        // Keep the precise array type: the channel names are strings and are converted to key bytes below.
        String[] channelNames = this.channels.toArray(new String[0]);
        this.redis.subscribe(new BinaryJedisPubSub() {

            @Override
            public void onMessage(byte[] channel, byte[] message) {
                JbootRedismqImpl.this.notifyListeners(
                        JbootRedismqImpl.this.redis.bytesToKey(channel),
                        JbootRedismqImpl.this.getSerializer().deserialize(message));
            }
        }, this.redis.keysToBytesArray(channelNames));
        this.dequeueThread = new Thread(this);
        this.dequeueThread.start();
    }

    /**
     * Stores {@code message} in the list named {@code toChannel}; it is picked up later by
     * the polling thread.
     *
     * <p>NOTE(review): this pushes with {@code lpush} while the poller reads with
     * {@code lpop}. If these map directly to Redis LPUSH/LPOP, the most recently enqueued
     * message is delivered first - confirm whether FIFO ordering was intended.
     *
     * @param message   the payload to store
     * @param toChannel the list key to push to
     */
    @Override
    public void enqueue(Object message, String toChannel) {
        this.redis.lpush(toChannel, message);
    }

    /**
     * Serializes {@code message} and publishes it on {@code toChannel} via pub/sub.
     *
     * @param message   the payload to publish
     * @param toChannel the channel to publish on
     */
    @Override
    public void publish(Object message, String toChannel) {
        this.redis.publish(this.redis.keyToBytes(toChannel), this.getSerializer().serialize(message));
    }

    /**
     * Polling loop executed by the dequeue thread.
     *
     * <p>Each pass polls every channel once and then sleeps for
     * {@link #DEQUEUE_INTERVAL_MILLIS}. A failing pass is logged and retried after the same
     * pause, so a persistent failure cannot turn into a busy loop. The loop ends when the
     * thread is interrupted; the interrupt status is preserved for the caller.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.doExecuteDequeue();
            }
            catch (Throwable ex) {
                // Best effort: a broken pass must not terminate the polling thread.
                LOG.error(ex.toString(), ex);
            }
            try {
                Thread.sleep(DEQUEUE_INTERVAL_MILLIS);
            }
            catch (InterruptedException ex) {
                // Restore the flag and stop instead of swallowing the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Performs one polling pass: pops at most one message from each channel and forwards it
     * to the listeners. An exception raised while handling one channel propagates to
     * {@link #run()} and aborts the rest of the pass.
     */
    private void doExecuteDequeue() {
        for (String channel : this.channels) {
            Object data = this.redis.lpop(channel);
            if (data != null) {
                this.notifyListeners(channel, data);
            }
        }
    }
}

