package org.jetlinks.core.trace;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/jetlinks/core/trace/TraceFlux.class */
/**
 * A {@link Flux} operator that starts an OpenTelemetry {@link Span} each time it is
 * subscribed to and hands that span, together with the configured callbacks, to a
 * {@code TraceSubscriber} placed between the source and the downstream subscriber.
 *
 * <p>Instances are configured fluently: every configuration method returns a new
 * {@code TraceFlux} over the same source and leaves the receiver untouched.
 *
 * <p>NOTE(review): how the span is finished and when the callbacks fire is decided by
 * {@code TraceSubscriber}, which is not visible from this class.
 *
 * @param <T> the element type of the sequence
 */
public class TraceFlux<T> extends FluxOperator<T, T> {
    private static final Logger log = LoggerFactory.getLogger(TraceFlux.class);
    private final String spanName;
    private final Tracer tracer;
    private final BiConsumer<Span, T> onNext;
    private final BiConsumer<Span, Long> onComplete;
    private final BiConsumer<ContextView, SpanBuilder> onSubscription;

    /**
     * Wraps {@code publisher} in a {@code TraceFlux} that uses the default span name,
     * the default tracer and no callbacks.
     *
     * @param publisher the sequence to trace
     * @param <T>       the element type
     * @return a new tracing operator over {@code publisher}
     */
    public static <T> TraceFlux<T> trace(Publisher<T> publisher) {
        return new TraceFlux<>(Flux.from(publisher), null, null, null, null, null);
    }

    /**
     * Creates a tracing operator.
     *
     * @param flux        the upstream sequence
     * @param str         the span name; when {@code null}, {@code name()} of this operator is used
     * @param tracer      the tracer used to build spans; when {@code null}, a tracer is obtained
     *                    from {@code TraceHolder.telemetry()} for {@code TraceHolder.appName()}
     * @param biConsumer  callback handed to the subscriber as its on-next hook, may be {@code null}
     * @param biConsumer2 callback handed to the subscriber as its on-complete hook, may be
     *                    {@code null}
     * @param biConsumer3 callback invoked in {@link #subscribe(CoreSubscriber)} with the
     *                    subscriber context and the span builder before the span is started,
     *                    may be {@code null}
     */
    public TraceFlux(Flux<? extends T> flux, String str, Tracer tracer, BiConsumer<Span, T> biConsumer, BiConsumer<Span, Long> biConsumer2, BiConsumer<ContextView, SpanBuilder> biConsumer3) {
        super(flux);
        this.spanName = str == null ? name() : str;
        this.tracer = tracer == null ? TraceHolder.telemetry().getTracer(TraceHolder.appName()) : tracer;
        this.onNext = biConsumer;
        this.onSubscription = biConsumer3;
        this.onComplete = biConsumer2;
    }

    /**
     * Returns a copy whose on-next callback runs the already registered callback (if any)
     * followed by {@code biConsumer}.
     *
     * @param biConsumer the callback to append; {@code null} adds nothing
     * @return a new {@code TraceFlux} with the combined callback
     */
    public TraceFlux<T> onNext(BiConsumer<Span, T> biConsumer) {
        return new TraceFlux<>(this.source, this.spanName, this.tracer, chain(this.onNext, biConsumer), this.onComplete, this.onSubscription);
    }

    /**
     * Returns a copy whose on-complete callback runs the already registered callback (if any)
     * followed by {@code biConsumer}.
     *
     * <p>NOTE(review): the meaning of the {@code Long} argument is defined by
     * {@code TraceSubscriber}; confirm there before relying on it.
     *
     * @param biConsumer the callback to append; {@code null} adds nothing
     * @return a new {@code TraceFlux} with the combined callback
     */
    public TraceFlux<T> onComplete(BiConsumer<Span, Long> biConsumer) {
        return new TraceFlux<>(this.source, this.spanName, this.tracer, this.onNext, chain(this.onComplete, biConsumer), this.onSubscription);
    }

    /**
     * Returns a copy that uses {@code str} as the span name.
     *
     * @param str the new span name; {@code null} falls back to {@code name()} of the new operator
     * @return a new {@code TraceFlux} with the given span name
     */
    public TraceFlux<T> spanName(String str) {
        return new TraceFlux<>(this.source, str, this.tracer, this.onNext, this.onComplete, this.onSubscription);
    }

    /**
     * Returns a copy whose tracer is obtained from {@code TraceHolder.telemetry()} for the
     * given instrumentation scope name.
     *
     * @param str the scope name passed to {@code getTracer}
     * @return a new {@code TraceFlux} using that tracer
     */
    public TraceFlux<T> scopeName(String str) {
        return new TraceFlux<>(this.source, this.spanName, TraceHolder.telemetry().getTracer(str), this.onNext, this.onComplete, this.onSubscription);
    }

    /**
     * Returns a copy whose subscription callback runs the already registered callback (if any)
     * followed by {@code biConsumer}. The callback receives the subscriber's context and the
     * {@link SpanBuilder} before the span is started.
     *
     * @param biConsumer the callback to append; {@code null} adds nothing
     * @return a new {@code TraceFlux} with the combined callback
     */
    public TraceFlux<T> onSubscription(BiConsumer<ContextView, SpanBuilder> biConsumer) {
        return new TraceFlux<>(this.source, this.spanName, this.tracer, this.onNext, this.onComplete, chain(this.onSubscription, biConsumer));
    }

    /**
     * Builds and starts a span, then subscribes a {@code TraceSubscriber} wrapping
     * {@code coreSubscriber} to the source.
     *
     * <p>The parent of the span is the OpenTelemetry {@link Context} stored in the subscriber's
     * Reactor context under the key {@code Context.class}, or {@link Context#root()} when there
     * is none.
     *
     * <p>If anything throws, the failure is delivered to {@code coreSubscriber} through
     * {@link Operators#reportThrowInSubscribe(CoreSubscriber, Throwable)}, which signals
     * {@code onSubscribe} before {@code onError}, and a span that was already started is ended.
     *
     * @param coreSubscriber the downstream subscriber
     */
    @Override
    public void subscribe(@Nonnull CoreSubscriber<? super T> coreSubscriber) {
        Span span = null;
        try {
            SpanBuilder spanBuilder = this.tracer.spanBuilder(this.spanName);
            ContextView currentContext = coreSubscriber.currentContext();
            Context parent = currentContext.<Context>getOrEmpty(Context.class).orElseGet(Context::root);
            if (this.onSubscription != null) {
                this.onSubscription.accept(currentContext, spanBuilder);
            }
            // Start time and parent are set after the callback, so they always win over
            // whatever the callback configured on the builder.
            span = spanBuilder
                .setStartTimestamp(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .setParent(parent)
                .startSpan();
            this.source.subscribe(new TraceSubscriber(coreSubscriber, span, this.onNext, this.onComplete, parent));
        } catch (Throwable th) {
            if (span != null) {
                // The downstream is notified directly below, bypassing the tracing subscriber,
                // so the span has to be closed here or it would never be ended.
                span.recordException(th);
                span.end();
            }
            // Calling onError without a preceding onSubscribe breaks the Reactive Streams
            // signal order; this helper emits onSubscribe first.
            Operators.reportThrowInSubscribe(coreSubscriber, th);
        }
    }

    /**
     * Combines two optional callbacks so that {@code existing} runs before {@code added}.
     * A {@code null} on either side means "no callback" and the other side is returned as is.
     */
    private static <A, B> BiConsumer<A, B> chain(BiConsumer<A, B> existing, BiConsumer<A, B> added) {
        if (existing == null) {
            return added;
        }
        if (added == null) {
            return existing;
        }
        return existing.andThen(added);
    }
}
