/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.addons.hbase;

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.addons.hbase.HBaseOptions;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.addons.hbase.HBaseUpsertTableSink;
import org.apache.flink.addons.hbase.HBaseWriteOptions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.HBaseValidator;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseTableFactory
implements StreamTableSourceFactory<Row>,
StreamTableSinkFactory<Tuple2<Boolean, Row>> {
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = this.getValidatedProperties(properties);
        Configuration hbaseClientConf = HBaseConfiguration.create();
        String hbaseZk = descriptorProperties.getString("connector.zookeeper.quorum");
        hbaseClientConf.set("hbase.zookeeper.quorum", hbaseZk);
        descriptorProperties.getOptionalString("connector.zookeeper.znode.parent").ifPresent(v -> hbaseClientConf.set("zookeeper.znode.parent", v));
        String hTableName = descriptorProperties.getString("connector.table-name");
        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)descriptorProperties.getTableSchema("schema"));
        HBaseTableSchema hbaseSchema = this.validateTableSchema(tableSchema);
        return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
    }

    public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = this.getValidatedProperties(properties);
        HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
        hbaseOptionsBuilder.setZkQuorum(descriptorProperties.getString("connector.zookeeper.quorum"));
        hbaseOptionsBuilder.setTableName(descriptorProperties.getString("connector.table-name"));
        descriptorProperties.getOptionalString("connector.zookeeper.znode.parent").ifPresent(hbaseOptionsBuilder::setZkNodeParent);
        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)descriptorProperties.getTableSchema("schema"));
        HBaseTableSchema hbaseSchema = this.validateTableSchema(tableSchema);
        HBaseWriteOptions.Builder writeBuilder = HBaseWriteOptions.builder();
        descriptorProperties.getOptionalInt("connector.write.buffer-flush.max-rows").ifPresent(writeBuilder::setBufferFlushMaxRows);
        descriptorProperties.getOptionalMemorySize("connector.write.buffer-flush.max-size").ifPresent(v -> writeBuilder.setBufferFlushMaxSizeInBytes(v.getBytes()));
        descriptorProperties.getOptionalDuration("connector.write.buffer-flush.interval").ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
        return new HBaseUpsertTableSink(hbaseSchema, hbaseOptionsBuilder.build(), writeBuilder.build());
    }

    private HBaseTableSchema validateTableSchema(TableSchema schema) {
        HBaseTableSchema hbaseSchema = new HBaseTableSchema();
        String[] fieldNames = schema.getFieldNames();
        TypeInformation[] fieldTypes = schema.getFieldTypes();
        for (int i = 0; i < fieldNames.length; ++i) {
            String name = fieldNames[i];
            TypeInformation type = fieldTypes[i];
            if (type instanceof RowTypeInfo) {
                RowTypeInfo familyType = (RowTypeInfo)type;
                String[] qualifierNames = familyType.getFieldNames();
                TypeInformation[] qualifierTypes = familyType.getFieldTypes();
                for (int j = 0; j < familyType.getArity(); ++j) {
                    Class clazz = qualifierTypes[j].getTypeClass();
                    if (LocalDateTime.class.equals((Object)clazz)) {
                        clazz = Timestamp.class;
                    } else if (LocalDate.class.equals(clazz)) {
                        clazz = Date.class;
                    } else if (LocalTime.class.equals(clazz)) {
                        clazz = Time.class;
                    }
                    hbaseSchema.addColumn(name, qualifierNames[j], clazz);
                }
                continue;
            }
            hbaseSchema.setRowKey(name, type.getTypeClass());
        }
        return hbaseSchema;
    }

    private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        new HBaseValidator().validate(descriptorProperties);
        return descriptorProperties;
    }

    public Map<String, String> requiredContext() {
        HashMap<String, String> context = new HashMap<String, String>();
        context.put("connector.type", "hbase");
        context.put("connector.version", this.hbaseVersion());
        context.put("connector.property-version", "1");
        return context;
    }

    public List<String> supportedProperties() {
        ArrayList<String> properties = new ArrayList<String>();
        properties.add("connector.table-name");
        properties.add("connector.zookeeper.quorum");
        properties.add("connector.zookeeper.znode.parent");
        properties.add("connector.write.buffer-flush.max-size");
        properties.add("connector.write.buffer-flush.max-rows");
        properties.add("connector.write.buffer-flush.interval");
        properties.add("schema.#.data-type");
        properties.add("schema.#.type");
        properties.add("schema.#.name");
        properties.add("schema.#.expr");
        properties.add("schema.watermark.#.rowtime");
        properties.add("schema.watermark.#.strategy.expr");
        properties.add("schema.watermark.#.strategy.data-type");
        return properties;
    }

    private String hbaseVersion() {
        return "1.4.3";
    }
}

