/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.io.File;
import java.lang.invoke.MethodHandle;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.DayOfWeek;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.annotation.HintFlag;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

public class FunctionITCase
extends StreamingTestBase {
    // Fully-qualified class name of TestUDF; spliced into the CREATE FUNCTION DDL strings below.
    private static final String TEST_FUNCTION = TestUDF.class.getName();
    // Supplies the numeric suffix appended to the generated UDF class name in before().
    private static final Random random = new Random();
    // Name of the UDF class compiled into the test jar; assigned in before().
    private String udfClassName;
    // URI string of the jar file created in before().
    private String jarPath;

    /**
     * Runs the parent setup, then chooses a UDF class name with a random numeric suffix and
     * creates a jar file containing a scalar function of that name which lower-cases its
     * string argument. The class name and the jar URI are stored for use by the tests.
     *
     * @throws Exception if the parent setup or the jar creation fails
     */
    @Override
    @Before
    public void before() throws Exception {
        super.before();
        udfClassName = "LowerUDF" + random.nextInt(50);

        File jarFolder = TEMPORARY_FOLDER.newFolder(String.format("test-jar-%s", UUID.randomUUID()));
        String udfSource = String.format("public class %s extends org.apache.flink.table.functions.ScalarFunction {\n  public String eval(String str) {\n    return str.toLowerCase();\n  }\n}\n", udfClassName);
        File jarFile = UserClassLoaderJarTestUtils.createJarFile(jarFolder, "test-classloader-udf.jar", udfClassName, udfSource);
        jarPath = jarFile.toURI().toString();
    }

    /**
     * Creates function {@code f1} with an unqualified name, checks that it is listed, then drops
     * it through its fully-qualified name and checks that it is no longer listed.
     */
    @Test
    public void testCreateCatalogFunctionInDefaultCatalog() {
        tEnv().executeSql("create function f1 as 'org.apache.flink.function.TestFunction'");
        List<String> afterCreate = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterCreate).contains("f1");

        tEnv().executeSql("DROP FUNCTION IF EXISTS default_catalog.default_database.f1");
        List<String> afterDrop = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterDrop).doesNotContain("f1");
    }

    /**
     * Creates function {@code f2} with a catalog- and database-qualified name, checks that it is
     * listed, then drops it and checks that it is no longer listed.
     */
    @Test
    public void testCreateFunctionWithFullPath() {
        tEnv().executeSql("create function default_catalog.default_database.f2 as 'org.apache.flink.function.TestFunction'");
        List<String> afterCreate = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterCreate).contains("f2");

        tEnv().executeSql("DROP FUNCTION IF EXISTS default_catalog.default_database.f2");
        List<String> afterDrop = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterDrop).doesNotContain("f2");
    }

    /**
     * Creates function {@code f3} with only a database qualifier, checks that it is listed, then
     * drops it through its fully-qualified name and checks that it is no longer listed.
     */
    @Test
    public void testCreateFunctionWithoutCatalogIdentifier() {
        tEnv().executeSql("create function default_database.f3 as 'org.apache.flink.function.TestFunction'");
        List<String> afterCreate = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterCreate).contains("f3");

        tEnv().executeSql("DROP FUNCTION IF EXISTS default_catalog.default_database.f3");
        List<String> afterDrop = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterDrop).doesNotContain("f3");
    }

    /**
     * Expects a CREATE FUNCTION statement that names catalog {@code catalog1} to fail with the
     * message "Catalog catalog1 does not exist".
     *
     * <p>The failure is asserted with {@code assertThatThrownBy} so that the test fails when the
     * statement unexpectedly succeeds; asserting only inside a {@code catch} block would let
     * that case pass silently.
     */
    @Test
    public void testCreateFunctionCatalogNotExists() {
        String ddl1 = "create function catalog1.database1.f3 as 'org.apache.flink.function.TestFunction'";
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(ddl1)).hasMessage("Catalog catalog1 does not exist");
    }

    /**
     * Expects a CREATE FUNCTION statement that names database {@code database1} in the default
     * catalog to fail with the asserted message.
     */
    @Test
    public void testCreateFunctionDBNotExists() {
        String createInOtherDb = "create function default_catalog.database1.f3 as 'org.apache.flink.function.TestFunction'";
        String expectedMessage = "Could not execute CREATE CATALOG FUNCTION: (catalogFunction: [Optional[This is a user-defined function]], identifier: [`default_catalog`.`database1`.`f3`], ignoreIfExists: [false], isTemporary: [false])";

        Assertions.assertThatThrownBy(() -> tEnv().executeSql(createInOtherDb)).hasMessage(expectedMessage);
    }

    /**
     * Walks temporary catalog function {@code f4} through create, create-if-not-exists, drop and
     * drop-if-exists, asserting the function listing after the first three statements and the
     * validation errors raised by a duplicate create and by a drop of a missing function.
     */
    @Test
    public void testCreateTemporaryCatalogFunction() {
        String create = "create temporary function default_catalog.default_database.f4 as '" + TEST_FUNCTION + "'";
        String createIfNotExists = "create temporary function if not exists default_catalog.default_database.f4 as '" + TEST_FUNCTION + "'";
        String drop = "drop temporary function default_catalog.default_database.f4";
        String dropIfExists = "drop temporary function if exists default_catalog.default_database.f4";

        tEnv().executeSql(create);
        Assertions.assertThat(Arrays.asList(tEnv().listFunctions())).contains("f4");

        tEnv().executeSql(createIfNotExists);
        Assertions.assertThat(Arrays.asList(tEnv().listFunctions())).contains("f4");

        tEnv().executeSql(drop);
        Assertions.assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f4");

        // Re-create, then a second plain create must be rejected.
        tEnv().executeSql(create);
        Assertions.assertThatThrownBy(() -> tEnv().executeSql(create))
                .isInstanceOf(ValidationException.class)
                .hasMessage("Could not register temporary catalog function. A function 'default_catalog.default_database.f4' does already exist.");

        // Drop, then the if-exists variant is executed on the already-dropped function.
        tEnv().executeSql(drop);
        tEnv().executeSql(dropIfExists);
        Assertions.assertThatThrownBy(() -> tEnv().executeSql(drop))
                .isInstanceOf(ValidationException.class)
                .hasMessage("Temporary catalog function `default_catalog`.`default_database`.`f4` doesn't exist");
    }

    /**
     * Executes create, create-if-not-exists and drop statements for temporary system function
     * {@code f5} in sequence; the test has no explicit assertions.
     */
    @Test
    public void testCreateTemporarySystemFunction() {
        String create = "create temporary system function f5 as '" + TEST_FUNCTION + "'";
        String createIfNotExists = "create temporary system function if not exists f5 as '" + TEST_FUNCTION + "'";
        String drop = "drop temporary system function f5";

        for (String statement : Arrays.asList(create, createIfNotExists, drop)) {
            tEnv().executeSql(statement);
        }
    }

    /**
     * Creates temporary system function {@code f10} from the generated jar, checks that it is
     * listed, then drops it and checks that it is no longer listed.
     */
    @Test
    public void testCreateTemporarySystemFunctionByUsingJar() {
        tEnv().executeSql(String.format("CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING JAR '%s'", udfClassName, jarPath));
        List<String> afterCreate = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterCreate).contains("f10");

        tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f10");
        List<String> afterDrop = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterDrop).doesNotContain("f10");
    }

    /**
     * Creates catalog function {@code default_database.f11} from the generated jar, checks that
     * it is listed, then drops it and checks that it is no longer listed.
     */
    @Test
    public void testCreateCatalogFunctionByUsingJar() {
        tEnv().executeSql(String.format("CREATE FUNCTION default_database.f11 AS '%s' USING JAR '%s'", udfClassName, jarPath));
        List<String> afterCreate = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterCreate).contains("f11");

        tEnv().executeSql("DROP FUNCTION default_database.f11");
        List<String> afterDrop = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterDrop).doesNotContain("f11");
    }

    /**
     * Creates temporary catalog function {@code default_database.f12} from the generated jar,
     * checks that it is listed, then drops it and checks that it is no longer listed.
     */
    @Test
    public void testCreateTemporaryCatalogFunctionByUsingJar() {
        tEnv().executeSql(String.format("CREATE TEMPORARY FUNCTION default_database.f12 AS '%s' USING JAR '%s'", udfClassName, jarPath));
        List<String> afterCreate = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterCreate).contains("f12");

        tEnv().executeSql("DROP TEMPORARY FUNCTION default_database.f12");
        List<String> afterDrop = Arrays.asList(tEnv().listFunctions());
        Assertions.assertThat(afterDrop).doesNotContain("f12");
    }

    /**
     * Creates catalog function {@code f3}, alters its implementing class via DDL, and checks the
     * class name stored in the default catalog before and after the ALTER statement.
     *
     * @throws Exception if looking up the function in the catalog fails
     */
    @Test
    public void testAlterFunction() throws Exception {
        ObjectPath functionPath = new ObjectPath("default_database", "f3");

        Assertions.assertThat(tEnv().getCatalog("default_catalog")).isPresent();
        Catalog defaultCatalog = tEnv().getCatalog("default_catalog").get();

        tEnv().executeSql("create function f3 as 'org.apache.flink.function.TestFunction'");
        String classNameBefore = defaultCatalog.getFunction(functionPath).getClassName();
        Assertions.assertThat(classNameBefore).isEqualTo("org.apache.flink.function.TestFunction");

        tEnv().executeSql("alter function f3 as 'org.apache.flink.function.TestFunction2'");
        String classNameAfter = defaultCatalog.getFunction(functionPath).getClassName();
        Assertions.assertThat(classNameAfter).isEqualTo("org.apache.flink.function.TestFunction2");
    }

    /**
     * Expects ALTER FUNCTION to fail with the asserted messages for a function that was not
     * created, for catalog {@code catalog1}, and for database {@code db1}.
     */
    @Test
    public void testAlterFunctionNonExists() {
        Assertions.assertThatThrownBy(
                        () -> tEnv().executeSql("ALTER FUNCTION default_catalog.default_database.f4 as 'org.apache.flink.function.TestFunction'"))
                .hasMessage("Function default_database.f4 does not exist in Catalog default_catalog.");

        Assertions.assertThatThrownBy(
                        () -> tEnv().executeSql("ALTER FUNCTION catalog1.default_database.f4 as 'org.apache.flink.function.TestFunction'"))
                .hasMessage("Catalog catalog1 does not exist");

        Assertions.assertThatThrownBy(
                        () -> tEnv().executeSql("ALTER FUNCTION default_catalog.db1.f4 as 'org.apache.flink.function.TestFunction'"))
                .hasMessage("Function db1.f4 does not exist in Catalog default_catalog.");
    }

    /** Expects ALTER TEMPORARY FUNCTION to fail with the "not supported" message asserted below. */
    @Test
    public void testAlterTemporaryCatalogFunction() {
        Assertions.assertThatThrownBy(
                        () -> tEnv().executeSql("ALTER TEMPORARY FUNCTION default_catalog.default_database.f4 as 'org.apache.flink.function.TestFunction'"))
                .hasMessage("Alter temporary catalog function is not supported");
    }

    /** Expects ALTER TEMPORARY SYSTEM FUNCTION to fail with the "not supported" message asserted below. */
    @Test
    public void testAlterTemporarySystemFunction() {
        Assertions.assertThatThrownBy(
                        () -> tEnv().executeSql("ALTER TEMPORARY SYSTEM FUNCTION default_catalog.default_database.f4 as 'org.apache.flink.function.TestFunction'"))
                .hasMessage("Alter temporary system function is not supported");
    }

    /**
     * Expects DROP FUNCTION to fail with the asserted messages for a function that was not
     * created, for catalog {@code catalog1}, and for database {@code db1}.
     */
    @Test
    public void testDropFunctionNonExists() {
        Assertions.assertThatThrownBy(() -> tEnv().executeSql("DROP FUNCTION default_catalog.default_database.f4"))
                .hasMessage("Function default_database.f4 does not exist in Catalog default_catalog.");

        Assertions.assertThatThrownBy(() -> tEnv().executeSql("DROP FUNCTION catalog1.default_database.f4"))
                .hasMessage("Catalog catalog1 does not exist");

        Assertions.assertThatThrownBy(() -> tEnv().executeSql("DROP FUNCTION default_catalog.db1.f4"))
                .hasMessage("Function db1.f4 does not exist in Catalog default_catalog.");
    }

    /**
     * Expects DROP TEMPORARY FUNCTION to fail with a "doesn't exist" message carrying the full
     * identifier, for three identifiers under which no temporary function was created.
     */
    @Test
    public void testDropTemporaryFunctionNonExits() {
        Assertions.assertThatThrownBy(() -> tEnv().executeSql("DROP TEMPORARY FUNCTION default_catalog.default_database.f4"))
                .hasMessage("Temporary catalog function `default_catalog`.`default_database`.`f4` doesn't exist");

        Assertions.assertThatThrownBy(() -> tEnv().executeSql("DROP TEMPORARY FUNCTION catalog1.default_database.f4"))
                .hasMessage("Temporary catalog function `catalog1`.`default_database`.`f4` doesn't exist");

        Assertions.assertThatThrownBy(() -> tEnv().executeSql("DROP TEMPORARY FUNCTION default_catalog.db1.f4"))
                .hasMessage("Temporary catalog function `default_catalog`.`db1`.`f4` doesn't exist");
    }

    /**
     * Creates and immediately drops temporary function {@code f4} under three identifiers: an
     * unqualified name, a name in catalog {@code catalog1}, and a name in database {@code db1}.
     * The test has no explicit assertions.
     */
    @Test
    public void testCreateDropTemporaryCatalogFunctionsWithDifferentIdentifier() {
        // Unqualified identifier.
        String createUnqualified = "create temporary function f4 as '" + TEST_FUNCTION + "'";
        tEnv().executeSql(createUnqualified);
        tEnv().executeSql("drop temporary function f4");

        // Identifier in catalog1.
        String createInCatalog1 = "create temporary function catalog1.default_database.f4 as '" + TEST_FUNCTION + "'";
        tEnv().executeSql(createInCatalog1);
        tEnv().executeSql("drop temporary function catalog1.default_database.f4");

        // Identifier in database db1 of the default catalog.
        String createInDb1 = "create temporary function default_catalog.db1.f4 as '" + TEST_FUNCTION + "'";
        tEnv().executeSql(createInDb1);
        tEnv().executeSql("drop temporary function default_catalog.db1.f4");
    }

    /**
     * Creates and drops temporary system function {@code f5}, runs the if-exists drop on the
     * already-dropped function, and expects a further plain drop to fail with the asserted
     * message.
     */
    @Test
    public void testDropTemporarySystemFunction() {
        String create = "create temporary system function f5 as '" + TEST_FUNCTION + "'";
        String drop = "drop temporary system function f5";
        String dropIfExists = "drop temporary system function if exists f5";

        for (String statement : Arrays.asList(create, drop, dropIfExists)) {
            tEnv().executeSql(statement);
        }

        Assertions.assertThatThrownBy(() -> tEnv().executeSql(drop))
                .hasMessage("Could not drop temporary system function. A function named 'f5' doesn't exist.");
    }

    /**
     * Runs the shared {@code addOne} scenario with the function registered as a regular catalog
     * function, then drops the function.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testUserDefinedRegularCatalogFunction() throws Exception {
        testUserDefinedCatalogFunction("create function addOne as '" + TEST_FUNCTION + "'");
        tEnv().executeSql("drop function addOne");
    }

    /**
     * Runs the shared {@code addOne} scenario with the function registered as a temporary
     * catalog function, then drops the function.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testUserDefinedTemporaryCatalogFunction() throws Exception {
        testUserDefinedCatalogFunction("create temporary function addOne as '" + TEST_FUNCTION + "'");
        tEnv().executeSql("drop temporary function addOne");
    }

    /**
     * Runs the shared {@code lowerUdf} scenario with the function registered from the generated
     * jar as a temporary system function.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception {
        String createDdl = String.format("create temporary system function lowerUdf as '%s' using jar '%s'", udfClassName, jarPath);
        testUserDefinedFunctionByUsingJar(createDdl, "drop temporary system function lowerUdf");
    }

    /**
     * Runs the shared {@code lowerUdf} scenario with the function registered from the generated
     * jar as a regular catalog function.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testUserDefinedRegularCatalogFunctionByUsingJar() throws Exception {
        String createDdl = String.format("create function lowerUdf as '%s' using jar '%s'", udfClassName, jarPath);
        testUserDefinedFunctionByUsingJar(createDdl, "drop function lowerUdf");
    }

    /**
     * Runs the shared {@code lowerUdf} scenario with the function registered from the generated
     * jar as a temporary catalog function.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception {
        String createDdl = String.format("create temporary function lowerUdf as '%s' using jar '%s'", udfClassName, jarPath);
        testUserDefinedFunctionByUsingJar(createDdl, "drop temporary function lowerUdf");
    }

    /**
     * Runs the shared {@code addOne} scenario with the function registered as a temporary system
     * function, then drops the function.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testUserDefinedTemporarySystemFunction() throws Exception {
        testUserDefinedCatalogFunction("create temporary system function addOne as '" + TEST_FUNCTION + "'");
        tEnv().executeSql("drop temporary system function addOne");
    }

    /**
     * Registers {@code lowerUdf} from the generated jar, applies it to the literal
     * {@code 'HELLO'} and expects a single row containing {@code "hello"}; the function is
     * dropped afterwards.
     */
    @Test
    public void testExpressionReducerByUsingJar() {
        tEnv().executeSql(String.format("create temporary function lowerUdf as '%s' using jar '%s'", udfClassName, jarPath));

        TableResult queryResult = tEnv().executeSql("SELECT lowerUdf('HELLO')");
        List<Row> collected = CollectionUtil.iteratorToList(queryResult.collect());
        List<Row> expected = Arrays.asList(Row.of("hello"));
        Assertions.assertThat(collected).isEqualTo(expected);

        tEnv().executeSql("drop temporary function lowerUdf");
    }

    /**
     * Shared scenario for the {@code addOne} tests: loads five rows into the collection source,
     * creates tables {@code t1} and {@code t2}, registers the function with the given DDL,
     * inserts the query result into {@code t2} and expects the sink rows to equal the source
     * rows. Both tables are dropped afterwards; the function is left for the caller to drop.
     *
     * @param createFunctionDDL statement that registers the function {@code addOne}
     * @throws Exception if waiting for the insert job fails
     */
    private void testUserDefinedCatalogFunction(String createFunctionDDL) throws Exception {
        List<Row> rows = Arrays.asList(
                Row.of(1, "1000", 2),
                Row.of(2, "1", 3),
                Row.of(3, "2000", 4),
                Row.of(1, "2", 2),
                Row.of(2, "3000", 3));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(rows);

        tEnv().executeSql("create table t1(a int, b varchar, c int) with ('connector' = 'COLLECTION')");
        tEnv().executeSql("create table t2(a int, b varchar, c int) with ('connector' = 'COLLECTION')");
        tEnv().executeSql(createFunctionDDL);

        Table projected = tEnv().sqlQuery("select t1.a, t1.b, addOne(t1.a, 1) as c from t1");
        projected.executeInsert("t2").await();

        Assertions.assertThat(TestCollectionTableFactory.RESULT()).isEqualTo(rows);

        tEnv().executeSql("drop table t1");
        tEnv().executeSql("drop table t2");
    }

    /**
     * Shared scenario for the {@code lowerUdf} tests: loads five rows into the collection
     * source, creates tables {@code t1} and {@code t2}, registers the function with the given
     * DDL, inserts the query result into {@code t2} and expects the string column in lower case.
     * Both tables and the function are dropped afterwards.
     *
     * @param createFunctionDDL statement that registers the function {@code lowerUdf}
     * @param dropFunctionDDL statement that drops the function again
     * @throws Exception if waiting for the insert job fails
     */
    private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL) throws Exception {
        List<Row> input = Arrays.asList(
                Row.of(1, "JARK"),
                Row.of(2, "RON"),
                Row.of(3, "LeoNard"),
                Row.of(1, "FLINK"),
                Row.of(2, "CDC"));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("create table t1(a int, b varchar) with ('connector' = 'COLLECTION')");
        tEnv().executeSql("create table t2(a int, b varchar) with ('connector' = 'COLLECTION')");
        tEnv().executeSql(createFunctionDDL);

        Table lowered = tEnv().sqlQuery("select a, lowerUdf(b) from t1");
        lowered.executeInsert("t2").await();

        List<Row> expected = Arrays.asList(
                Row.of(1, "jark"),
                Row.of(2, "ron"),
                Row.of(3, "leonard"),
                Row.of(1, "flink"),
                Row.of(2, "cdc"));
        Assertions.assertThat(TestCollectionTableFactory.RESULT()).isEqualTo(expected);

        tEnv().executeSql("drop table t1");
        tEnv().executeSql("drop table t2");
        tEnv().executeSql(dropFunctionDDL);
    }

    /**
     * Registers {@code PrimitiveScalarFunction}, applies it to the three columns of
     * {@code TestTable} in an INSERT back into the same table, and compares the collected result
     * with the expected rows.
     *
     * @throws Exception if waiting for the insert job fails
     */
    @Test
    public void testPrimitiveScalarFunction() throws Exception {
        List<Row> input = Arrays.asList(Row.of(1, 1L, "-"), Row.of(2, 2L, "--"), Row.of(3, 3L, "---"));
        List<Row> expected = Arrays.asList(Row.of(1, 3L, "-"), Row.of(2, 6L, "--"), Row.of(3, 9L, "---"));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE TestTable(i INT NOT NULL, b BIGINT NOT NULL, s STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);
        tEnv().executeSql("INSERT INTO TestTable SELECT i, PrimitiveScalarFunction(i, b, s), s FROM TestTable").await();

        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Registers three class-name reporting scalar functions, calls each with an untyped
     * {@code NULL} and with a typed {@code NULL}, and compares the single result row with the
     * expected strings.
     *
     * @throws Exception if waiting for the insert job fails
     */
    @Test
    public void testNullScalarFunction() throws Exception {
        List<Row> expected = Collections.singletonList(
                Row.of("Boolean", "String", "<<unknown>>", "String", "Object", "Boolean"));

        TestCollectionTableFactory.reset();

        tEnv().executeSql("CREATE TABLE TestTable(s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("ClassNameScalarFunction", ClassNameScalarFunction.class);
        tEnv().createTemporarySystemFunction("ClassNameOrUnknownScalarFunction", ClassNameOrUnknownScalarFunction.class);
        tEnv().createTemporarySystemFunction("WildcardClassNameScalarFunction", WildcardClassNameScalarFunction.class);
        tEnv().executeSql("INSERT INTO TestTable SELECT ClassNameScalarFunction(NULL), ClassNameScalarFunction(CAST(NULL AS STRING)), ClassNameOrUnknownScalarFunction(NULL), ClassNameOrUnknownScalarFunction(CAST(NULL AS STRING)), WildcardClassNameScalarFunction(NULL), WildcardClassNameScalarFunction(CAST(NULL AS BOOLEAN))").await();

        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Registers {@code RowScalarFunction}, applies it to the row column of {@code TestTable} in
     * an INSERT back into the same table, and expects the collected result to equal the input
     * rows.
     *
     * @throws Exception if waiting for the insert job fails
     */
    @Test
    public void testRowScalarFunction() throws Exception {
        List<Row> rows = Arrays.asList(
                Row.of(1, Row.of(1, "1")),
                Row.of(2, Row.of(2, "2")),
                Row.of(3, Row.of(3, "3")));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(rows);

        tEnv().executeSql("CREATE TABLE TestTable(i INT, r ROW<i INT, s STRING>) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("RowScalarFunction", RowScalarFunction.class);
        tEnv().executeSql("INSERT INTO TestTable SELECT i, RowScalarFunction(r) FROM TestTable").await();

        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(rows);
    }

    /**
     * Registers {@code ComplexScalarFunction} and calls it with four different argument lists
     * (int plus timestamp, bytes plus timestamp, no arguments, bytes only), writing the results
     * into a sink whose last column is a RAW type, then compares the collected rows with the
     * expected data.
     *
     * @throws Exception if waiting for the insert job fails
     */
    @Test
    public void testComplexScalarFunction() throws Exception {
        List<Row> input = Arrays.asList(
                Row.of(1, new byte[]{1, 2, 3}),
                Row.of(2, new byte[]{2, 3, 4}),
                Row.of(3, new byte[]{3, 4, 5}),
                Row.of(null, null));
        List<Row> expected = Arrays.asList(
                Row.of(1, "1+2012-12-12 12:12:12.123456789", "[1, 2, 3]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), ByteBuffer.wrap(new byte[]{1, 2, 3})),
                Row.of(2, "2+2012-12-12 12:12:12.123456789", "[2, 3, 4]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), ByteBuffer.wrap(new byte[]{2, 3, 4})),
                Row.of(3, "3+2012-12-12 12:12:12.123456789", "[3, 4, 5]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), ByteBuffer.wrap(new byte[]{3, 4, 5})),
                Row.of(null, "null+2012-12-12 12:12:12.123456789", "null+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), null));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        // RAW type of Object backed by a Kryo serializer; its serializable string is embedded in the sink DDL.
        RawType<Object> rawType = new RawType<>(Object.class, new KryoSerializer<>(Object.class, new ExecutionConfig()));

        tEnv().executeSql("CREATE TABLE SourceTable(i INT, b BYTES) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(  i INT,   s1 STRING,   s2 STRING,   d DECIMAL(5, 2),  r " + rawType.asSerializableString() + ") WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("ComplexScalarFunction", ComplexScalarFunction.class);
        tEnv().executeSql("INSERT INTO SinkTable SELECT   i,   ComplexScalarFunction(i, TIMESTAMP '2012-12-12 12:12:12.123456789'),   ComplexScalarFunction(b, TIMESTAMP '2012-12-12 12:12:12.123456789'),  ComplexScalarFunction(),   ComplexScalarFunction(b) FROM SourceTable").await();

        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Registers {@code CustomScalarFunction}, calls it with one argument and with four
     * arguments, and compares the collected sink rows with the expected data.
     *
     * @throws Exception if waiting for the insert job fails
     */
    @Test
    public void testCustomScalarFunction() throws Exception {
        List<Row> input = Arrays.asList(Row.of(1), Row.of(2), Row.of(3), Row.of((Object) null));
        List<Row> expected = Arrays.asList(
                Row.of(1, 1, 5),
                Row.of(2, 2, 5),
                Row.of(3, 3, 5),
                Row.of(null, null, 5));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(i INT) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(i1 INT, i2 INT, i3 INT) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
        tEnv().executeSql("INSERT INTO SinkTable SELECT   i,   CustomScalarFunction(i),   CustomScalarFunction(CAST(NULL AS INT), 5, i, i) FROM SourceTable").await();

        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Registers {@code VarArgScalarFunction}, calls it with five different argument lists per
     * source row, and compares the collected rows with the expected signature strings.
     */
    @Test
    public void testVarArgScalarFunction() {
        List<Row> input = Arrays.asList(Row.of("Bob", 1), Row.of("Alice", 2));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(  s STRING,   i INT)WITH (  'connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("VarArgScalarFunction", VarArgScalarFunction.class);

        TableResult queryResult = tEnv().executeSql("SELECT   VarArgScalarFunction(),   VarArgScalarFunction(i),   VarArgScalarFunction(i, i),   VarArgScalarFunction(s),   VarArgScalarFunction(s, i) FROM SourceTable");
        List<Row> collected = CollectionUtil.iteratorToList(queryResult.collect());

        List<Row> expected = Arrays.asList(
                Row.of("(INT...)", "(INT...)", "(INT...)", "(STRING, INT...)", "(STRING, INT...)"),
                Row.of("(INT...)", "(INT...)", "(INT...)", "(STRING, INT...)", "(STRING, INT...)"));
        Assertions.assertThat(collected).isEqualTo(expected);
    }

    /**
     * Registers {@code RawLiteralScalarFunction}, calls it with a RAW {@code DayOfWeek} column
     * and a boolean literal in both branches of a UNION ALL, and expects every source row to
     * appear twice in the sink, in any order.
     *
     * @throws Exception if waiting for the insert job fails
     */
    @Test
    public void testRawLiteralScalarFunction() throws Exception {
        List<Row> input = Arrays.asList(
                Row.of(1, DayOfWeek.MONDAY),
                Row.of(2, DayOfWeek.FRIDAY),
                Row.of(null, null));
        Row[] expected = new Row[]{
                Row.of(1, "MONDAY", DayOfWeek.MONDAY),
                Row.of(1, "MONDAY", DayOfWeek.MONDAY),
                Row.of(2, "FRIDAY", DayOfWeek.FRIDAY),
                Row.of(2, "FRIDAY", DayOfWeek.FRIDAY),
                Row.of(null, null, null),
                Row.of(null, null, null)};

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        // RAW type of DayOfWeek backed by a Kryo serializer; its serializable string is embedded in both DDLs.
        RawType<DayOfWeek> rawType = new RawType<>(DayOfWeek.class, new KryoSerializer<>(DayOfWeek.class, new ExecutionConfig()));

        tEnv().executeSql("CREATE TABLE SourceTable(  i INT,   r " + rawType.asSerializableString() + ") WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(  i INT,   s STRING,   r " + rawType.asSerializableString() + ") WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("RawLiteralScalarFunction", RawLiteralScalarFunction.class);
        tEnv().executeSql("INSERT INTO SinkTable   (SELECT     i,     RawLiteralScalarFunction(r, TRUE),     RawLiteralScalarFunction(r, FALSE)    FROM SourceTable)UNION ALL   (SELECT     i,     RawLiteralScalarFunction(r, TRUE),     RawLiteralScalarFunction(r, FALSE)   FROM SourceTable)").await();

        Assertions.assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(expected);
    }

    /**
     * Registers {@code StructuredScalarFunction}, nests one call inside another and reads the
     * {@code name} field of a call with literal arguments, then compares the collected sink rows
     * with the expected data.
     *
     * @throws Exception if waiting for the insert job fails
     */
    @Test
    public void testStructuredScalarFunction() throws Exception {
        List<Row> input = Arrays.asList(Row.of("Bob", 42), Row.of("Alice", 12), Row.of(null, 0));
        List<Row> expected = Arrays.asList(
                Row.of("Bob 42", "Tyler"),
                Row.of("Alice 12", "Tyler"),
                Row.of("<<null>>", "Tyler"));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(s STRING, i INT NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(s1 STRING, s2 STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("StructuredScalarFunction", StructuredScalarFunction.class);
        tEnv().executeSql("INSERT INTO SinkTable SELECT   StructuredScalarFunction(StructuredScalarFunction(s, i)),   StructuredScalarFunction('Tyler', 27).name FROM SourceTable").await();

        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Expects a call of {@code CustomScalarFunction} with a string literal to fail with a
     * message stating that no matching {@code eval} method was found.
     */
    @Test
    public void testInvalidCustomScalarFunction() {
        tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);

        String expectedMessage = "Could not find an implementation method 'eval' in class '" + CustomScalarFunction.class.getName() + "' for function 'CustomScalarFunction' that matches the following signature:\njava.lang.String eval(java.lang.String)";
        Assertions.assertThatThrownBy(
                        () -> tEnv().executeSql("INSERT INTO SinkTable SELECT CustomScalarFunction('test')").await())
                .hasMessage(expectedMessage);
    }

    /**
     * Joins a collection-backed source with {@code RowTableFunction} via {@code LATERAL TABLE} and
     * checks the sink content. The source contains one {@code null} string for which no row is
     * expected in the sink.
     */
    @Test
    public void testRowTableFunction() throws Exception {
        final List<Row> input =
                Arrays.asList(
                        Row.of("1,2,3"),
                        Row.of("2,3,4"),
                        Row.of("3,4,5"),
                        Row.of((Object) null));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(s STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(s STRING, sa ARRAY<STRING> NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);
        tEnv().executeSql("INSERT INTO SinkTable SELECT t.s, t.sa FROM SourceTable source, LATERAL TABLE(RowTableFunction(source.s)) t")
                .await();

        final List<Row> expected =
                Arrays.asList(
                        Row.of("1,2,3", new String[] {"1", "2", "3"}),
                        Row.of("2,3,4", new String[] {"2", "3", "4"}),
                        Row.of("3,4,5", new String[] {"3", "4", "5"}));
        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Joins a collection-backed source with {@code StructuredTableFunction} and projects the
     * {@code name} and {@code age} fields of the emitted value into the sink. The sink is expected
     * to contain the same rows as the source, including the row with a {@code null} name.
     */
    @Test
    public void testStructuredTableFunction() throws Exception {
        final List<Row> input =
                Arrays.asList(Row.of("Bob", 42), Row.of("Alice", 12), Row.of(null, 0));
        final List<Row> expected =
                Arrays.asList(Row.of("Bob", 42), Row.of("Alice", 12), Row.of(null, 0));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(s STRING, i INT NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(s STRING, i INT NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("StructuredTableFunction", StructuredTableFunction.class);
        tEnv().executeSql("INSERT INTO SinkTable SELECT t.name, t.age FROM SourceTable, LATERAL TABLE(StructuredTableFunction(s, i)) t")
                .await();

        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Registers {@code DynamicTableFunction} as a catalog function (via {@code createFunction})
     * and calls it with a string literal, an integer literal and a typed {@code NULL}. The three
     * results are combined with {@code UNION ALL}, so the sink content is compared regardless of
     * order.
     */
    @Test
    public void testDynamicCatalogTableFunction() throws Exception {
        // Kept as Row[] (not Object[]) so the element type matches the typed varargs of
        // containsExactlyInAnyOrder on the list assertion below.
        final Row[] sinkData = {
            Row.of("Test is a string"), Row.of("42"), Row.of((Object) null)
        };

        TestCollectionTableFactory.reset();

        this.tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
        this.tEnv().createFunction("DynamicTableFunction", DynamicTableFunction.class);
        this.tEnv().executeSql("INSERT INTO SinkTable SELECT T1.s FROM TABLE(DynamicTableFunction('Test')) AS T1(s) UNION ALL SELECT CAST(T2.i AS STRING) FROM TABLE(DynamicTableFunction(42)) AS T2(i)UNION ALL SELECT CAST(T3.i AS STRING) FROM TABLE(DynamicTableFunction(CAST(NULL AS INT))) AS T3(i)")
                .await();

        Assertions.assertThat(TestCollectionTableFactory.getResult())
                .containsExactlyInAnyOrder(sinkData);
    }

    /**
     * Uses the scalar {@code PrimitiveScalarFunction} inside {@code TABLE(...)} and expects a
     * validation error stating that it cannot be used as a table function.
     */
    @Test
    public void testInvalidUseOfScalarFunction() {
        tEnv().executeSql("CREATE TABLE SinkTable(s BIGINT NOT NULL) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);

        final String statement =
                "INSERT INTO SinkTable SELECT * FROM TABLE(PrimitiveScalarFunction(1, 2, '3'))";

        Assertions.assertThatThrownBy(() -> tEnv().executeSql(statement))
                .hasMessageContaining("SQL validation failed. Function 'PrimitiveScalarFunction' cannot be used as a table function.");
    }

    /**
     * Uses the built-in scalar function {@code MD5} inside {@code TABLE(...)} and expects
     * {@code explainSql} to fail with "Argument must be a table function".
     */
    @Test
    public void testInvalidUseOfSystemScalarFunction() {
        tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");

        final String statement = "INSERT INTO SinkTable SELECT * FROM TABLE(MD5('3'))";

        Assertions.assertThatThrownBy(() -> tEnv().explainSql(statement, new ExplainDetail[0]))
                .hasMessageContaining("Argument must be a table function: MD5");
    }

    /**
     * Calls the table function {@code RowTableFunction} in a scalar position of a {@code SELECT}
     * list and expects {@code explainSql} to fail with "Cannot call table function here".
     */
    @Test
    public void testInvalidUseOfTableFunction() {
        TestCollectionTableFactory.reset();

        tEnv().executeSql("CREATE TABLE SinkTable(s ROW<s STRING, sa ARRAY<STRING> NOT NULL>) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);

        final String statement = "INSERT INTO SinkTable SELECT RowTableFunction('test')";

        Assertions.assertThatThrownBy(() -> tEnv().explainSql(statement, new ExplainDetail[0]))
                .hasMessageContaining("Cannot call table function here: 'RowTableFunction'");
    }

    /**
     * Runs {@code LongestStringAggregateFunction} and {@code RawMapViewAggregateFunction} over
     * one-second tumbling windows of a collection-backed source with event-time watermarks and
     * checks the two resulting window rows.
     */
    @Test
    public void testAggregateFunction() throws Exception {
        // Only two distinct timestamps occur in the input.
        final LocalDateTime earlier = LocalDateTime.parse("2007-12-03T10:15:30");
        final LocalDateTime later = LocalDateTime.parse("2007-12-03T10:15:32");

        final List<Row> input =
                Arrays.asList(
                        Row.of(earlier, "Bob"),
                        Row.of(earlier, "Alice"),
                        Row.of(earlier, null),
                        Row.of(earlier, "Jonathan"),
                        Row.of(later, "Bob"),
                        Row.of(later, "Alice"));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(ts TIMESTAMP(3), s STRING, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SinkTable(s1 STRING, s2 STRING) WITH ('connector' = 'COLLECTION')");
        tEnv().createTemporarySystemFunction("LongestStringAggregateFunction", LongestStringAggregateFunction.class);
        tEnv().createTemporarySystemFunction("RawMapViewAggregateFunction", RawMapViewAggregateFunction.class);
        tEnv().executeSql("INSERT INTO SinkTable SELECT LongestStringAggregateFunction(s), RawMapViewAggregateFunction(s) FROM SourceTable GROUP BY TUMBLE(ts, INTERVAL '1' SECOND)")
                .await();

        final List<Row> expected =
                Arrays.asList(
                        Row.of("Jonathan", "Alice=(Alice, 5), Bob=(Bob, 3), Jonathan=(Jonathan, 8)"),
                        Row.of("Alice", "Alice=(Alice, 5), Bob=(Bob, 3)"));
        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Performs a processing-time lookup join against a {@code values} table whose lookup function
     * is {@code LookupTableFunction}, and checks that each source row yields two sink rows: one
     * with an empty byte array and one with the bytes of the key.
     */
    @Test
    public void testLookupTableFunction() throws ExecutionException, InterruptedException {
        final List<Row> input = Arrays.asList(Row.of("Bob"), Row.of("Alice"));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable1(  s STRING,   proctime AS PROCTIME())WITH (  'connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE TABLE SourceTable2(  s STRING,  b BYTES)WITH (  'connector' = 'values',  'lookup-function-class' = '" + LookupTableFunction.class.getName() + "')");
        tEnv().executeSql("CREATE TABLE SinkTable(s STRING, b BYTES) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("INSERT INTO SinkTable SELECT SourceTable1.s, SourceTable2.b FROM SourceTable1 JOIN SourceTable2 FOR SYSTEM_TIME AS OF SourceTable1.proctime  ON SourceTable1.s = SourceTable2.s")
                .await();

        final List<Row> expected =
                Arrays.asList(
                        Row.of("Bob", new byte[0]),
                        Row.of("Bob", new byte[] {66, 111, 98}),
                        Row.of("Alice", new byte[0]),
                        Row.of("Alice", new byte[] {65, 108, 105, 99, 101}));
        Assertions.assertThat(TestCollectionTableFactory.getResult()).isEqualTo(expected);
    }

    /**
     * Calls {@code TypeOfScalarFunction} with a literal and with three columns of different types
     * and checks that every call returns the string form of its argument's data type.
     */
    @Test
    public void testSpecializedFunction() {
        final List<Row> sourceData =
                Arrays.asList(
                        Row.of("Bob", 1, new BigDecimal("123.45")),
                        Row.of("Alice", 2, new BigDecimal("123.456")));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);

        this.tEnv().executeSql("CREATE TABLE SourceTable(  s STRING,   i INT,  d DECIMAL(6, 3))WITH (  'connector' = 'COLLECTION')");
        this.tEnv().createTemporarySystemFunction("TypeOfScalarFunction", TypeOfScalarFunction.class);

        final TableResult result =
                this.tEnv().executeSql("SELECT   TypeOfScalarFunction('LITERAL'),   TypeOfScalarFunction(s),   TypeOfScalarFunction(i),   TypeOfScalarFunction(d) FROM SourceTable");

        // Typed list instead of the raw List/Iterator casts, so the comparison stays type-checked.
        final List<Row> actual = CollectionUtil.iteratorToList(result.collect());

        // The type strings do not depend on the row values, so both rows look the same.
        final List<Row> expected =
                Arrays.asList(
                        Row.of("CHAR(7) NOT NULL", "STRING", "INT", "DECIMAL(6, 3)"),
                        Row.of("CHAR(7) NOT NULL", "STRING", "INT", "DECIMAL(6, 3)"));
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Calls {@code RowEqualityScalarFunction} with row constructors built from source columns and
     * a decimal literal, in both argument orders, and checks the collected result rows.
     */
    @Test
    public void testSpecializedFunctionWithExpressionEvaluation() {
        final List<Row> sourceData =
                Arrays.asList(
                        Row.of("Bob", new Integer[] {1, 2, 3}, new BigDecimal("123.000")),
                        Row.of("Bob", new Integer[] {4, 5, 6}, new BigDecimal("123.456")),
                        Row.of("Alice", new Integer[] {1, 2, 3}, null),
                        Row.of("Alice", null, new BigDecimal("123.456")));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);

        this.tEnv().executeSql("CREATE TABLE SourceTable(  s STRING,   a ARRAY<INT>,  d DECIMAL(6, 3))WITH (  'connector' = 'COLLECTION')");
        this.tEnv().createTemporarySystemFunction("RowEqualityScalarFunction", RowEqualityScalarFunction.class);

        final TableResult result =
                this.tEnv().executeSql("SELECT   s,   RowEqualityScalarFunction((a, d), (a, 123.456)),   RowEqualityScalarFunction((a, 123.456), (a, d))FROM SourceTable");

        // Typed list instead of the raw List/Iterator casts, so the comparison stays type-checked.
        final List<Row> actual = CollectionUtil.iteratorToList(result.collect());

        final List<Row> expected =
                Arrays.asList(
                        Row.of("Bob", null, null),
                        Row.of(
                                "Bob",
                                Row.of(new Long[] {4L, 5L, 6L}, 123.456),
                                Row.of(new Long[] {4L, 5L, 6L}, 123.456)),
                        Row.of("Alice", null, null),
                        Row.of("Alice", Row.of(null, 123.456), Row.of(null, 123.456)));
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Executes {@code MyYear} on a computed {@code TIMESTAMP(3)} column. The query result is only
     * drained; nothing is asserted about its content.
     */
    @Test
    public void testTimestampNotNull() {
        final List<Row> input = Arrays.asList(Row.of(1), Row.of(2));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(i INT, ts AS CAST(LOCALTIMESTAMP AS TIMESTAMP(3)), WATERMARK FOR ts AS ts) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE FUNCTION MyYear AS '" + MyYear.class.getName() + "'");

        final TableResult result = tEnv().executeSql("SELECT MyYear(ts) FROM SourceTable");
        CollectionUtil.iteratorToList(result.collect());
    }

    /**
     * Executes {@code BoolToInt} on {@code IS NULL} / {@code IS NOT NULL} predicates over a
     * nullable column. The query result is only drained; nothing is asserted about its content.
     */
    @Test
    public void testIsNullType() {
        final List<Row> input = Arrays.asList(Row.of(1), Row.of((Object) null));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(i INT) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE FUNCTION BoolToInt AS '" + BoolToInt.class.getName() + "'");

        final TableResult result =
                tEnv().executeSql("SELECT BoolToInt(i is null), BoolToInt(i is not null) FROM SourceTable");
        CollectionUtil.iteratorToList(result.collect());
    }

    /**
     * Executes {@code BoolEcho}, whose parameter is hinted as {@code BOOLEAN NOT NULL}, on a
     * predicate combining a NOT NULL and a nullable column. The query result is only drained;
     * nothing is asserted about its content.
     */
    @Test
    public void testWithBoolNotNullTypeHint() {
        final List<Row> input = Arrays.asList(Row.of(1, 2), Row.of(2, 3));

        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(input);

        tEnv().executeSql("CREATE TABLE SourceTable(x INT NOT NULL,y INT) WITH ('connector' = 'COLLECTION')");
        tEnv().executeSql("CREATE FUNCTION BoolEcho AS '" + BoolEcho.class.getName() + "'");

        final TableResult result =
                tEnv().executeSql("SELECT BoolEcho(x=1 and y is null) FROM SourceTable");
        CollectionUtil.iteratorToList(result.collect());
    }

    /**
     * Adds the test jar with {@code ADD JAR}, verifies that {@code SHOW JARS} lists exactly that
     * jar, and then runs the shared jar-based UDF scenario with a create/drop statement pair.
     */
    @Test
    public void testUsingAddJar() throws Exception {
        tEnv().executeSql(String.format("ADD JAR '%s'", this.jarPath));

        final TableResult shownJars = tEnv().executeSql("SHOW JARS");
        final boolean onlyAddedJarListed =
                CollectionUtil.iteratorToList(shownJars.collect())
                        .equals(Collections.singletonList(Row.of(new Path(this.jarPath).getPath())));
        Assertions.assertThat(onlyAddedJarListed).isTrue();

        final String createStatement =
                String.format("create function lowerUdf as '%s' LANGUAGE JAVA", this.udfClassName);
        this.testUserDefinedFunctionByUsingJar(createStatement, "drop function lowerUdf");
    }

    /** Scalar function that returns its argument unchanged; the argument is hinted as NOT NULL. */
    public static class BoolEcho extends ScalarFunction {

        /** Returns {@code b} as is. */
        public Boolean eval(@DataTypeHint("BOOLEAN NOT NULL") Boolean b) {
            return b;
        }
    }

    /**
     * Scalar function that compares two rows of type {@code IN_ROW_TYPE} and, when they are
     * considered equal, returns the first row cast to {@code OUT_ROW_TYPE}; otherwise it returns
     * {@code null}.
     *
     * <p>Both the comparison and the cast are delegated to expression evaluators that are created
     * in {@link #specialize(SpecializedFunction.SpecializedContext)} and opened in
     * {@link #open(FunctionContext)}.
     */
    public static class RowEqualityScalarFunction
    extends ScalarFunction
    implements SpecializedFunction {
        private static final DataType IN_ROW_TYPE = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"nested0", (DataType)DataTypes.ARRAY((DataType)DataTypes.INT())), DataTypes.FIELD((String)"nested1", (DataType)DataTypes.DECIMAL((int)6, (int)3))});
        private static final DataType OUT_ROW_TYPE = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"result0", (DataType)DataTypes.ARRAY((DataType)DataTypes.BIGINT())), DataTypes.FIELD((String)"result1", (DataType)DataTypes.DOUBLE())});

        // Evaluators are null on the instance created by the no-arg constructor and are only
        // populated on the instance returned by specialize().
        private final SpecializedFunction.ExpressionEvaluator rowEqualizer;
        private final SpecializedFunction.ExpressionEvaluator rowCaster;

        // Handles obtained from the evaluators in open().
        private transient MethodHandle rowEqualizerHandle;
        private transient MethodHandle rowCasterHandle;

        public RowEqualityScalarFunction(SpecializedFunction.ExpressionEvaluator rowEqualizer, SpecializedFunction.ExpressionEvaluator rowCaster) {
            this.rowEqualizer = rowEqualizer;
            this.rowCaster = rowCaster;
        }

        /** Creates an unspecialized instance without evaluators. */
        public RowEqualityScalarFunction() {
            this(null, null);
        }

        /** Declares two {@code IN_ROW_TYPE} arguments and an {@code OUT_ROW_TYPE} result. */
        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
            return TypeInference.newBuilder().typedArguments(new DataType[]{IN_ROW_TYPE, IN_ROW_TYPE}).outputTypeStrategy(call -> Optional.of(OUT_ROW_TYPE)).build();
        }

        /**
         * Returns a new instance carrying two evaluators: one for {@code a = b} with an
         * {@code on_null} fallback (producing a primitive {@code boolean}), and one for a
         * {@code CAST} from {@code IN_ROW_TYPE} to {@code OUT_ROW_TYPE}.
         */
        public RowEqualityScalarFunction specialize(SpecializedFunction.SpecializedContext context) {
            SpecializedFunction.ExpressionEvaluator rowEqualizer = context.createEvaluator((Expression)((ApiExpression)Expressions.$((String)"a").isEqual((Object)Expressions.$((String)"b"))).ifNull((Object)Expressions.$((String)"on_null")), (DataType)((DataType)DataTypes.BOOLEAN().notNull()).bridgedTo(Boolean.TYPE), new DataTypes.Field[]{DataTypes.FIELD((String)"a", (DataType)IN_ROW_TYPE), DataTypes.FIELD((String)"b", (DataType)IN_ROW_TYPE), DataTypes.FIELD((String)"on_null", (DataType)((DataType)((DataType)DataTypes.BOOLEAN().notNull()).bridgedTo(Boolean.TYPE)))});
            SpecializedFunction.ExpressionEvaluator rowCaster = context.createEvaluator(BuiltInFunctionDefinitions.CAST, OUT_ROW_TYPE, new DataType[]{IN_ROW_TYPE});
            return new RowEqualityScalarFunction(rowEqualizer, rowCaster);
        }

        /** Opens both evaluators; fails if this instance was not produced by {@code specialize}. */
        public void open(FunctionContext context) throws Exception {
            Preconditions.checkNotNull((Object)this.rowEqualizer);
            Preconditions.checkNotNull((Object)this.rowCaster);
            this.rowEqualizerHandle = this.rowEqualizer.open(context);
            this.rowCasterHandle = this.rowCaster.open(context);
        }

        /**
         * Compares {@code a} and {@code b} (passing {@code true} as the {@code on_null} argument)
         * and returns {@code a} cast to the output row type when the comparison yields
         * {@code true}, or {@code null} otherwise.
         *
         * @throws FlinkRuntimeException wrapping anything thrown by the underlying handles
         */
        public Row eval(Row a, Row b) {
            try {
                // invokeExact is signature-polymorphic: the cast on the result is part of the
                // call-site descriptor and must be spelled out, otherwise the result is Object.
                final boolean isEqual = (boolean) this.rowEqualizerHandle.invokeExact(a, b, true);
                if (isEqual) {
                    return (Row) this.rowCasterHandle.invokeExact(a);
                }
                return null;
            }
            catch (Throwable t) {
                throw new FlinkRuntimeException(t);
            }
        }
    }

    /**
     * Scalar function that ignores its argument and returns a fixed type string. The string is
     * {@code "UNKNOWN"} for the default instance and the string form of the first argument's data
     * type for instances created by {@code specialize}.
     */
    public static class TypeOfScalarFunction extends ScalarFunction implements SpecializedFunction {

        private final String typeString;

        public TypeOfScalarFunction(String typeString) {
            this.typeString = typeString;
        }

        public TypeOfScalarFunction() {
            this("UNKNOWN");
        }

        /** Returns the type string captured at construction time. */
        public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object unused) {
            return typeString;
        }

        /** Creates an instance bound to the data type of the call's first argument. */
        public TypeOfScalarFunction specialize(SpecializedFunction.SpecializedContext context) {
            final String firstArgumentType =
                    context.getCallContext().getArgumentDataTypes().get(0).toString();
            return new TypeOfScalarFunction(firstArgumentType);
        }
    }

    /**
     * Table function used as a lookup source: for every key it emits two rows, one with an empty
     * byte array and one with the key's bytes.
     */
    @DataTypeHint("ROW<s STRING, b BYTES>")
    public static class LookupTableFunction extends TableFunction<Object> {

        public void eval(@DataTypeHint("STRING") StringData s) {
            final String key = s.toString();
            collect(Row.of(key, new byte[0]));
            collect(Row.of(key, s.toBytes()));
        }
    }

    /**
     * Aggregate function whose accumulator holds a {@link MapView} from each non-null input string
     * to a {@code RawPojo}. The result is the sorted, comma-separated string form of the map
     * entries.
     */
    public static class RawMapViewAggregateFunction
            extends AggregateFunction<String, RawMapViewAggregateFunction.AccWithRawView> {

        public AccWithRawView createAccumulator() {
            return new AccWithRawView();
        }

        /** Stores {@code value} in the view; {@code null} values are skipped. */
        public void accumulate(AccWithRawView acc, String value) throws Exception {
            if (value == null) {
                return;
            }
            acc.view.put(value, new RawPojo(value));
        }

        /** Renders every map entry via its string form, sorts the strings and joins them. */
        public String getValue(AccWithRawView acc) {
            return acc.view.getMap().entrySet().stream()
                    .map(entry -> Objects.toString(entry))
                    .sorted()
                    .collect(Collectors.joining(", "));
        }

        /** Accumulator with a single map view; raw types are allowed for its data type. */
        public static class AccWithRawView {
            @DataTypeHint(allowRawGlobally = HintFlag.TRUE)
            public MapView<String, RawPojo> view = new MapView<>();
        }

        /** Value object holding a string and its length. */
        public static class RawPojo {
            public String a;
            public int b;

            public RawPojo(String s) {
                this.a = s;
                this.b = s.length();
            }

            @Override
            public String toString() {
                return "(" + a + ", " + b + ')';
            }
        }
    }

    /**
     * Aggregate function that keeps the longest non-null string seen so far in a single-field row
     * accumulator. On equal length the earlier string is kept.
     */
    @FunctionHint(accumulator = @DataTypeHint("ROW<longestString STRING>"))
    public static class LongestStringAggregateFunction extends AggregateFunction<String, Row> {

        /** Creates a one-field row whose field is initially {@code null}. */
        public Row createAccumulator() {
            return Row.of((Object) null);
        }

        /** Replaces the stored string when {@code value} is strictly longer or nothing is stored. */
        public void accumulate(Row acc, String value) {
            if (value != null) {
                final String current = (String) acc.getField(0);
                final boolean replace = current == null || value.length() > current.length();
                if (replace) {
                    acc.setField(0, value);
                }
            }
        }

        public String getValue(Row acc) {
            final Object longest = acc.getField(0);
            return (String) longest;
        }
    }

    /** Immutable value class with a name and an age, rendered as {@code "<name> <age>"}. */
    public static class StructuredUser {
        public final String name;
        public final int age;

        public StructuredUser(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append(name).append(" ").append(age);
            return text.toString();
        }
    }

    /** Table function that emits a {@code StructuredUser} built from its two arguments. */
    public static class StructuredTableFunction extends TableFunction<StructuredUser> {

        public void eval(String name, int age) {
            final boolean nameMissing = name == null;
            if (nameMissing) {
                collect(null);
            }
            // No early return above: a user is emitted for a null name as well.
            final StructuredUser user = new StructuredUser(name, age);
            collect(user);
        }
    }

    /**
     * Scalar function with two overloads: one builds a {@code StructuredUser} from a name and an
     * age, the other renders a {@code StructuredUser} as a string.
     */
    public static class StructuredScalarFunction extends ScalarFunction {

        /** Returns {@code null} for a {@code null} name, otherwise a new user. */
        public StructuredUser eval(String name, int age) {
            return name == null ? null : new StructuredUser(name, age);
        }

        /** Returns {@code "<<null>>"} for a {@code null} user, otherwise its string form. */
        public String eval(StructuredUser user) {
            return user == null ? "<<null>>" : user.toString();
        }
    }

    /**
     * Extends {@code ClassNameScalarFunction} with an {@code Object} overload and a custom type
     * inference that only fixes the output type to {@code STRING}.
     */
    public static class WildcardClassNameScalarFunction extends ClassNameScalarFunction {

        public String eval(Object o) {
            return "Object";
        }

        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
            final DataType outputType = DataTypes.STRING();
            return TypeInference.newBuilder()
                    .outputTypeStrategy(TypeStrategies.explicit(outputType))
                    .build();
        }
    }

    /**
     * Extends {@code ClassNameScalarFunction} with an overload hinted as the {@code NULL} type
     * that returns {@code "<<unknown>>"}.
     */
    public static class ClassNameOrUnknownScalarFunction extends ClassNameScalarFunction {

        public String eval(@DataTypeHint("NULL") Object o) {
            return "<<unknown>>";
        }
    }

    /** Scalar function whose overloads return the simple class name of their parameter type. */
    public static class ClassNameScalarFunction extends ScalarFunction {

        /** Overload for {@code Integer} arguments. */
        public String eval(Integer i) {
            return "Integer";
        }

        /** Overload for {@code Boolean} arguments. */
        public String eval(Boolean b) {
            return "Boolean";
        }

        /** Overload for {@code String} arguments. */
        public String eval(String s) {
            return "String";
        }
    }

    /**
     * Table function with two overloads that declare different output types: {@code STRING} for a
     * string argument and {@code INT} for an integer argument.
     */
    public static class DynamicTableFunction extends TableFunction<Object> {

        /** Emits {@code s + " is a string"}; a {@code null} argument fails the test. */
        @FunctionHint(output = @DataTypeHint("STRING"))
        public void eval(String s) {
            if (s != null) {
                collect(s + " is a string");
            } else {
                Assertions.fail("unknown failure");
            }
        }

        /** Emits the argument itself, which is {@code null} for a {@code null} argument. */
        @FunctionHint(output = @DataTypeHint("INT"))
        public void eval(Integer i) {
            collect(i);
        }
    }

    /**
     * Table function that emits a row holding the input string and its comma-separated parts;
     * for a {@code null} input it collects {@code null}.
     */
    @FunctionHint(output = @DataTypeHint("ROW<s STRING, sa ARRAY<STRING> NOT NULL>"))
    public static class RowTableFunction extends TableFunction<Row> {

        public void eval(String s) {
            if (s != null) {
                final String[] parts = s.split(",");
                collect(Row.of(s, parts));
            } else {
                collect(null);
            }
        }
    }

    /** Scalar function with two variable-arity overloads that return a label of their signature. */
    public static class VarArgScalarFunction extends ScalarFunction {

        /** Overload taking only integers. */
        public String eval(Integer... i) {
            return "(INT...)";
        }

        /** Overload taking a leading string followed by integers. */
        public String eval(String s, Integer... i2) {
            return "(STRING, INT...)";
        }
    }

    /**
     * Scalar function that returns its first non-null integer argument. Its type inference only
     * declares that the output type is the type of the first argument.
     */
    public static class CustomScalarFunction extends ScalarFunction {

        /** Returns the first non-null element of {@code args}, or {@code null} if there is none. */
        public Integer eval(Integer... args) {
            for (int index = 0; index < args.length; index++) {
                final Integer candidate = args[index];
                if (candidate != null) {
                    return candidate;
                }
            }
            return null;
        }

        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
            return TypeInference.newBuilder()
                    .outputTypeStrategy(TypeStrategies.argument(0))
                    .build();
        }
    }

    /**
     * Scalar function over a RAW {@link DayOfWeek} value and a boolean flag. Depending on the
     * flag it returns either the string form of the day or the day itself; the output type is
     * derived from the flag's value in the type inference.
     */
    public static class RawLiteralScalarFunction extends ScalarFunction {

        /** Returns {@code null} for a {@code null} day; otherwise the string or the raw value. */
        public Object eval(DayOfWeek dayOfWeek, Boolean asString) {
            if (dayOfWeek == null) {
                return null;
            }
            return asString.booleanValue() ? dayOfWeek.toString() : dayOfWeek;
        }

        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
            final DataType dayOfWeekDataType = DataTypes.RAW(DayOfWeek.class).toDataType(typeFactory);
            final DataType flagDataType = DataTypes.BOOLEAN().notNull();

            return TypeInference.newBuilder()
                    .typedArguments(new DataType[] {dayOfWeekDataType, flagDataType})
                    .outputTypeStrategy(
                            callContext -> {
                                // A flag whose value is not available is treated as false.
                                final boolean asString =
                                        callContext.getArgumentValue(1, Boolean.class).orElse(false);
                                if (!asString) {
                                    return Optional.of(dayOfWeekDataType);
                                }
                                return Optional.of(DataTypes.STRING());
                            })
                    .build();
        }
    }

    /** Scalar function with three overloads that use different argument and result type hints. */
    public static class ComplexScalarFunction extends ScalarFunction {

        /** Joins the array-aware string form of {@code o} and the timestamp's string with "+". */
        public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o, Timestamp t) {
            final String left = StringUtils.arrayAwareToString(o);
            final String right = t.toString();
            return left + "+" + right;
        }

        /** Returns the constant {@code 123.4}. */
        @DataTypeHint("DECIMAL(5, 2)")
        public BigDecimal eval() {
            return new BigDecimal("123.4");
        }

        /** Wraps the bytes in a buffer; a {@code null} array yields {@code null}. */
        @DataTypeHint("RAW")
        public ByteBuffer eval(byte[] bytes) {
            return bytes == null ? null : ByteBuffer.wrap(bytes);
        }
    }

    /** Scalar function that returns its {@code ROW<f0 INT, f1 STRING>} argument unchanged. */
    public static class RowScalarFunction extends ScalarFunction {

        @DataTypeHint("ROW<f0 INT, f1 STRING>")
        public Row eval(@DataTypeHint("ROW<f0 INT, f1 STRING>") Row row) {
            return row;
        }
    }

    /** Scalar function over primitives that sums {@code i}, {@code l} and the length of {@code s}. */
    public static class PrimitiveScalarFunction extends ScalarFunction {

        public long eval(int i, long l, String s) {
            long total = i;
            total += l;
            total += s.length();
            return total;
        }
    }

    /** Scalar function that returns the year of a timestamp hinted as {@code TIMESTAMP(3) NOT NULL}. */
    public static class MyYear extends ScalarFunction {

        public int eval(@DataTypeHint("TIMESTAMP(3) NOT NULL") LocalDateTime timestamp) {
            final int year = timestamp.getYear();
            return year;
        }
    }

    /** Scalar function that maps {@code true} to 1 and {@code false} to 0. */
    public static class BoolToInt extends ScalarFunction {

        public int eval(boolean b) {
            if (b) {
                return 1;
            }
            return 0;
        }
    }

    /** Scalar function that adds two boxed integers. */
    public static class TestUDF extends ScalarFunction {

        public Integer eval(Integer a, Integer b) {
            // Both arguments are unboxed, exactly as with the + operator.
            return Integer.sum(a, b);
        }
    }
}

