/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.remoting.rpc;

import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventHandler;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.ConnectionFactory;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.ProtocolCode;
import com.alipay.remoting.SystemProperties;
import com.alipay.remoting.Url;
import com.alipay.remoting.codec.ProtocolCodeBasedEncoder;
import com.alipay.remoting.log.BoltLoggerFactory;
import com.alipay.remoting.rpc.HeartbeatHandler;
import com.alipay.remoting.rpc.RpcHandler;
import com.alipay.remoting.rpc.protocol.RpcProtocolDecoder;
import com.alipay.remoting.rpc.protocol.UserProcessor;
import com.alipay.remoting.util.StringUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

/**
 * {@link ConnectionFactory} implementation that opens client-side connections
 * through a Netty {@link Bootstrap} using NIO socket channels.
 *
 * <p>All instances share a single static worker {@link EventLoopGroup}.
 * {@link #init(ConnectionEventHandler)} must be called before any of the
 * {@code createConnection} methods, because it is the only place where the
 * bootstrap is created.
 */
public class RpcConnectionFactory implements ConnectionFactory {
    private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");

    /** Lower bound, in milliseconds, applied to every requested connect timeout. */
    private static final int MIN_CONNECT_TIMEOUT_MILLIS = 1000;

    /**
     * Worker group shared by every factory instance: (available processors + 1) threads.
     * The explicit cast pins the {@code (int, ThreadFactory)} constructor overload.
     */
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(
            Runtime.getRuntime().availableProcessors() + 1,
            (ThreadFactory) new NamedThreadFactory("Rpc-netty-client-worker"));

    /** Template bootstrap; created and configured by {@link #init(ConnectionEventHandler)}. */
    private Bootstrap bootstrap;

    /** Registered user processors keyed by {@code UserProcessor#interest()}. */
    private final ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<>(4);

    /**
     * Builds and configures the bootstrap used for all subsequent connects.
     *
     * <p>TCP options, the write-buffer water marks, the optional pooled allocator and
     * the idle-detection settings are all read from {@link SystemProperties} once, here.
     *
     * @param connectionEventHandler handler appended to every channel pipeline, just
     *                               before the rpc handler
     * @throws IllegalArgumentException if the configured low water mark is greater than
     *                                  the high water mark
     */
    @Override
    public void init(final ConnectionEventHandler connectionEventHandler) {
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, SystemProperties.tcp_nodelay())
                .option(ChannelOption.SO_REUSEADDR, SystemProperties.tcp_so_reuseaddr())
                .option(ChannelOption.SO_KEEPALIVE, SystemProperties.tcp_so_keepalive());

        this.initWriteBufferWaterMark();

        if (SystemProperties.netty_buffer_pooled()) {
            this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        final boolean idleSwitch = SystemProperties.tcp_idle_switch();
        final int idleTime = SystemProperties.tcp_idle();
        // These handler instances are created once and added to every channel's pipeline.
        // NOTE(review): presumably they are @Sharable - confirm against their declarations.
        final RpcHandler rpcHandler = new RpcHandler(this.userProcessors);
        final HeartbeatHandler heartbeatHandler = new HeartbeatHandler();

        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast("decoder", new RpcProtocolDecoder(1));
                // NOTE(review): 2 looks like an inlined protocol-code constant - confirm its origin.
                pipeline.addLast("encoder", new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes((byte) 2)));
                if (idleSwitch) {
                    // Reader and writer idle both use idleTime (ms); all-idle detection is disabled (0).
                    pipeline.addLast("idleStateHandler",
                            new IdleStateHandler(idleTime, idleTime, 0, TimeUnit.MILLISECONDS));
                    pipeline.addLast("heartbeatHandler", heartbeatHandler);
                }
                pipeline.addLast("connectionEventHandler", connectionEventHandler);
                pipeline.addLast("handler", rpcHandler);
            }
        });
    }

    /**
     * Connects to the address described by {@code url} and wraps the resulting channel.
     *
     * <p>After the {@link Connection} is built, a {@link ConnectionEventType#CONNECT}
     * user event is fired through the channel pipeline.
     *
     * @param url supplies ip, port, connect timeout, protocol and version
     * @return the newly created connection
     * @throws Exception if the connect attempt is cancelled or fails
     */
    @Override
    public Connection createConnection(Url url) throws Exception {
        ChannelFuture future = this.doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
        Connection conn = new Connection(future.channel(), ProtocolCode.fromBytes(url.getProtocol()),
                url.getVersion(), url);
        future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }

    /**
     * Connects to {@code targetIP:targetPort} using protocol code 1 and version 1.
     *
     * @param targetIP       remote host
     * @param targetPort     remote port
     * @param connectTimeout connect timeout in milliseconds (raised to at least 1000)
     * @return the newly created connection
     * @throws Exception if the connect attempt is cancelled or fails
     */
    @Override
    public Connection createConnection(String targetIP, int targetPort, int connectTimeout) throws Exception {
        ChannelFuture future = this.doCreateConnection(targetIP, targetPort, connectTimeout);
        // Explicit byte casts: an int literal is not implicitly narrowed in a method call.
        Connection conn = new Connection(future.channel(), ProtocolCode.fromBytes((byte) 1), (byte) 1,
                new Url(targetIP, targetPort));
        future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }

    /**
     * Connects to {@code targetIP:targetPort} using protocol code 2 and the given version.
     *
     * @param targetIP       remote host
     * @param targetPort     remote port
     * @param version        protocol version recorded on the connection
     * @param connectTimeout connect timeout in milliseconds (raised to at least 1000)
     * @return the newly created connection
     * @throws Exception if the connect attempt is cancelled or fails
     */
    @Override
    public Connection createConnection(String targetIP, int targetPort, byte version, int connectTimeout) throws Exception {
        ChannelFuture future = this.doCreateConnection(targetIP, targetPort, connectTimeout);
        Connection conn = new Connection(future.channel(), ProtocolCode.fromBytes((byte) 2), version,
                new Url(targetIP, targetPort));
        future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }

    /**
     * Performs the blocking connect and returns the completed, successful future.
     *
     * @param targetIP       remote host
     * @param targetPort     remote port
     * @param connectTimeout requested timeout in milliseconds; values below 1000 are raised to 1000
     * @return the connect future, already completed successfully
     * @throws Exception if the connect was cancelled, or failed (the failure cause is attached)
     */
    protected ChannelFuture doCreateConnection(String targetIP, int targetPort, int connectTimeout) throws Exception {
        int effectiveTimeout = Math.max(connectTimeout, MIN_CONNECT_TIMEOUT_MILLIS);
        String addr = targetIP + ":" + targetPort;
        if (logger.isDebugEnabled()) {
            logger.debug("connectTimeout of address [{}] is [{}].", addr, effectiveTimeout);
        }

        // Set the timeout on a per-call clone instead of the shared bootstrap, so that
        // concurrent callers with different timeouts cannot overwrite each other's value.
        Bootstrap connector = this.bootstrap.clone()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, effectiveTimeout);
        ChannelFuture future = connector.connect(new InetSocketAddress(targetIP, targetPort));

        // Blocks until the future completes, so no separate "not done" check is needed;
        // a connect timeout completes the future as a failure and is reported below.
        future.awaitUninterruptibly();

        if (future.isCancelled()) {
            String errMsg = "Create connection to " + addr + " cancelled by user!";
            logger.warn(errMsg);
            throw new Exception(errMsg);
        }
        if (!future.isSuccess()) {
            String errMsg = "Create connection to " + addr + " error!";
            logger.warn(errMsg);
            throw new Exception(errMsg, future.cause());
        }
        return future;
    }

    /**
     * Registers a user processor under its interest key.
     *
     * @param processor processor to register; must be non-null with a non-blank interest
     * @throws RuntimeException if {@code processor} is null, its interest is blank, or a
     *                          processor with the same interest is already registered
     */
    @Override
    public void registerUserProcessor(UserProcessor<?> processor) {
        if (processor == null || StringUtils.isBlank(processor.interest())) {
            throw new RuntimeException("User processor or processor interest should not be blank!");
        }
        // putIfAbsent keeps the first registration; a non-null result means the key was taken.
        UserProcessor<?> preProcessor = this.userProcessors.putIfAbsent(processor.interest(), processor);
        if (preProcessor != null) {
            String errMsg = "Processor with interest key [" + processor.interest() + "] has already been registered to rpc client, can not register again!";
            throw new RuntimeException(errMsg);
        }
    }

    /**
     * Reads the low/high write-buffer water marks from {@link SystemProperties},
     * validates them and applies them to the bootstrap.
     *
     * @throws IllegalArgumentException if the low water mark is greater than the high water mark
     */
    private void initWriteBufferWaterMark() {
        int lowWaterMark = SystemProperties.netty_buffer_low_watermark();
        int highWaterMark = SystemProperties.netty_buffer_high_watermark();
        if (lowWaterMark > highWaterMark) {
            throw new IllegalArgumentException(String.format("[client side] bolt netty high water mark {%s} should not be smaller than low water mark {%s} bytes)", highWaterMark, lowWaterMark));
        }
        logger.warn("[client side] bolt netty low water mark is {} bytes, high water mark is {} bytes", lowWaterMark, highWaterMark);
        this.bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWaterMark, highWaterMark));
    }
}

