/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.aliyun.log.producer.internals;

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.aliyun.log.producer.errors.TimeoutException;
import com.aliyun.openservices.aliyun.log.producer.internals.ExpiredBatches;
import com.aliyun.openservices.aliyun.log.producer.internals.GroupKey;
import com.aliyun.openservices.aliyun.log.producer.internals.IOThreadPool;
import com.aliyun.openservices.aliyun.log.producer.internals.LogSizeCalculator;
import com.aliyun.openservices.aliyun.log.producer.internals.ProducerBatch;
import com.aliyun.openservices.aliyun.log.producer.internals.RetryQueue;
import com.aliyun.openservices.aliyun.log.producer.internals.SendProducerBatchTask;
import com.aliyun.openservices.aliyun.log.producer.internals.Utils;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LogAccumulator {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogAccumulator.class);
    private static final AtomicLong BATCH_ID = new AtomicLong(0L);
    private final String producerHash;
    private final ProducerConfig producerConfig;
    private final Map<String, Client> clientPool;
    private final Semaphore memoryController;
    private final RetryQueue retryQueue;
    private final BlockingQueue<ProducerBatch> successQueue;
    private final BlockingQueue<ProducerBatch> failureQueue;
    private final IOThreadPool ioThreadPool;
    private final AtomicInteger batchCount;
    private final ConcurrentMap<GroupKey, ProducerBatchHolder> batches;
    private final AtomicInteger appendsInProgress;
    private volatile boolean closed;

    public LogAccumulator(String producerHash, ProducerConfig producerConfig, Map<String, Client> clientPool, Semaphore memoryController, RetryQueue retryQueue, BlockingQueue<ProducerBatch> successQueue, BlockingQueue<ProducerBatch> failureQueue, IOThreadPool ioThreadPool, AtomicInteger batchCount) {
        this.producerHash = producerHash;
        this.producerConfig = producerConfig;
        this.clientPool = clientPool;
        this.memoryController = memoryController;
        this.retryQueue = retryQueue;
        this.successQueue = successQueue;
        this.failureQueue = failureQueue;
        this.ioThreadPool = ioThreadPool;
        this.batchCount = batchCount;
        this.batches = new ConcurrentHashMap<GroupKey, ProducerBatchHolder>();
        this.appendsInProgress = new AtomicInteger(0);
        this.closed = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ListenableFuture<Result> append(String project, String logStore, String topic, String source, String shardHash, List<LogItem> logItems, Callback callback) throws InterruptedException, ProducerException {
        this.appendsInProgress.incrementAndGet();
        try {
            ListenableFuture<Result> listenableFuture = this.doAppend(project, logStore, topic, source, shardHash, logItems, callback);
            return listenableFuture;
        }
        finally {
            this.appendsInProgress.decrementAndGet();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ListenableFuture<Result> doAppend(String project, String logStore, String topic, String source, String shardHash, List<LogItem> logItems, Callback callback) throws InterruptedException, ProducerException {
        if (this.closed) {
            throw new IllegalStateException("cannot append after the log accumulator was closed");
        }
        int sizeInBytes = LogSizeCalculator.calculate(logItems);
        this.ensureValidLogSize(sizeInBytes);
        long maxBlockMs = this.producerConfig.getMaxBlockMs();
        LOGGER.trace("Prepare to acquire bytes, sizeInBytes={}, maxBlockMs={}, project={}, logStore={}", new Object[]{sizeInBytes, maxBlockMs, project, logStore});
        if (maxBlockMs >= 0L) {
            boolean acquired = this.memoryController.tryAcquire(sizeInBytes, maxBlockMs, TimeUnit.MILLISECONDS);
            if (!acquired) {
                throw new TimeoutException("failed to acquire memory within the configured max blocking time " + this.producerConfig.getMaxBlockMs() + " ms");
            }
        } else {
            this.memoryController.acquire(sizeInBytes);
        }
        try {
            ProducerBatchHolder holder;
            GroupKey groupKey = new GroupKey(project, logStore, topic, source, shardHash);
            ProducerBatchHolder producerBatchHolder = holder = this.getOrCreateProducerBatchHolder(groupKey);
            synchronized (producerBatchHolder) {
                return this.appendToHolder(groupKey, logItems, callback, sizeInBytes, holder);
            }
        }
        catch (Exception e) {
            this.memoryController.release(sizeInBytes);
            throw new ProducerException(e);
        }
    }

    private ListenableFuture<Result> appendToHolder(GroupKey groupKey, List<LogItem> logItems, Callback callback, int sizeInBytes, ProducerBatchHolder holder) {
        ListenableFuture<Result> f;
        if (holder.producerBatch != null) {
            f = holder.producerBatch.tryAppend(logItems, sizeInBytes, callback);
            if (f != null) {
                if (holder.producerBatch.isMeetSendCondition()) {
                    holder.transferProducerBatch(this.ioThreadPool, this.producerConfig, this.clientPool, this.retryQueue, this.successQueue, this.failureQueue, this.batchCount);
                }
                return f;
            }
            holder.transferProducerBatch(this.ioThreadPool, this.producerConfig, this.clientPool, this.retryQueue, this.successQueue, this.failureQueue, this.batchCount);
        }
        holder.producerBatch = new ProducerBatch(groupKey, Utils.generatePackageId(this.producerHash, BATCH_ID), this.producerConfig.getBatchSizeThresholdInBytes(), this.producerConfig.getBatchCountThreshold(), this.producerConfig.getMaxReservedAttempts(), System.currentTimeMillis());
        f = holder.producerBatch.tryAppend(logItems, sizeInBytes, callback);
        this.batchCount.incrementAndGet();
        if (holder.producerBatch.isMeetSendCondition()) {
            holder.transferProducerBatch(this.ioThreadPool, this.producerConfig, this.clientPool, this.retryQueue, this.successQueue, this.failureQueue, this.batchCount);
        }
        return f;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ExpiredBatches expiredBatches() {
        long nowMs = System.currentTimeMillis();
        ExpiredBatches expiredBatches = new ExpiredBatches();
        long remainingMs = this.producerConfig.getLingerMs();
        for (Map.Entry entry : this.batches.entrySet()) {
            ProducerBatchHolder holder;
            ProducerBatchHolder producerBatchHolder = holder = (ProducerBatchHolder)entry.getValue();
            synchronized (producerBatchHolder) {
                if (holder.producerBatch == null) {
                    continue;
                }
                long curRemainingMs = holder.producerBatch.remainingMs(nowMs, this.producerConfig.getLingerMs());
                if (curRemainingMs <= 0L) {
                    holder.transferProducerBatch(expiredBatches);
                } else {
                    remainingMs = Math.min(remainingMs, curRemainingMs);
                }
            }
        }
        expiredBatches.setRemainingMs(remainingMs);
        return expiredBatches;
    }

    public List<ProducerBatch> remainingBatches() {
        if (!this.closed) {
            throw new IllegalStateException("cannot get the remaining batches before the log accumulator closed");
        }
        ArrayList<ProducerBatch> remainingBatches = new ArrayList<ProducerBatch>();
        while (this.appendsInProgress()) {
            this.drainTo(remainingBatches);
        }
        this.drainTo(remainingBatches);
        this.batches.clear();
        return remainingBatches;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int drainTo(List<ProducerBatch> c) {
        int n = 0;
        for (Map.Entry entry : this.batches.entrySet()) {
            ProducerBatchHolder holder;
            ProducerBatchHolder producerBatchHolder = holder = (ProducerBatchHolder)entry.getValue();
            synchronized (producerBatchHolder) {
                if (holder.producerBatch == null) {
                    continue;
                }
                c.add(holder.producerBatch);
                ++n;
                holder.producerBatch = null;
            }
        }
        return n;
    }

    private void ensureValidLogSize(int sizeInBytes) throws LogSizeTooLargeException {
        if (sizeInBytes > 0xA00000) {
            throw new LogSizeTooLargeException("the logs is " + sizeInBytes + " bytes which is larger than MAX_BATCH_SIZE_IN_BYTES " + 0xA00000);
        }
        if (sizeInBytes > this.producerConfig.getTotalSizeInBytes()) {
            throw new LogSizeTooLargeException("the logs is " + sizeInBytes + " bytes which is larger than the totalSizeInBytes you specified");
        }
    }

    private ProducerBatchHolder getOrCreateProducerBatchHolder(GroupKey groupKey) {
        ProducerBatchHolder holder = (ProducerBatchHolder)this.batches.get(groupKey);
        if (holder != null) {
            return holder;
        }
        holder = new ProducerBatchHolder();
        ProducerBatchHolder previous = this.batches.putIfAbsent(groupKey, holder);
        if (previous == null) {
            return holder;
        }
        return previous;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void close() {
        this.closed = true;
    }

    private boolean appendsInProgress() {
        return this.appendsInProgress.get() > 0;
    }

    private static final class ProducerBatchHolder {
        ProducerBatch producerBatch;

        private ProducerBatchHolder() {
        }

        void transferProducerBatch(IOThreadPool ioThreadPool, ProducerConfig producerConfig, Map<String, Client> clientPool, RetryQueue retryQueue, BlockingQueue<ProducerBatch> successQueue, BlockingQueue<ProducerBatch> failureQueue, AtomicInteger batchCount) {
            if (this.producerBatch == null) {
                return;
            }
            ioThreadPool.submit(new SendProducerBatchTask(this.producerBatch, producerConfig, clientPool, retryQueue, successQueue, failureQueue, batchCount));
            this.producerBatch = null;
        }

        void transferProducerBatch(ExpiredBatches expiredBatches) {
            if (this.producerBatch == null) {
                return;
            }
            expiredBatches.add(this.producerBatch);
            this.producerBatch = null;
        }
    }
}

