/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexer.hadoop;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.JobHelper;
import org.apache.druid.indexer.hadoop.DatasourceIngestionSpec;
import org.apache.druid.indexer.hadoop.DatasourceInputFormat;
import org.apache.druid.indexer.hadoop.DatasourceInputSplit;
import org.apache.druid.indexer.hadoop.SegmentInputRow;
import org.apache.druid.indexer.hadoop.WindowedDataSegment;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;

public class DatasourceRecordReader
extends RecordReader<NullWritable, InputRow> {
    private static final Logger logger = new Logger(DatasourceRecordReader.class);
    private DatasourceIngestionSpec spec;
    private IngestSegmentFirehose firehose;
    private long rowNum;
    private Row currRow;
    private List<QueryableIndex> indexes = new ArrayList<QueryableIndex>();
    private List<File> tmpSegmentDirs = new ArrayList<File>();
    private long numRows;

    public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException {
        List<WindowedDataSegment> segments = ((DatasourceInputSplit)split).getSegments();
        String dataSource = (String)Iterators.getOnlyElement(segments.stream().map(s -> s.getSegment().getDataSource()).distinct().iterator());
        this.spec = DatasourceInputFormat.getIngestionSpec(context.getConfiguration(), dataSource);
        logger.info("load schema [%s]", new Object[]{this.spec});
        List adapters = Lists.transform(segments, (Function)new Function<WindowedDataSegment, WindowedStorageAdapter>(){

            public WindowedStorageAdapter apply(WindowedDataSegment segment) {
                try {
                    logger.info("Getting storage path for segment [%s]", new Object[]{segment.getSegment().getId()});
                    Path path = new Path(JobHelper.getURIFromSegment(segment.getSegment()));
                    logger.info("Fetch segment files from [%s]", new Object[]{path});
                    File dir = FileUtils.createTempDir();
                    DatasourceRecordReader.this.tmpSegmentDirs.add(dir);
                    logger.info("Locally storing fetched segment at [%s]", new Object[]{dir});
                    JobHelper.unzipNoGuava(path, context.getConfiguration(), dir, (Progressable)context, null);
                    logger.info("finished fetching segment files", new Object[0]);
                    QueryableIndex index = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(dir);
                    DatasourceRecordReader.this.indexes.add(index);
                    DatasourceRecordReader.this.numRows = DatasourceRecordReader.this.numRows + (long)index.getNumRows();
                    return new WindowedStorageAdapter((StorageAdapter)new QueryableIndexStorageAdapter(index), segment.getInterval());
                }
                catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
            }
        });
        this.firehose = new IngestSegmentFirehose(adapters, this.spec.getTransformSpec(), this.spec.getDimensions(), this.spec.getMetrics(), this.spec.getFilter());
    }

    public boolean nextKeyValue() {
        if (this.firehose.hasMore()) {
            this.currRow = this.firehose.nextRow();
            ++this.rowNum;
            return true;
        }
        return false;
    }

    public NullWritable getCurrentKey() {
        return NullWritable.get();
    }

    public InputRow getCurrentValue() {
        return this.currRow == null ? null : new SegmentInputRow(this.currRow, this.spec.getDimensions());
    }

    public float getProgress() {
        if (this.numRows > 0L) {
            return (float)this.rowNum * 1.0f / (float)this.numRows;
        }
        return 0.0f;
    }

    public void close() throws IOException {
        Closeables.close((Closeable)this.firehose, (boolean)true);
        for (QueryableIndex qi : this.indexes) {
            Closeables.close((Closeable)qi, (boolean)true);
        }
        for (File dir : this.tmpSegmentDirs) {
            FileUtils.deleteDirectory((File)dir);
        }
    }
}

