/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.batch.sql;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.TestManagedTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CompactManagedTableITCase
extends BatchTestBase {
    private final ObjectIdentifier tableIdentifier = ObjectIdentifier.of((String)this.tEnv().getCurrentCatalog(), (String)this.tEnv().getCurrentDatabase(), (String)"MyTable");
    private final Map<CatalogPartitionSpec, List<RowData>> collectedElements = new HashMap<CatalogPartitionSpec, List<RowData>>();
    private Path rootPath;
    private AtomicReference<Map<CatalogPartitionSpec, List<Path>>> referenceOfManagedTableFileEntries;

    @Override
    @Before
    public void before() throws Exception {
        super.before();
        TestManagedTableFactory.MANAGED_TABLES.put(this.tableIdentifier, new AtomicReference());
        this.referenceOfManagedTableFileEntries = new AtomicReference();
        TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES.put(this.tableIdentifier, this.referenceOfManagedTableFileEntries);
        try {
            this.rootPath = new Path(new Path(TEMPORARY_FOLDER.newFolder().getPath()), this.tableIdentifier.asSummaryString());
            this.rootPath.getFileSystem().mkdirs(this.rootPath);
        }
        catch (IOException e) {
            Assertions.fail((String)String.format("Failed to create dir for %s", this.rootPath), (Throwable)e);
        }
    }

    @Override
    @After
    public void after() {
        super.after();
        this.tEnv().executeSql("DROP TABLE MyTable");
        this.collectedElements.clear();
        try {
            this.rootPath.getFileSystem().delete(this.rootPath, true);
        }
        catch (IOException e) {
            Assertions.fail((String)String.format("Failed to delete dir for %s", this.rootPath), (Throwable)e);
        }
    }

    @Test
    public void testCompactPartitionOnNonPartitionedTable() {
        String sql = "CREATE TABLE MyTable (id BIGINT, content STRING)";
        this.tEnv().executeSql(sql);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv().executeSql("ALTER TABLE MyTable PARTITION (season = 'summer') COMPACT")).isInstanceOf(ValidationException.class)).hasMessageContaining(String.format("Table %s is not partitioned.", this.tableIdentifier));
    }

    @Test
    public void testCompactPartitionOnNonExistedPartitionKey() {
        String sql = "CREATE TABLE MyTable (\n  id BIGINT,\n  content STRING,\n  season STRING\n) PARTITIONED BY (season)";
        this.tEnv().executeSql(sql);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv().executeSql("ALTER TABLE MyTable PARTITION (saeson = 'summer') COMPACT")).isInstanceOf(ValidationException.class)).hasMessageContaining("Partition column 'saeson' not defined in the table schema. Available ordered partition columns: ['season']");
    }

    @Test
    public void testCompactPartitionOnNonExistedPartitionValue() throws Exception {
        String sql = "CREATE TABLE MyTable (\n  id BIGINT,\n  content STRING,\n  season STRING\n) PARTITIONED BY (season)";
        this.prepare(sql, Collections.singletonList(this.of("season", "'spring'")));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv().executeSql("ALTER TABLE MyTable PARTITION (season = 'summer') COMPACT")).isInstanceOf(ValidationException.class)).hasMessageContaining("Cannot resolve partition spec CatalogPartitionSpec{{season=summer}}");
    }

    @Test
    public void testCompactNonPartitionedTable() throws Exception {
        String sql = "CREATE TABLE MyTable (id BIGINT, content STRING)";
        this.prepare(sql, Collections.emptyList());
        CatalogPartitionSpec unresolvedDummySpec = new CatalogPartitionSpec(Collections.emptyMap());
        Set<CatalogPartitionSpec> resolvedPartitionSpecsHaveBeenOrToBeCompacted = Collections.singleton(unresolvedDummySpec);
        this.executeAndCheck(unresolvedDummySpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
    }

    @Test
    public void testCompactSinglePartitionedTable() throws Exception {
        String sql = "CREATE TABLE MyTable (\n  id BIGINT,\n  content STRING,\n  season STRING\n) PARTITIONED BY (season)";
        this.prepare(sql, Arrays.asList(this.of("season", "'spring'"), this.of("season", "'summer'")));
        HashSet<CatalogPartitionSpec> resolvedPartitionSpecsHaveBeenOrToBeCompacted = new HashSet<CatalogPartitionSpec>();
        CatalogPartitionSpec unresolvedPartitionSpec = new CatalogPartitionSpec(this.of("season", "'summer'"));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "summer")));
        this.executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
        unresolvedPartitionSpec = new CatalogPartitionSpec(Collections.emptyMap());
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "spring")));
        this.executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
    }

    @Test
    public void testCompactMultiPartitionedTable() throws Exception {
        String sql = "CREATE TABLE MyTable (  id BIGINT,\n  content STRING,\n  season STRING,\n  `month` INT\n) PARTITIONED BY (season, `month`)";
        this.prepare(sql, Arrays.asList(this.of("season", "'spring'", "`month`", "2"), this.of("season", "'spring'", "`month`", "3"), this.of("season", "'spring'", "`month`", "4"), this.of("season", "'summer'", "`month`", "5"), this.of("season", "'summer'", "`month`", "6"), this.of("season", "'summer'", "`month`", "7"), this.of("season", "'summer'", "`month`", "8"), this.of("season", "'autumn'", "`month`", "8"), this.of("season", "'autumn'", "`month`", "9"), this.of("season", "'autumn'", "`month`", "10"), this.of("season", "'winter'", "`month`", "11"), this.of("season", "'winter'", "`month`", "12"), this.of("season", "'winter'", "`month`", "1")));
        HashSet<CatalogPartitionSpec> resolvedPartitionSpecsHaveBeenOrToBeCompacted = new HashSet<CatalogPartitionSpec>();
        CatalogPartitionSpec unresolvedPartitionSpec = new CatalogPartitionSpec(this.of("season", "'spring'", "`month`", "2"));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "spring", "month", "2")));
        this.executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
        unresolvedPartitionSpec = new CatalogPartitionSpec(this.of("`month`", "3", "season", "'spring'"));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "spring", "month", "3")));
        this.executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
        unresolvedPartitionSpec = new CatalogPartitionSpec(this.of("season", "'winter'"));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "winter", "month", "1")));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "winter", "month", "11")));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "winter", "month", "12")));
        this.executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
        unresolvedPartitionSpec = new CatalogPartitionSpec(this.of("`month`", "5"));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "summer", "month", "5")));
        this.executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
        unresolvedPartitionSpec = new CatalogPartitionSpec(this.of("`month`", "8"));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "summer", "month", "8")));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "autumn", "month", "8")));
        this.executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
        unresolvedPartitionSpec = new CatalogPartitionSpec(Collections.emptyMap());
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "spring", "month", "4")));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "summer", "month", "6")));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "summer", "month", "7")));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "autumn", "month", "9")));
        resolvedPartitionSpecsHaveBeenOrToBeCompacted.add(new CatalogPartitionSpec(this.of("season", "autumn", "month", "10")));
        this.executeAndCheck(unresolvedPartitionSpec, resolvedPartitionSpecsHaveBeenOrToBeCompacted);
    }

    private void prepare(String managedTableDDL, List<LinkedHashMap<String, String>> partitionKVs) throws Exception {
        this.prepareMirrorTables(managedTableDDL);
        this.prepareFileEntries(partitionKVs);
        this.scanFileEntries();
    }

    private void prepareMirrorTables(String managedTableDDL) {
        this.tEnv().executeSql(managedTableDDL);
        String helperSource = "CREATE TABLE HelperSource (id BIGINT, content STRING ) WITH (  'connector' = 'datagen',   'rows-per-second' = '5',   'fields.id.kind' = 'sequence',   'fields.id.start' = '0',   'fields.id.end' = '200',   'fields.content.kind' = 'random',   'number-of-rows' = '50')";
        String helperSink = String.format("CREATE TABLE HelperSink WITH (  'connector' = 'filesystem',   'format' = 'testcsv',   'path' = '%s' )LIKE MyTable (EXCLUDING OPTIONS)", this.rootPath.getPath());
        this.tEnv().executeSql(helperSource);
        this.tEnv().executeSql(helperSink);
    }

    private void prepareFileEntries(List<LinkedHashMap<String, String>> partitionKVs) throws Exception {
        this.tEnv().executeSql(CompactManagedTableITCase.prepareInsertDML(partitionKVs)).await();
    }

    private static String prepareInsertDML(List<LinkedHashMap<String, String>> partitionKVs) {
        StringBuilder dmlBuilder = new StringBuilder("INSERT INTO HelperSink\n");
        if (partitionKVs.isEmpty()) {
            return dmlBuilder.append("SELECT id,\n  content\nFROM HelperSource\n").toString();
        }
        for (int i = 0; i < partitionKVs.size(); ++i) {
            dmlBuilder.append("SELECT id,\n  content,\n");
            int j = 0;
            for (Map.Entry<String, String> entry : partitionKVs.get(i).entrySet()) {
                dmlBuilder.append("  ");
                dmlBuilder.append(entry.getValue());
                dmlBuilder.append(" AS ");
                dmlBuilder.append(entry.getKey());
                if (j < partitionKVs.get(i).size() - 1) {
                    dmlBuilder.append(",\n");
                } else {
                    dmlBuilder.append("\n");
                }
                ++j;
            }
            dmlBuilder.append("FROM HelperSource\n");
            if (i >= partitionKVs.size() - 1) continue;
            dmlBuilder.append("UNION ALL\n");
        }
        return dmlBuilder.toString();
    }

    private void scanFileEntries() throws IOException {
        HashMap managedTableFileEntries = new HashMap();
        try (Stream<java.nio.file.Path> pathStream = Files.walk(Paths.get(this.rootPath.getPath(), new String[0]), new FileVisitOption[0]);){
            pathStream.filter(x$0 -> Files.isRegularFile(x$0, new LinkOption[0])).forEach(filePath -> {
                Path file = new Path(filePath.toString());
                CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec((Map)PartitionPathUtils.extractPartitionSpecFromPath((Path)file));
                List fileEntries = managedTableFileEntries.getOrDefault(partitionSpec, new ArrayList());
                fileEntries.add(file);
                managedTableFileEntries.put(partitionSpec, fileEntries);
                List elements = this.collectedElements.getOrDefault(partitionSpec, new ArrayList());
                elements.addAll(CompactManagedTableITCase.readElementsFromFile(filePath.toFile()));
                this.collectedElements.put(partitionSpec, elements);
            });
        }
        this.referenceOfManagedTableFileEntries.set(managedTableFileEntries);
    }

    private static List<RowData> readElementsFromFile(File file) {
        ArrayList<RowData> elements = new ArrayList<RowData>();
        try (BufferedReader reader = new BufferedReader(new FileReader(file));){
            String line;
            while ((line = reader.readLine()) != null) {
                elements.add((RowData)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)line)}));
            }
        }
        catch (IOException e) {
            Assertions.fail((String)"This should not happen");
        }
        return elements;
    }

    private LinkedHashMap<String, String> of(String ... kvs) {
        Assertions.assertThat((kvs != null && kvs.length % 2 == 0 ? 1 : 0) != 0).isTrue();
        LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
        for (int i = 0; i < kvs.length - 1; i += 2) {
            map.put(kvs[i], kvs[i + 1]);
        }
        return map;
    }

    private static String prepareCompactSql(CatalogPartitionSpec unresolvedCompactPartitionSpec) {
        String compactSqlTemplate = "ALTER TABLE MyTable%s COMPACT";
        Map partitionKVs = unresolvedCompactPartitionSpec.getPartitionSpec();
        StringBuilder sb = new StringBuilder();
        int index = 0;
        for (Map.Entry entry : partitionKVs.entrySet()) {
            if (index == 0) {
                sb.append(" PARTITION (");
            }
            sb.append((String)entry.getKey());
            sb.append(" = ");
            sb.append((String)entry.getValue());
            if (index < partitionKVs.size() - 1) {
                sb.append(", ");
            }
            if (index == partitionKVs.size() - 1) {
                sb.append(")");
            }
            ++index;
        }
        return String.format(compactSqlTemplate, sb);
    }

    private void executeAndCheck(CatalogPartitionSpec unresolvedPartitionSpec, Set<CatalogPartitionSpec> resolvedPartitionSpecsHaveBeenOrToBeCompacted) throws ExecutionException, InterruptedException {
        String compactSql = CompactManagedTableITCase.prepareCompactSql(unresolvedPartitionSpec);
        this.tEnv().executeSql(compactSql).await();
        Map<CatalogPartitionSpec, Long> firstRun = this.checkFileAndElements(resolvedPartitionSpecsHaveBeenOrToBeCompacted);
        this.tEnv().executeSql(compactSql).await();
        Map<CatalogPartitionSpec, Long> secondRun = this.checkFileAndElements(resolvedPartitionSpecsHaveBeenOrToBeCompacted);
        this.checkModifiedTime(firstRun, secondRun);
    }

    private Map<CatalogPartitionSpec, Long> checkFileAndElements(Set<CatalogPartitionSpec> resolvedPartitionSpecsHaveBeenOrToBeCompacted) {
        HashMap<CatalogPartitionSpec, Long> lastModifiedForEachPartition = new HashMap<CatalogPartitionSpec, Long>();
        Map<CatalogPartitionSpec, List<Path>> managedTableFileEntries = this.referenceOfManagedTableFileEntries.get();
        managedTableFileEntries.forEach((partitionSpec, fileEntries) -> {
            if (resolvedPartitionSpecsHaveBeenOrToBeCompacted.contains(partitionSpec)) {
                Assertions.assertThat((List)fileEntries).hasSize(1);
                Path compactedFile = (Path)fileEntries.get(0);
                Assertions.assertThat((String)compactedFile.getName()).startsWith((CharSequence)"compact-");
                List<RowData> compactedElements = CompactManagedTableITCase.readElementsFromFile(new File(compactedFile.getPath()));
                Assertions.assertThat(compactedElements).hasSameElementsAs((Iterable)this.collectedElements.get(partitionSpec));
                lastModifiedForEachPartition.put((CatalogPartitionSpec)partitionSpec, CompactManagedTableITCase.getLastModifiedTime(compactedFile));
            } else {
                fileEntries.forEach(file -> {
                    Assertions.assertThat((String)file.getName()).startsWith((CharSequence)"part-");
                    List<RowData> elements = CompactManagedTableITCase.readElementsFromFile(new File(file.getPath()));
                    Assertions.assertThat(this.collectedElements.get(partitionSpec)).containsAll(elements);
                });
            }
        });
        return lastModifiedForEachPartition;
    }

    private void checkModifiedTime(Map<CatalogPartitionSpec, Long> firstRun, Map<CatalogPartitionSpec, Long> secondRun) {
        firstRun.forEach((partitionSpec, lastModified) -> ((AbstractLongAssert)Assertions.assertThat((Long)((Long)secondRun.get(partitionSpec))).isEqualTo(lastModified)).isNotEqualTo(-1L));
    }

    private static long getLastModifiedTime(Path compactedFile) {
        try {
            FileStatus status = compactedFile.getFileSystem().getFileStatus(compactedFile);
            return status.getModificationTime();
        }
        catch (IOException e) {
            Assertions.fail((String)"This should not happen");
            return -1L;
        }
    }
}

