/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import java.lang.reflect.Type;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.EndStreamQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
import org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder;
import org.apache.dubbo.rpc.protocol.tri.stream.AbstractStream;
import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
import org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleCommandOutBoundHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;

/**
 * Client side of a single stream: outbound frames are pushed through a {@link WriteQueue},
 * inbound HTTP/2 events are translated by {@link ClientTransportListener} into
 * {@code onStart} / {@code onMessage} / {@code onComplete} callbacks on the
 * {@link ClientStream.Listener}.
 *
 * <p>Every inbound transport callback is re-dispatched onto this stream's executor before it
 * touches listener state.
 */
public class TripleClientStream extends AbstractStream implements ClientStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(TripleClientStream.class);

    /** Receives the start, message and completion events of this stream. */
    public final ClientStream.Listener listener;
    /** Queue through which all outbound commands (headers, data, end-stream, cancel) are written. */
    private final WriteQueue writeQueue;
    /** Created when response headers are processed; {@code null} until then. */
    private Deframer deframer;
    /** Parent channel the stream was opened on; {@code null} when built with an explicit write queue. */
    private final Channel parent;

    /**
     * Builds a stream on top of an already existing write queue.
     *
     * <p>No parent channel is recorded, so {@link #remoteAddress()} cannot be used on a stream
     * created this way.
     *
     * @param frameworkModel framework model forwarded to {@link AbstractStream}
     * @param executor       executor on which inbound transport events are processed
     * @param writeQueue     queue used for every outbound command
     * @param listener       receiver of stream events
     */
    TripleClientStream(FrameworkModel frameworkModel, Executor executor, WriteQueue writeQueue, ClientStream.Listener listener) {
        super(executor, frameworkModel);
        this.parent = null;
        this.listener = listener;
        this.writeQueue = writeQueue;
    }

    /**
     * Builds a stream by opening a new HTTP/2 stream channel on {@code parent}.
     *
     * @param frameworkModel framework model forwarded to {@link AbstractStream}
     * @param executor       executor on which inbound transport events are processed
     * @param parent         connection channel on which the stream channel is opened
     * @param listener       receiver of stream events
     * @throws IllegalStateException if the stream channel could not be opened
     */
    public TripleClientStream(FrameworkModel frameworkModel, Executor executor, Channel parent, ClientStream.Listener listener) {
        super(executor, frameworkModel);
        this.parent = parent;
        this.listener = listener;
        this.writeQueue = this.createWriteQueue(parent);
    }

    /**
     * Opens a stream channel on {@code parent}, installs the outbound command handler and the
     * inbound response handler, and wraps the channel in a {@link WriteQueue}.
     *
     * <p>The call waits for the open attempt via {@code syncUninterruptibly()}. When the stream
     * channel later closes, the close future's cause is reported through
     * {@link #transportException(Throwable)}.
     *
     * @param parent connection channel on which the stream channel is opened
     * @return write queue bound to the new stream channel
     * @throws IllegalStateException if the open future did not succeed
     */
    private WriteQueue createWriteQueue(Channel parent) {
        Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
        Future<Http2StreamChannel> future = bootstrap.open().syncUninterruptibly();
        if (!future.isSuccess()) {
            throw new IllegalStateException("Create remote stream failed. channel:" + parent);
        }
        Http2StreamChannel channel = future.getNow();
        channel.pipeline()
            .addLast(new TripleCommandOutBoundHandler())
            .addLast(new TripleHttp2ClientResponseHandler(this.createTransportListener()));
        channel.closeFuture().addListener(f -> this.transportException(f.cause()));
        return new WriteQueue(channel);
    }

    /** Closes the underlying write queue. */
    public void close() {
        this.writeQueue.close();
    }

    /**
     * Enqueues the request headers.
     *
     * <p>A failed write is reported to the listener through
     * {@link #transportException(Throwable)}.
     *
     * @param headers request headers to send
     * @return future of the enqueued write, or a failed future when there is no write queue
     */
    public ChannelFuture sendHeader(Http2Headers headers) {
        if (this.writeQueue == null) {
            return this.parent.newFailedFuture(new IllegalStateException("Stream already closed"));
        }
        HeaderQueueCommand headerCmd = HeaderQueueCommand.createHeaders(headers);
        return this.writeQueue.enqueue(headerCmd).addListener(future -> {
            if (!future.isSuccess()) {
                this.transportException(future.cause());
            }
        });
    }

    /**
     * Completes the listener with an {@code INTERNAL} status described as "Http2 exception".
     *
     * @param cause failure to attach to the status; may be {@code null}
     */
    private void transportException(Throwable cause) {
        TriRpcStatus status = TriRpcStatus.INTERNAL.withDescription("Http2 exception").withCause(cause);
        this.listener.onComplete(status, null);
    }

    /**
     * Enqueues a cancel command carrying {@link Http2Error#CANCEL}.
     *
     * @param status reason for the cancellation; not transmitted by this implementation
     * @return future of the enqueued cancel command
     */
    public ChannelFuture cancelByLocal(TriRpcStatus status) {
        CancelQueueCommand cmd = CancelQueueCommand.createCommand(Http2Error.CANCEL);
        return this.writeQueue.enqueue(cmd, true);
    }

    /**
     * @return remote address of the parent channel
     */
    @Override
    public SocketAddress remoteAddress() {
        return this.parent.remoteAddress();
    }

    /**
     * Enqueues one message as a data command.
     *
     * <p>If the write fails, the stream is cancelled locally and the failure is reported to the
     * listener.
     *
     * @param message      serialized message bytes
     * @param compressFlag compression flag passed to the data command
     * @param eos          not used by this implementation; the data command is always created
     *                     with {@code false} as its second argument
     * @return future of the enqueued write
     */
    public ChannelFuture sendMessage(byte[] message, int compressFlag, boolean eos) {
        DataQueueCommand cmd = DataQueueCommand.createGrpcCommand(message, false, compressFlag);
        return this.writeQueue.enqueue(cmd).addListener(future -> {
            if (!future.isSuccess()) {
                this.cancelByLocal(TriRpcStatus.INTERNAL.withDescription("Client write message failed").withCause(future.cause()));
                this.transportException(future.cause());
            }
        });
    }

    /**
     * Forwards the request for {@code n} more messages to the deframer.
     *
     * <p>The deframer only exists after response headers have been processed.
     *
     * @param n number of messages requested
     */
    @Override
    public void request(int n) {
        this.deframer.request(n);
    }

    /**
     * Enqueues an end-stream command.
     *
     * @return future of the enqueued command
     */
    public ChannelFuture halfClose() {
        EndStreamQueueCommand cmd = EndStreamQueueCommand.create();
        return this.writeQueue.enqueue(cmd);
    }

    /**
     * @return a new transport listener bound to this stream
     */
    H2TransportListener createTransportListener() {
        return new ClientTransportListener();
    }

    /**
     * Interprets inbound HTTP/2 headers, data and reset events for the enclosing stream and
     * turns them into listener callbacks.
     */
    class ClientTransportListener
    extends AbstractH2TransportListener
    implements H2TransportListener {
        /** First protocol-level error seen on this stream; later frames only add detail to it. */
        private TriRpcStatus transportError;
        /** Decompressor selected from the response's encoding header, if any. */
        private DeCompressor decompressor;
        /** Set once {@link #finishProcess} has run, so the listener is completed only once from here. */
        private boolean halfClosed;
        /** Whether a non-informational response header block has been processed. */
        private boolean headerReceived;
        /** Trailers kept for the deframer's close callback. */
        private Http2Headers trailers;

        ClientTransportListener() {
        }

        /**
         * Enqueues a cancel command with {@link Http2Error#NO_ERROR} and completes the listener
         * with {@code status}.
         *
         * @param status error to report
         */
        void handleH2TransportError(TriRpcStatus status) {
            TripleClientStream.this.writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.NO_ERROR), true);
            this.finishProcess(status, null);
        }

        /**
         * Completes the listener exactly once with the given status and the attachments derived
         * from {@code trailers}.
         *
         * @param status   final status of the call
         * @param trailers response trailers; {@code null} on error paths
         */
        void finishProcess(TriRpcStatus status, Http2Headers trailers) {
            if (this.halfClosed) {
                return;
            }
            this.halfClosed = true;
            Map<String, String> reserved = this.filterReservedHeaders(trailers);
            Map<String, Object> attachments = this.headersToMap(trailers);
            Map<String, Object> finalAttachments = this.convertNoLowerCaseHeader(attachments);
            TripleClientStream.this.listener.onComplete(status, finalAttachments, reserved);
        }

        /**
         * Restores original key names in {@code attachments}.
         *
         * <p>The conversion header, when present, is removed and decoded as a JSON object whose
         * entries map an original key to the key currently present in {@code attachments}; each
         * present value is moved back under its original key.
         *
         * @param attachments attachment map, modified in place
         * @return the same {@code attachments} instance
         */
        private Map<String, Object> convertNoLowerCaseHeader(Map<String, Object> attachments) {
            Object obj = attachments.remove(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
            if (obj == null) {
                return attachments;
            }
            if (!(obj instanceof String)) {
                LOGGER.error("Triple convertNoLowerCaseHeader error, obj is not String");
                return attachments;
            }
            String json = TriRpcStatus.decodeMessage((String) obj);
            Map<?, ?> keyMapping = (Map<?, ?>) JsonUtils.getJson().toJavaObject(json, Map.class);
            keyMapping.forEach((originalKey, lowerCaseKey) -> {
                Object val = attachments.remove(lowerCaseKey);
                if (val != null) {
                    attachments.put((String) originalKey, val);
                }
            });
            return attachments;
        }

        /**
         * Reads the HTTP status pseudo-header as an integer.
         *
         * @param headers header block to inspect
         * @return the parsed status, or {@code null} when the block carries no status
         * @throws NumberFormatException if the status is present but not an integer
         */
        private Integer parseHttpStatus(Http2Headers headers) {
            CharSequence rawStatus = headers.status();
            return rawStatus == null ? null : Integer.valueOf(Integer.parseInt(rawStatus.toString()));
        }

        /**
         * Checks that a header block carries an HTTP status and a gRPC content type.
         *
         * @param headers header block to validate
         * @return an error status describing the problem, or {@code null} when the block is valid
         */
        private TriRpcStatus validateHeaderStatus(Http2Headers headers) {
            Integer httpStatus = this.parseHttpStatus(headers);
            if (httpStatus == null) {
                return TriRpcStatus.INTERNAL.withDescription("Missing HTTP status code");
            }
            CharSequence contentType = headers.get(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader());
            if (contentType == null || !contentType.toString().startsWith(TripleHeaderEnum.APPLICATION_GRPC.getHeader())) {
                return TriRpcStatus.fromCode(TriRpcStatus.httpStatusToGrpcCode(httpStatus)).withDescription("invalid content-type: " + contentType);
            }
            return null;
        }

        /**
         * Processes a response header block that does not end the stream.
         *
         * <p>Informational (1xx) blocks are skipped. Otherwise the block is validated, the
         * decompressor is chosen, the deframer is created and the listener is started.
         *
         * @param headers response headers
         */
        void onHeaderReceived(Http2Headers headers) {
            if (this.transportError != null) {
                // appendDescription returns the augmented status, so it has to be stored back.
                this.transportError = this.transportError.appendDescription("headers:" + headers);
                return;
            }
            if (this.headerReceived) {
                this.transportError = TriRpcStatus.INTERNAL.withDescription("Received headers twice");
                return;
            }
            Integer httpStatus = this.parseHttpStatus(headers);
            // The whole 1xx range is informational, including 100 itself.
            if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) {
                return;
            }
            this.headerReceived = true;
            this.transportError = this.validateHeaderStatus(headers);
            CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getHeader());
            if (null != messageEncoding) {
                String compressorStr = messageEncoding.toString();
                if (!Identity.IDENTITY.getMessageEncoding().equals(compressorStr)) {
                    DeCompressor compressor = DeCompressor.getCompressor(TripleClientStream.this.frameworkModel, compressorStr);
                    if (null == compressor) {
                        // NOTE(review): this propagates out of the executor task without
                        // completing the listener - confirm that is intended.
                        throw TriRpcStatus.UNIMPLEMENTED.withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)).asException();
                    }
                    this.decompressor = compressor;
                }
            }
            TriDecoder.Listener listener = new TriDecoder.Listener(){

                @Override
                public void onRawMessage(byte[] data) {
                    TripleClientStream.this.listener.onMessage(data);
                }

                @Override
                public void close() {
                    ClientTransportListener.this.finishProcess(ClientTransportListener.this.statusFromTrailers(ClientTransportListener.this.trailers), ClientTransportListener.this.trailers);
                }
            };
            TripleClientStream.this.deframer = new TriDecoder(this.decompressor, listener);
            TripleClientStream.this.listener.onStart();
        }

        /**
         * Processes the header block that ends the stream.
         *
         * <p>When no headers were received before, the trailers are validated as a header block.
         * On a transport error the listener is completed with that error; otherwise completion
         * happens directly when there is no deframer, or through the deframer's close callback.
         *
         * @param trailers final header block of the response
         */
        void onTrailersReceived(Http2Headers trailers) {
            if (this.transportError == null && !this.headerReceived) {
                this.transportError = this.validateHeaderStatus(trailers);
            }
            if (this.transportError != null) {
                this.transportError = this.transportError.appendDescription("trailers: " + trailers);
                // Trailers are the last frame of the stream: nothing else will arrive that could
                // report the accumulated error, so complete the listener with it now.
                this.finishProcess(this.transportError, null);
                return;
            }
            this.trailers = trailers;
            TriRpcStatus status = this.statusFromTrailers(trailers);
            if (TripleClientStream.this.deframer == null) {
                this.finishProcess(status, trailers);
            } else {
                TripleClientStream.this.deframer.close();
            }
        }

        /**
         * Derives the final call status from the trailers.
         *
         * <p>The status header is used when present, together with the decoded message header.
         * Without it the result is {@code UNKNOWN} if headers had been received, otherwise a
         * status inferred from the HTTP status code.
         *
         * @param trailers final header block of the response
         * @return status to report to the listener
         */
        private TriRpcStatus statusFromTrailers(Http2Headers trailers) {
            Integer intStatus = trailers.getInt(TripleHeaderEnum.STATUS_KEY.getHeader());
            TriRpcStatus status = intStatus == null ? null : TriRpcStatus.fromCode(intStatus);
            if (status != null) {
                CharSequence message = trailers.get(TripleHeaderEnum.MESSAGE_KEY.getHeader());
                if (message != null) {
                    String description = TriRpcStatus.decodeMessage(message.toString());
                    status = status.withDescription(description);
                }
                return status;
            }
            if (this.headerReceived) {
                return TriRpcStatus.UNKNOWN.withDescription("missing GRPC status in response");
            }
            Integer httpStatus = this.parseHttpStatus(trailers);
            status = httpStatus != null ? TriRpcStatus.fromCode(TriRpcStatus.httpStatusToGrpcCode(httpStatus)) : TriRpcStatus.INTERNAL.withDescription("missing HTTP status code");
            return status.appendDescription("missing GRPC status, inferred error from HTTP status code");
        }

        /**
         * Dispatches a header frame onto the stream executor.
         *
         * <p>A frame that ends the stream is treated as trailers; if the call has not finished
         * yet a cancel command is enqueued first. Any other frame is treated as response headers.
         *
         * @param headers   received header block
         * @param endStream whether this frame ends the stream
         */
        @Override
        public void onHeader(Http2Headers headers, boolean endStream) {
            TripleClientStream.this.executor.execute(() -> {
                if (endStream) {
                    if (!this.halfClosed) {
                        TripleClientStream.this.writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL), true);
                    }
                    this.onTrailersReceived(headers);
                } else {
                    this.onHeaderReceived(headers);
                }
            });
        }

        /**
         * Dispatches a data frame onto the stream executor.
         *
         * <p>With a pending transport error the payload text is appended to the error and the
         * buffer is released; the error is reported once its description exceeds 512 characters
         * or the stream ends. Data arriving before headers is an error. Otherwise the buffer is
         * handed to the deframer.
         *
         * @param data      payload buffer
         * @param endStream whether this frame ends the stream
         */
        @Override
        public void onData(ByteBuf data, boolean endStream) {
            TripleClientStream.this.executor.execute(() -> {
                if (this.transportError != null) {
                    // Store the augmented status back; otherwise the payload text is lost and the
                    // length check below can never be reached through accumulated data.
                    this.transportError = this.transportError.appendDescription("Data:" + data.toString(StandardCharsets.UTF_8));
                    ReferenceCountUtil.release(data);
                    if (this.transportError.description.length() > 512 || endStream) {
                        this.handleH2TransportError(this.transportError);
                    }
                    return;
                }
                if (!this.headerReceived) {
                    // The buffer is not passed on to the deframer on this path, so release it here.
                    ReferenceCountUtil.release(data);
                    this.handleH2TransportError(TriRpcStatus.INTERNAL.withDescription("headers not received before payload"));
                    return;
                }
                TripleClientStream.this.deframer.deframe(data);
            });
        }

        /**
         * Handles a reset from the remote peer by completing the listener with {@code CANCELLED}.
         *
         * @param errorCode HTTP/2 error code sent by the peer
         */
        @Override
        public void cancelByRemote(long errorCode) {
            TripleClientStream.this.executor.execute(() -> {
                this.transportError = TriRpcStatus.CANCELLED.withDescription("Canceled by remote peer, errorCode=" + errorCode);
                this.finishProcess(this.transportError, null);
            });
        }
    }
}

