/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql.join;

import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0001\u0005\rd\u0001B\u0001\u0003\u0001U\u0011\u0001\u0003V3na>\u0014\u0018\r\u001c&pS:$Vm\u001d;\u000b\u0005\r!\u0011\u0001\u00026pS:T!!\u0002\u0004\u0002\u0007M\fHN\u0003\u0002\b\u0011\u000511\u000f\u001e:fC6T!!\u0003\u0006\u0002\tAd\u0017M\u001c\u0006\u0003\u00171\tq\u0001\u001d7b]:,'O\u0003\u0002\u000e\u001d\u0005)A/\u00192mK*\u0011q\u0002E\u0001\u0006M2Lgn\u001b\u0006\u0003#I\ta!\u00199bG\",'\"A\n\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u00011\u0002CA\f\u001b\u001b\u0005A\"BA\r\u000b\u0003\u0015)H/\u001b7t\u0013\tY\u0002DA\u0007UC\ndW\rV3ti\n\u000b7/\u001a\u0005\u0006;\u0001!\tAH\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003}\u0001\"\u0001\t\u0001\u000e\u0003\tAqA\t\u0001C\u0002\u0013\u00051%\u0001\u0003vi&dW#\u0001\u0013\u0011\u0005])\u0013B\u0001\u0014\u0019\u0005M\u0019FO]3b[R\u000b'\r\\3UKN$X\u000b^5m\u0011\u0019A\u0003\u0001)A\u0005I\u0005)Q\u000f^5mA!)!\u0006\u0001C\u0001W\u00051!-\u001a4pe\u0016$\u0012\u0001\f\t\u0003[Aj\u0011A\f\u0006\u0002_\u0005)1oY1mC&\u0011\u0011G\f\u0002\u0005+:LG\u000f\u000b\u0002*gA\u0011AgN\u0007\u0002k)\u0011aGE\u0001\u0006UVt\u0017\u000e^\u0005\u0003qU\u0012aAQ3g_J,\u0007\"\u0002\u001e\u0001\t\u0003Y\u0013a\n;fgR,e/\u001a8u)&lW\rV3na>\u0014\u0018\r\u001c&pS:|e\u000eT3hC\u000eL8k\\;sG\u0016D#!\u000f\u001f\u0011\u0005Qj\u0014B\u0001 6\u0005\u0011!Vm\u001d;\t\u000b\u0001\u0003A\u0011A\u0016\u0002MQ,7\u000f\u001e)s_\u000e$\u0016.\\3UK6\u0004xN]1m\u0015>Lgn\u00148MK\u001e\f7-_*pkJ\u001cW\r\u000b\u0002@y!)1\t\u0001C\u0001W\u0005IB/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5oQ\t\u0011E\bC\u0003G\u0001\u0011\u00051&\u0001\u0018uKN$XI^3oiRKW.\u001a+f[B|'/\u00197K_&twJ\u001c+j[\u0016\u001cH/Y7q\u0019RT(k\\<uS6,\u0007FA#=\u0011\u0015I\u0005\u0001\"\u0001,\u0003\u0005\"Xm\u001d;Fm\u0016tG\u000fV5nKR+W\u000e]8sC2Tu.\u001b8XSRDg+[3xQ\tAE\bC\u0003M\u0001\u0011\u00051&\u0001\u001cuKN$XI^3oiRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i-&,woV5uQ\u000e{gn\u001d;b]R\u001cuN\u001c3ji&|g\u000e\u000b\u0002Ly!)q\n\u0001C\u0001W\u00051D/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5o/&$\bNV5fo^KG\u000f\u001b$v]\u000e$\u0018n\u001c8D_:$\u0017\u000e^5p]\"\u0012a\n\u0010\u0005\u0006%\u0002!\taK\u0001)i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>LgnV5uQZKWm\u001e(p]\u0016\u000bX/\u001b\u0015\u0003#rBQ!\u0016\u0001\u0005\u0002-\nq\u0006^3ti\u00163XM\u001c;US6,G+Z7q_J\fGNS8j]^KG\u000f\u001b,jK^<\u0016\u000e\u001e5Qe\u0016$\u0017nY1uKND#\u0001\u0016\u001f\t\u000ba\u0003A\u0011A\u0016\u0002gQ,7\u000f^#wK:$H+[7f\u0019\u00164G\u000fV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5WS\u0016<x+\u001b;i!J,G-[2bi\u0016\u001c\bFA,=\u0011\u0015Y\u0006\u0001\"\u0001,\u0003\u001d\"Xm\u001d;Qe>\u001cG+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"d\u0015m\u001d;S_^4\u0016.Z<)\u0005ic\u0004\"\u00020\u0001\t\u0003Y\u0013!\u000b;fgR\u0004&o\\2US6,G+Z7q_J\fGNS8j]^KG\u000f\u001b'bgR4\u0016\r\\;f-&,w\u000f\u000b\u0002^y!)\u0011\r\u0001C\u0001W\u00059C/Z:u!J|7\rV5nKR+W\u000e]8sC2Tu.\u001b8XSRDg+[3x\u001d>tW)];jQ\t\u0001G\bC\u0003e\u0001\u0011\u00051&\u0001\u0018uKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5WS\u0016<x+\u001b;i!J,G-[2bi\u0016\u001c\bFA2=\u0011\u00159\u0007\u0001\"\u0001,\u0003U\"Xm\u001d;Qe>\u001cG+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"\u001cu.\u001c9vi\u0016$7i\u001c7v[:\fe\u000e\u001a)vg\"$un\u001e8)\u0005\u0019d\u0004\"\u00026\u0001\t\u0003Y\u0013A\u000e;fgR,e/\u001a8u)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5D_6\u0004X\u000f^3e\u0007>dW/\u001c8B]\u0012\u0004Vo\u001d5E_^t\u0007FA5=\u0011\u0015i\u0007\u0001\"\u0001,\u0003!\"Xm\u001d;Qe>\u001cG+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"\u0014\u0015N\u001c7pON{WO]2fQ\taG\bC\u0003q\u0001\u0011\u00051&A\u001buKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5WS\u0016<x+\u001b;i\u0007>t7\u000f^1oi\u000e{g\u000eZ5uS>t\u0007FA8=\u0011\u0015\u0019\b\u0001\"\u0001,\u0003e\"Xm\u001d;Qe>\u001cG+[7f\u0019\u00164G\u000fV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5WS\u0016<x+\u001b;i\u0007>t7\u000f^1oi\u000e{g\u000eZ5uS>t\u0007F\u0001:=\u0011\u00151\b\u0001\"\u0001,\u0003U\"Xm\u001d;Qe>\u001cG+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"4\u0016.Z<XSRDg)\u001e8di&|gnQ8oI&$\u0018n\u001c8)\u0005Ud\u0004\"B=\u0001\t\u0003Y\u0013a\u0007;fgRLeN^1mS\u0012$V-\u001c9pe\u0006dG+\u00192m\u0015>Lg\u000e\u000b\u0002yy!)A\u0010\u0001C\u0001W\u0005)C/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5o)>\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0003wrBaa \u0001\u0005\n\u0005\u0005\u0011!F3ya\u0016\u001cG/\u0012=dKB$\u0018n\u001c8UQJ|wO\u001c\u000b\bY\u0005\r\u00111DA\u0010\u0011\u0019)a\u00101\u0001\u0002\u0006A!\u0011qAA\u000b\u001d\u0011\tI!!\u0005\u0011\u0007\u0005-a&\u0004\u0002\u0002\u000e)\u0019\u0011q\u0002\u000b\u0002\rq\u0012xn\u001c;?\u0013\r\t\u0019BL\u0001\u0007!J,G-\u001a4\n\t\u0005]\u0011\u0011\u0004\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005Ma\u0006C\u0004\u0002\u001ey\u0004\r!!\u0002\u0002\u0011-,\u0017p^8sIND\u0011\"!\t\u007f!\u0003\u0005\r!a\t\u0002\u000b\rd\u0017M\u001f>1\t\u0005\u0015\u0012q\u0006\t\u0007\u0003\u000f\t9#a\u000b\n\t\u0005%\u0012\u0011\u0004\u0002\u0006\u00072\f7o\u001d\t\u0005\u0003[\ty\u0003\u0004\u0001\u0005\u0019\u0005E\u0012qDA\u0001\u0002\u0003\u0015\t!a\r\u0003\u0007}#\u0013'\u0005\u0003\u00026\u0005m\u0002cA\u0017\u00028%\u0019\u0011\u0011\b\u0018\u0003\u000f9{G\u000f[5oOB!\u0011QHA$\u001d\u0011\ty$a\u0011\u000f\t\u0005-\u0011\u0011I\u0005\u0002_%\u0019\u0011Q\t\u0018\u0002\u000fA\f7m[1hK&!\u0011\u0011JA&\u0005%!\u0006N]8xC\ndWMC\u0002\u0002F9Bq!a\u0014\u0001\t\u0013\t\t&\u0001\rwKJLg-\u001f+sC:\u001cH.\u0019;j_:\u001cVoY2fgN$2\u0001LA*\u0011\u001d)\u0011Q\na\u0001\u0003\u000bA\u0011\"a\u0016\u0001#\u0003%I!!\u0017\u0002?\u0015D\b/Z2u\u000bb\u001cW\r\u001d;j_:$\u0006N]8x]\u0012\"WMZ1vYR$3'\u0006\u0002\u0002\\A\"\u0011QLA1!\u0019\t9!a\n\u0002`A!\u0011QFA1\t1\t\t$!\u0016\u0002\u0002\u0003\u0005)\u0011AA\u001a\u0001")
public class TemporalJoinTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    public StreamTableTestUtil util() {
        return this.util;
    }

    @Before
    public void before() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE Orders (\n                    | amount INT,\n                    | currency STRING,\n                    | rowtime TIMESTAMP(3),\n                    | proctime AS PROCTIME(),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistory (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistoryWithPK (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesBinlogWithComputedColumn (\n                    | currency STRING,\n                    | rate INT,\n                    | rate1 AS rate + 1,\n                    | proctime AS PROCTIME(),\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I,UB,UA,D',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesBinlogWithoutWatermark (\n                    | currency STRING,\n                    | rate INT,\n                    | rate1 AS rate + 1,\n                    | proctime AS PROCTIME(),\n                    | rowtime TIMESTAMP(3),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I,UB,UA,D',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesOnly (\n                    | currency STRING,\n                    | rate INT,\n                    | proctime AS PROCTIME()\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistoryLegacy (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        this.util().addTable(" CREATE VIEW rates_last_row_rowtime AS SELECT currency, rate, rowtime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum    FROM RatesHistory  ) T   WHERE rowNum = 1");
        this.util().addTable(" CREATE VIEW rates_last_row_proctime AS SELECT T.currency, T.rate, T.proctime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY proctime DESC) AS rowNum    FROM RatesOnly  ) T   WHERE T.rowNum = 1");
        this.util().addTable("CREATE VIEW rates_last_value AS SELECT currency, LAST_VALUE(rate) AS rate FROM RatesHistory GROUP BY currency ");
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE OrdersLtz (\n                                | amount INT,\n                                | currency STRING,\n                                | ts BIGINT,\n                                | rowtime AS TO_TIMESTAMP_LTZ(ts, 3),\n                                | WATERMARK FOR rowtime AS rowtime\n                                |) WITH (\n                                | 'connector' = 'values'\n                                |)\n      ")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE RatesLtz (\n                                | currency STRING,\n                                | rate INT,\n                                | ts BIGINT,\n                                | rowtime as TO_TIMESTAMP_LTZ(ts, 3),\n                                | WATERMARK FOR rowtime AS rowtime,\n                                | PRIMARY KEY(currency) NOT ENFORCED\n                                |) WITH (\n                                | 'connector' = 'values'\n                                |)\n      ")).stripMargin());
    }

    @Test
    public void testEventTimeTemporalJoinOnLegacySource() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryLegacy FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinOnLegacySource() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryLegacy FOR SYSTEM_TIME AS OF o.proctime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoin() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinOnTimestampLtzRowtime() {
        String sqlQuery = "SELECT * FROM OrdersLtz AS o JOIN RatesLtz FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithFunctionCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND 'RMB-100' = concat('RMB-', cast(r.rate AS STRING))";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewNonEqui() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND o.amount > r.rate";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeLeftTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o LEFT JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithLastRowView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_proctime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithLastValueView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewNonEqui() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > r.rate";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithComputedColumnAndPushDown() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithComputedColumn FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithComputedColumnAndPushDown() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithComputedColumn FOR SYSTEM_TIME AS OF o.rowtime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithBinlogSource() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithoutWatermark FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeLeftTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o LEFT JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithFunctionCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND 'RMB-100' = concat('RMB-', cast(r.rate AS STRING))";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testInvalidTemporalTablJoin() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE leftTableWithoutTimeAttribute (\n                    | amount INT,\n                    | currency STRING,\n                    | ts TIMESTAMP(3)\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery1 = "SELECT * FROM leftTableWithoutTimeAttribute AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.ts AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery1, new StringBuilder(101).append("Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'").append(" left table's time attribute field").toString(), ValidationException.class);
        String sqlQuery2 = "SELECT * FROM Orders AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.amount = r.rate";
        this.expectExceptionThrown(sqlQuery2, "Temporal table's primary key [currency0] must be included in the equivalence condition of temporal join, but current temporal join condition is [amount=rate].", ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutPk (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery3 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutPk FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery3, "Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:\nFlinkLogicalJoin(condition=[AND(=($1, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $6, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])", ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutTimeAttribute (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery4 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutTimeAttribute FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery4, new StringBuilder(139).append("Event-Time Temporal Table Join requires both primary key and row time attribute in ").append("versioned table, but no row time attribute can be found.").toString(), ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutRowtime (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | proctime AS PROCTIME(),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery5 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutRowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery5, new StringBuilder(139).append("Event-Time Temporal Table Join requires both primary key and row time attribute in ").append("versioned table, but no row time attribute can be found.").toString(), ValidationException.class);
        String sqlQuery6 = "SELECT * FROM RatesHistory FOR SYSTEM_TIME AS OF TIMESTAMP '2020-11-11 13:12:13'";
        this.expectExceptionThrown(sqlQuery6, "Querying a temporal table using 'FOR SYSTEM TIME AS OF' syntax with a constant timestamp '2020-11-11 13:12:13' is not supported yet.", AssertionError.class);
        String sqlQuery7 = "SELECT * FROM RatesHistory FOR SYSTEM_TIME AS OF TO_TIMESTAMP(FROM_UNIXTIME(1))";
        this.expectExceptionThrown(sqlQuery7, "Querying a temporal table using 'FOR SYSTEM TIME AS OF' syntax with an expression call 'TO_TIMESTAMP(FROM_UNIXTIME(1))' is not supported yet.", AssertionError.class);
        String sqlQuery8 = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT *\n         | FROM OrdersLtz AS o JOIN\n         | RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r\n         | ON o.currency = r.currency\n          ")).stripMargin();
        this.expectExceptionThrown(sqlQuery8, "Event-Time Temporal Table Join requires same rowtime type in left table and versioned table, but the rowtime types are TIMESTAMP_LTZ(3) *ROWTIME* and TIMESTAMP(3) *ROWTIME*.", ValidationException.class);
    }

    @Test
    public void testEventTimeTemporalJoinToSinkWithPk() {
        TableEnvironment tEnv = this.util().tableEnv();
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE orders_rowtime (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  amount BIGINT,\n                       |  order_time TIMESTAMP(3),\n                       |  WATERMARK FOR order_time AS order_time,\n                       |  PRIMARY KEY (order_id) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = 'rowTimeOrderDataId'\n                       |)\n                       |")).stripMargin());
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE versioned_currency_with_multi_key (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '10' SECOND,\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = 'rowTimeCurrencyDataId'\n                       |)\n                       |")).stripMargin());
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE rowtime_default_sink (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  amount BIGINT,\n                       |  l_time TIMESTAMP(3),\n                       |  rate BIGINT,\n                       |  r_time TIMESTAMP(3),\n                       |  PRIMARY KEY(order_id) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'sink-insert-only' = 'false',\n                       |  'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO rowtime_default_sink\n        |  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time\n        |  FROM orders_rowtime AS o JOIN versioned_currency_with_multi_key\n        |    FOR SYSTEM_TIME AS OF o.order_time as r\n        |      ON o.currency_no = r.currency_no AND o.currency = r.currency\n        |")).stripMargin();
        this.util().verifyExplainInsert(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    private void expectExceptionThrown(String sql, String keywords, Class<? extends Throwable> clazz) {
        try {
            this.verifyTranslationSuccess(sql);
            Assert.fail((String)new StringBuilder(40).append("Expected a ").append(clazz).append(", but no exception is thrown.").toString());
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            Class<?> clazz2 = throwable2.getClass();
            Class<? extends Throwable> clazz3 = clazz;
            if (!(clazz2 != null ? !clazz2.equals(clazz3) : clazz3 != null)) {
                BoxedUnit boxedUnit;
                if (keywords != null) {
                    Assert.assertTrue((String)new StringBuilder(31).append("The actual exception message \n").append(throwable2.getMessage()).append("\n").append(new StringBuilder(35).append("doesn't contain expected keyword \n").append(keywords).append("\n").toString()).toString(), (boolean)throwable2.getMessage().contains(keywords));
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit2 = boxedUnit;
            }
            if (throwable2 != null) {
                Throwable throwable3 = throwable2;
                throwable3.printStackTrace();
                Assert.fail((String)new StringBuilder(25).append("Expected throw ").append(clazz.getSimpleName()).append(", but is ").append(throwable3).append(".").toString());
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            throw throwable;
        }
    }

    private Class<? extends Throwable> expectExceptionThrown$default$3() {
        return ValidationException.class;
    }

    private void verifyTranslationSuccess(String sql) {
        this.util().tableEnv().sqlQuery(sql).explain(new ExplainDetail[0]);
    }
}

