/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri;

import io.netty.handler.codec.http2.Http2Headers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.serialize.MultipleSerialization;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.protocol.tri.DefaultMetadata;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus;
import org.apache.dubbo.rpc.protocol.tri.Metadata;
import org.apache.dubbo.rpc.protocol.tri.Stream;
import org.apache.dubbo.rpc.protocol.tri.TransportObserver;
import org.apache.dubbo.rpc.protocol.tri.TripleUtil;

/**
 * Base class for Triple streams.
 *
 * <p>An instance holds the per-call state (URL, descriptors, method name, request,
 * serialization type), the stream/transport observers produced by the subclass, the
 * subscribers registered through {@code subscribe(...)}, and the executor on which
 * callbacks are run. It also provides helpers to translate between {@link Metadata}
 * headers and attachment maps.
 *
 * <p>{@code LOGGER} is not declared in this class; it is resolved from a supertype
 * (presumably {@link Stream} - confirm against that interface).
 */
public abstract class AbstractStream
implements Stream {
    /** Whether non-string attachments are wrapped and serialized; read from the {@code triple.attachment} property (default {@code false}). */
    public static final boolean ENABLE_ATTACHMENT_WRAP = Boolean.parseBoolean(ConfigUtils.getProperty("triple.attachment", "false"));
    /** Description used when a unary observer receives a second data frame. */
    protected static final String DUPLICATED_DATA = "Duplicated data";
    /** Header-name suffix of attachments encoded through the serialization wrapper. */
    private static final String WRAPPED_BIN_SUFFIX = "-tw-bin";
    /** Header-name suffix of attachments carrying raw bytes. */
    private static final String BIN_SUFFIX = "-bin";
    /** Number of shared single-threaded callback executors. */
    private static final int CALLBACK_EXECUTOR_COUNT = 4;
    /** Shared callback executors; one of them is picked at random for every stream created without an explicit executor. */
    private static final List<Executor> CALLBACK_EXECUTORS = new ArrayList<Executor>(CALLBACK_EXECUTOR_COUNT);

    static {
        // NOTE(review): "tri-callbcak" looks like a typo of "tri-callback"; the literal is kept
        // because thread names are observable at runtime (thread dumps, monitoring).
        NamedInternalThreadFactory tripleTF = new NamedInternalThreadFactory("tri-callbcak", true);
        for (int i = 0; i < CALLBACK_EXECUTOR_COUNT; ++i) {
            // One thread per executor with a bounded queue of 1024 tasks; AbortPolicy rejects
            // further submissions once the queue is full.
            ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(1024), tripleTF, new ThreadPoolExecutor.AbortPolicy());
            CALLBACK_EXECUTORS.add(tp);
        }
    }

    private final URL url;
    private final MultipleSerialization multipleSerialization;
    private final StreamObserver<Object> streamObserver;
    private final TransportObserver transportObserver;
    private final Executor executor;
    private ServiceDescriptor serviceDescriptor;
    private MethodDescriptor methodDescriptor;
    private String methodName;
    private Request request;
    private String serializeType;
    private StreamObserver<Object> streamSubscriber;
    private TransportObserver transportSubscriber;

    /**
     * Creates a stream that runs its callbacks on one of the shared callback executors.
     *
     * @param url the URL this stream belongs to
     */
    protected AbstractStream(URL url) {
        this(url, AbstractStream.allocateCallbackExecutor());
    }

    /**
     * Creates a stream bound to the given executor.
     *
     * <p>The {@link MultipleSerialization} extension is looked up by the URL parameter
     * {@code serialize.multiple} (default {@code "default"}). The observers are created by
     * calling {@link #createStreamObserver()} and {@link #createTransportObserver()} from
     * this constructor, i.e. before any subclass constructor body has run.
     *
     * @param url      the URL this stream belongs to
     * @param executor the executor used by {@link #execute(Runnable)}
     */
    protected AbstractStream(URL url, Executor executor) {
        this.url = url;
        this.executor = executor;
        String value = url.getParameter("serialize.multiple", "default");
        this.multipleSerialization = ExtensionLoader.getExtensionLoader(MultipleSerialization.class).getExtension(value);
        this.streamObserver = this.createStreamObserver();
        this.transportObserver = this.createTransportObserver();
    }

    /** Picks one of the shared callback executors uniformly at random. */
    private static Executor allocateCallbackExecutor() {
        // Bound the index by the actual pool size so it can never drift from the list contents.
        return CALLBACK_EXECUTORS.get(ThreadLocalRandom.current().nextInt(CALLBACK_EXECUTORS.size()));
    }

    /** @return the request associated with this stream, or {@code null} if none was set */
    public Request getRequest() {
        return this.request;
    }

    /**
     * Sets the request associated with this stream.
     *
     * @param request the request
     * @return this stream, for chaining
     */
    public AbstractStream request(Request request) {
        this.request = request;
        return this;
    }

    /** @return the executor supplied at construction time */
    public Executor getExecutor() {
        return this.executor;
    }

    /** Submits the task to this stream's executor. */
    @Override
    public void execute(Runnable runnable) {
        this.executor.execute(runnable);
    }

    /** @return the method name, or {@code null} if none was set */
    public String getMethodName() {
        return this.methodName;
    }

    /**
     * Sets the method name.
     *
     * @param methodName the method name
     * @return this stream, for chaining
     */
    public AbstractStream methodName(String methodName) {
        this.methodName = methodName;
        return this;
    }

    /**
     * Sets the method descriptor.
     *
     * @param md the method descriptor
     * @return this stream, for chaining
     */
    public AbstractStream method(MethodDescriptor md) {
        this.methodDescriptor = md;
        return this;
    }

    /** Creates the observer returned by {@link #asStreamObserver()}; called from the constructor. */
    protected abstract StreamObserver<Object> createStreamObserver();

    /** Creates the observer returned by {@link #asTransportObserver()}; called from the constructor. */
    protected abstract TransportObserver createTransportObserver();

    /** @return the serialization type, or {@code null} if none was set */
    public String getSerializeType() {
        return this.serializeType;
    }

    /**
     * Sets the serialization type, mapping {@code "hessian4"} to {@code "hessian2"}.
     *
     * @param serializeType the serialization type; {@code null} is stored as-is
     * @return this stream, for chaining
     */
    public AbstractStream serialize(String serializeType) {
        // Constant-first comparison: a null type is a state convertSingleAttachment already
        // handles, so it must not blow up here.
        if ("hessian4".equals(serializeType)) {
            serializeType = "hessian2";
        }
        this.serializeType = serializeType;
        return this;
    }

    /** @return the serialization extension resolved at construction time */
    public MultipleSerialization getMultipleSerialization() {
        return this.multipleSerialization;
    }

    /** @return the subscriber registered via {@link #subscribe(StreamObserver)}, or {@code null} */
    public StreamObserver<Object> getStreamSubscriber() {
        return this.streamSubscriber;
    }

    /** @return the subscriber registered via {@link #subscribe(TransportObserver)}, or {@code null} */
    public TransportObserver getTransportSubscriber() {
        return this.transportSubscriber;
    }

    /** @return the method descriptor, or {@code null} if none was set */
    public MethodDescriptor getMethodDescriptor() {
        return this.methodDescriptor;
    }

    /** @return the service descriptor, or {@code null} if none was set */
    public ServiceDescriptor getServiceDescriptor() {
        return this.serviceDescriptor;
    }

    /** Sets the service descriptor. */
    public void setServiceDescriptor(ServiceDescriptor serviceDescriptor) {
        this.serviceDescriptor = serviceDescriptor;
    }

    /** @return the URL supplied at construction time */
    public URL getUrl() {
        return this.url;
    }

    /** Registers the stream subscriber, replacing any previous one. */
    @Override
    public void subscribe(StreamObserver<Object> observer) {
        this.streamSubscriber = observer;
    }

    /** Registers the transport subscriber, replacing any previous one. */
    @Override
    public void subscribe(TransportObserver observer) {
        this.transportSubscriber = observer;
    }

    /** @return the stream observer created at construction time */
    @Override
    public StreamObserver<Object> asStreamObserver() {
        return this.streamObserver;
    }

    /** @return the transport observer created at construction time */
    @Override
    public TransportObserver asTransportObserver() {
        return this.transportObserver;
    }

    /**
     * Sends the given status to the transport subscriber as end-of-stream metadata
     * ({@code grpc-status} / {@code grpc-message}) and logs it at error level.
     *
     * <p>A transport subscriber must have been registered before this is called.
     *
     * @param status the status to report
     */
    protected void transportError(GrpcStatus status) {
        DefaultMetadata metadata = new DefaultMetadata();
        metadata.put("grpc-status", Integer.toString(status.code.code));
        metadata.put("grpc-message", status.toMessage());
        this.getTransportSubscriber().tryOnMetadata(metadata, true);
        if (LOGGER.isErrorEnabled()) {
            LOGGER.error("[Triple-Server-Error] " + status.toMessage());
        }
    }

    /**
     * Sends an {@code UNKNOWN} status carrying the throwable's message to the transport
     * subscriber as end-of-stream metadata and logs the throwable at error level.
     *
     * <p>A transport subscriber must have been registered before this is called.
     *
     * @param throwable the failure to report
     */
    protected void transportError(Throwable throwable) {
        // Exceptions without a message would otherwise put a null header value; fall back to
        // the exception class name so grpc-message always carries something meaningful.
        String message = throwable.getMessage();
        if (message == null) {
            message = throwable.getClass().getName();
        }
        DefaultMetadata metadata = new DefaultMetadata();
        metadata.put("grpc-status", Integer.toString(GrpcStatus.Code.UNKNOWN.code));
        metadata.put("grpc-message", message);
        this.getTransportSubscriber().tryOnMetadata(metadata, true);
        if (LOGGER.isErrorEnabled()) {
            // The descriptor may not have been set yet when the failure happens; reporting the
            // error must not fail with a NullPointerException of its own.
            ServiceDescriptor descriptor = this.getServiceDescriptor();
            String serviceName = descriptor == null ? null : descriptor.getServiceName();
            LOGGER.error("[Triple-Server-Error] service=" + serviceName + " method=" + this.getMethodName(), throwable);
        }
    }

    /**
     * Converts metadata headers into an attachment map, preserving iteration order.
     *
     * <ul>
     *   <li>HTTP/2 pseudo headers are skipped.</li>
     *   <li>When {@link #ENABLE_ATTACHMENT_WRAP} is set, a {@code <name>-tw-bin} header is decoded
     *       with {@code TripleUtil.decodeObjFromHeader} and stored under {@code <name>}.</li>
     *   <li>A {@code <name>-bin} header is decoded with {@code TripleUtil.decodeASCIIByte} and
     *       stored under {@code <name>}.</li>
     *   <li>Any other header is stored as a string under its own name.</li>
     * </ul>
     * Headers that fail to decode are logged and omitted.
     *
     * @param metadata the headers to convert
     * @return a new mutable map of attachments
     */
    protected Map<String, Object> parseMetadataToMap(Metadata metadata) {
        Map<String, Object> attachments = new LinkedHashMap<String, Object>();
        for (Map.Entry<?, ?> header : metadata) {
            String key = ((CharSequence)header.getKey()).toString();
            if (Http2Headers.PseudoHeaderName.isPseudoHeader((CharSequence)key)) {
                continue;
            }
            CharSequence rawValue = (CharSequence)header.getValue();
            if (ENABLE_ATTACHMENT_WRAP && key.endsWith(WRAPPED_BIN_SUFFIX) && key.length() > WRAPPED_BIN_SUFFIX.length()) {
                try {
                    attachments.put(key.substring(0, key.length() - WRAPPED_BIN_SUFFIX.length()), TripleUtil.decodeObjFromHeader(this.url, rawValue, this.multipleSerialization));
                }
                catch (Exception e) {
                    LOGGER.error("Failed to parse response attachment key=" + key, e);
                }
                // "-tw-bin" also ends with "-bin": stop here so the wrapped header is not decoded
                // a second time as raw bytes and stored under a spurious "<name>-tw" key.
                continue;
            }
            if (key.endsWith(BIN_SUFFIX) && key.length() > BIN_SUFFIX.length()) {
                try {
                    attachments.put(key.substring(0, key.length() - BIN_SUFFIX.length()), TripleUtil.decodeASCIIByte(rawValue));
                }
                catch (Exception e) {
                    LOGGER.error("Failed to parse response attachment key=" + key, e);
                }
                continue;
            }
            attachments.put(key, rawValue.toString());
        }
        return attachments;
    }

    /**
     * Writes the given attachments into the metadata. Keys are lower-cased with
     * {@link Locale#ROOT}; keys that are HTTP/2 pseudo header names are skipped.
     *
     * @param metadata    the metadata to fill
     * @param attachments the attachments to convert
     */
    protected void convertAttachment(Metadata metadata, Map<String, Object> attachments) {
        for (Map.Entry<String, Object> entry : attachments.entrySet()) {
            String key = entry.getKey().toLowerCase(Locale.ROOT);
            if (Http2Headers.PseudoHeaderName.isPseudoHeader((CharSequence)key)) {
                continue;
            }
            this.convertSingleAttachment(metadata, key, entry.getValue());
        }
    }

    /**
     * Writes one attachment into the metadata.
     *
     * <p>Without attachment wrapping only {@code String} values (stored as-is) and
     * {@code byte[]} values (Base64 encoded under {@code <key>-bin}) are written; other types
     * are dropped. With wrapping enabled, strings are stored as-is, other values are stored
     * via {@code toString()} when no serialization type is set, and otherwise are encoded with
     * {@code TripleUtil.encodeWrapper} under {@code <key>-tw-bin}. An {@link IOException} is
     * logged as a warning and the attachment is skipped.
     */
    private void convertSingleAttachment(Metadata metadata, String key, Object v) {
        try {
            if (!ENABLE_ATTACHMENT_WRAP) {
                if (v instanceof String) {
                    metadata.put(key, (String)v);
                } else if (v instanceof byte[]) {
                    metadata.put(key + BIN_SUFFIX, TripleUtil.encodeBase64ASCII((byte[])v));
                }
            } else if (v instanceof String) {
                metadata.put(key, v.toString());
            } else if (this.serializeType == null) {
                // No serializer to wrap with: fall back to the string form. A null value has
                // none, so it is skipped instead of aborting the whole conversion with an NPE.
                if (v != null) {
                    metadata.put(key, v.toString());
                }
            } else {
                String encoded = TripleUtil.encodeWrapper(this.url, v, this.serializeType, this.getMultipleSerialization());
                metadata.put(key + WRAPPED_BIN_SUFFIX, encoded);
            }
        }
        catch (IOException e) {
            LOGGER.warn("Meet exception when convert single attachment key:" + key, e);
        }
    }

    /**
     * Transport observer for unary calls: keeps the single data frame and, on completion,
     * dispatches to {@link #doOnComplete} or {@link #onError} depending on the status found in
     * the received metadata.
     */
    protected static abstract class UnaryTransportObserver
    extends AbstractTransportObserver
    implements TransportObserver {
        private byte[] data;

        protected UnaryTransportObserver() {
        }

        /** @return the data frame received so far, or {@code null} if none arrived */
        public byte[] getData() {
            return this.data;
        }

        /** Invoked on completion when the extracted status is not OK. */
        protected abstract void onError(GrpcStatus var1);

        /**
         * Extracts the status from the trailers (or from the headers when no trailers were
         * received) and dispatches to {@link #doOnComplete} or {@link #onError}.
         */
        @Override
        public void onComplete(Stream.OperationHandler handler) {
            // NOTE(review): if neither headers nor trailers were received, metadata is null and
            // extractStatusFromMeta will throw - confirm whether that can happen on this path.
            Metadata metadata = this.getTrailers() == null ? this.getHeaders() : this.getTrailers();
            GrpcStatus status = this.extractStatusFromMeta(metadata);
            if (GrpcStatus.Code.isOk(status.code.code)) {
                this.doOnComplete(handler);
            } else {
                this.onError(status);
            }
        }

        /** Invoked on completion when the extracted status is OK. */
        protected abstract void doOnComplete(Stream.OperationHandler var1);

        /**
         * Stores the first data frame; any further frame is reported to the handler as a
         * {@code FAILURE} with an {@code INTERNAL} status described by {@link #DUPLICATED_DATA}.
         */
        @Override
        public void onData(byte[] in, boolean endStream, Stream.OperationHandler handler) {
            if (this.data == null) {
                this.data = in;
            } else {
                handler.operationDone(Stream.OperationResult.FAILURE, GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL).withDescription(AbstractStream.DUPLICATED_DATA).asException());
            }
        }
    }

    /**
     * Transport observer base that records received metadata: the first metadata is kept as
     * headers, every later one replaces the trailers.
     */
    protected static abstract class AbstractTransportObserver
    implements TransportObserver {
        private Metadata headers;
        private Metadata trailers;

        protected AbstractTransportObserver() {
        }

        /** @return the first metadata received, or {@code null} if none arrived yet */
        public Metadata getHeaders() {
            return this.headers;
        }

        /** @return the most recent metadata received after the headers, or {@code null} */
        public Metadata getTrailers() {
            return this.trailers;
        }

        /** Records the metadata as headers if none were stored yet, otherwise as trailers. */
        @Override
        public void onMetadata(Metadata metadata, boolean endStream, Stream.OperationHandler handler) {
            if (this.headers == null) {
                this.headers = metadata;
            } else {
                this.trailers = metadata;
            }
        }

        /**
         * Reads the call status from the {@code grpc-status} / {@code grpc-message} entries.
         *
         * @param metadata the metadata to inspect; must not be {@code null}
         * @return an OK status when {@code grpc-status} is absent or OK; otherwise the status
         *         for the given code, with the decoded {@code grpc-message} as description when
         *         that entry is present
         * @throws NumberFormatException if {@code grpc-status} is not an integer
         */
        protected GrpcStatus extractStatusFromMeta(Metadata metadata) {
            if (!metadata.contains("grpc-status")) {
                return GrpcStatus.fromCode(GrpcStatus.Code.OK);
            }
            int code = Integer.parseInt(metadata.get("grpc-status").toString());
            if (GrpcStatus.Code.isOk(code)) {
                return GrpcStatus.fromCode(GrpcStatus.Code.OK);
            }
            GrpcStatus status = GrpcStatus.fromCode(code);
            if (metadata.contains("grpc-message")) {
                String raw = metadata.get("grpc-message").toString();
                status = status.withDescription(GrpcStatus.fromMessage(raw));
            }
            return status;
        }
    }
}

