/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.jsonplan;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
import org.junit.Test;

/**
 * Integration test that compiles a {@code ROW_NUMBER()}-based deduplication query into a
 * {@link CompiledPlan}, executes the plan, and checks the rows written to the sink.
 */
public class DeduplicationJsonPlanITCase extends JsonPlanTestBase {

    /**
     * Keeps only the row with {@code row_num = 1} per {@code product} partition (ordered by
     * {@code event_time} ascending) and expects one surviving row for "pen" and one for "apple".
     *
     * @throws Exception if compiling or executing the plan fails
     */
    @Test
    public void testDeduplication() throws Exception {
        // Columns: order_id, user, product, order_time.
        List<Row> data =
                Arrays.asList(
                        Row.of(1L, "terry", "pen", 1000L),
                        Row.of(2L, "alice", "pen", 2000L),
                        Row.of(3L, "bob", "pen", 3000L),
                        Row.of(4L, "bob", "apple", 4000L),
                        Row.of(5L, "fish", "apple", 5000L));

        // event_time is a computed column derived from order_time and carries the watermark.
        createTestValuesSourceTable(
                "MyTable",
                data,
                "order_id bigint",
                "`user` varchar",
                "product varchar",
                "order_time bigint ",
                "event_time as TO_TIMESTAMP(FROM_UNIXTIME(order_time)) ",
                "watermark for event_time as event_time - INTERVAL '5' second ");

        // The sink is keyed on product, the same column the query partitions by.
        createTestNonInsertOnlyValuesSinkTable(
                "MySink",
                "order_id bigint",
                "`user` varchar",
                "product varchar",
                "order_time bigint",
                "primary key(product) not enforced");

        CompiledPlan compiledPlan =
                tableEnv.compilePlanSql(
                        "insert into MySink select order_id, user, product, order_time \n"
                                + "FROM (\n"
                                + "  SELECT *,\n"
                                + "    ROW_NUMBER() OVER (PARTITION BY product ORDER BY event_time ASC) AS row_num\n"
                                + "  FROM MyTable)\n"
                                + "WHERE row_num = 1 \n");

        // The parallelism option is set after the plan is compiled and before it is executed.
        // The value is passed as a plain int so it type-checks against the Integer-typed option.
        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);

        // NOTE(review): checkTransformationUids is inherited from JsonPlanTestBase; presumably it
        // validates the uids of the transformations produced from the plan -- confirm in the base.
        checkTransformationUids(compiledPlan);
        compiledPlan.execute().await();

        // The earliest row per product is expected: order 1 for "pen", order 4 for "apple".
        assertResult(
                Arrays.asList("+I[1, terry, pen, 1000]", "+I[4, bob, apple, 4000]"),
                TestValuesTableFactory.getRawResults("MySink"));
    }
}

