package org.apache.flink.connector.file.table;

import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/file/table/FileSystemTableSinkTest.class */
public class FileSystemTableSinkTest {
    @Test
    public void testExceptionWhenSettingParallelismWithUpdatingQuery() {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql(buildSourceTableSql("test_source_table", false));
        create.executeSql(buildSinkTableSql("test_sink_table", 10, false));
        String format = String.format("INSERT INTO %s SELECT DISTINCT * FROM %s", "test_sink_table", "test_source_table");
        CommonTestUtils.assertThrows("filesystem sink doesn't support setting parallelism (10) by 'sink.parallelism' when the input stream is not INSERT only.", ValidationException.class, () -> {
            return create.explainSql(format, new ExplainDetail[0]);
        });
    }

    @Test
    public void testFileSystemTableSinkWithParallelismInStreaming() {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
        create.executeSql(buildSourceTableSql("test_source_table", false));
        create.executeSql(buildSinkTableSql("test_sink_table", 5, false));
        Assert.assertEquals(TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(TableTestUtil.readFromResource("/explain/filesystem/testFileSystemTableSinkWithParallelismInStreamingSql0.out")))), TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(create.explainSql(buildInsertIntoSql("test_sink_table", "test_source_table"), new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN})))));
        create.executeSql(buildSinkTableSql("test_compact_sink_table", 5, true));
        Assert.assertEquals(TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(TableTestUtil.readFromResource("/explain/filesystem/testFileSystemTableSinkWithParallelismInStreamingSql1.out")))), TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(create.explainSql(buildInsertIntoSql("test_compact_sink_table", "test_source_table"), new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN})))));
    }

    @Test
    public void testFileSystemTableSinkWithParallelismInBatch() {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
        create.executeSql(buildSourceTableSql("test_source_table", true));
        create.executeSql(buildSinkTableSql("test_sink_table", 5, false));
        Assert.assertEquals(TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(TableTestUtil.readFromResource("/explain/filesystem/testFileSystemTableSinkWithParallelismInBatch.out")))), TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(create.explainSql(buildInsertIntoSql("test_sink_table", "test_source_table"), new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN})))));
    }

    private static String buildSourceTableSql(String str, boolean z) {
        return String.format("CREATE TABLE %s ( id BIGINT, real_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 4)) WITH ( 'connector' = 'values', 'bounded' = '%s')", str, Boolean.valueOf(z));
    }

    private static String buildSinkTableSql(String str, int i, boolean z) {
        return String.format("CREATE TABLE %s ( id BIGINT, real_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 4)) WITH ( 'connector' = 'filesystem', 'path' = '/tmp', 'auto-compaction' = '%s', 'format' = 'testcsv', 'sink.parallelism' = '%s')", str, Boolean.valueOf(z), Integer.valueOf(i));
    }

    private static String buildInsertIntoSql(String str, String str2) {
        return String.format("INSERT INTO %s SELECT * FROM %s", str, str2);
    }
}
