/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.addons.hbase;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseUpsertSinkFunction
extends RichSinkFunction<Tuple2<Boolean, Row>>
implements CheckpointedFunction,
BufferedMutator.ExceptionListener {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseUpsertSinkFunction.class);
    private final String hTableName;
    private final HBaseTableSchema schema;
    private final byte[] serializedConfig;
    private final long bufferFlushMaxSizeInBytes;
    private final long bufferFlushMaxMutations;
    private final long bufferFlushIntervalMillis;
    private transient HBaseReadWriteHelper helper;
    private transient Connection connection;
    private transient BufferedMutator mutator;
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture scheduledFuture;
    private transient AtomicLong numPendingRequests;
    private volatile transient boolean closed = false;
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference();

    public HBaseUpsertSinkFunction(String hTableName, HBaseTableSchema schema, org.apache.hadoop.conf.Configuration conf, long bufferFlushMaxSizeInBytes, long bufferFlushMaxMutations, long bufferFlushIntervalMillis) {
        Preconditions.checkArgument((boolean)schema.getRowKeyName().isPresent(), (Object)"HBaseUpsertSinkFunction requires rowkey is set.");
        this.hTableName = hTableName;
        this.schema = schema;
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
    }

    public void open(Configuration parameters) throws Exception {
        LOG.info("start open ...");
        org.apache.hadoop.conf.Configuration config = this.prepareRuntimeConfiguration();
        try {
            this.helper = new HBaseReadWriteHelper(this.schema);
            this.numPendingRequests = new AtomicLong(0L);
            if (null == this.connection) {
                this.connection = ConnectionFactory.createConnection((org.apache.hadoop.conf.Configuration)config);
            }
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf((String)this.hTableName)).listener((BufferedMutator.ExceptionListener)this).writeBufferSize(this.bufferFlushMaxSizeInBytes);
            this.mutator = this.connection.getBufferedMutator(params);
            if (this.bufferFlushIntervalMillis > 0L) {
                this.executor = Executors.newScheduledThreadPool(1, (ThreadFactory)new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                    if (this.closed) {
                        return;
                    }
                    try {
                        this.flush();
                    }
                    catch (Exception e) {
                        this.failureThrowable.compareAndSet(null, e);
                    }
                }, this.bufferFlushIntervalMillis, this.bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
            }
        }
        catch (TableNotFoundException tnfe) {
            LOG.error("The table " + this.hTableName + " not found ", (Throwable)tnfe);
            throw new RuntimeException("HBase table '" + this.hTableName + "' not found.", tnfe);
        }
        catch (IOException ioe) {
            LOG.error("Exception while creating connection to HBase.", (Throwable)ioe);
            throw new RuntimeException("Cannot create connection to HBase.", ioe);
        }
        LOG.info("end open.");
    }

    private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
        org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(this.serializedConfig, HBaseConfiguration.create());
        if (StringUtils.isNullOrWhitespaceOnly((String)runtimeConfig.get("hbase.zookeeper.quorum"))) {
            LOG.error("Can not connect to HBase without {} configuration", (Object)"hbase.zookeeper.quorum");
            throw new IOException("Check HBase configuration failed, lost: 'hbase.zookeeper.quorum'!");
        }
        return runtimeConfig;
    }

    private void checkErrorAndRethrow() {
        Throwable cause = this.failureThrowable.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in HBaseSink.", cause);
        }
    }

    public void invoke(Tuple2<Boolean, Row> value, SinkFunction.Context context) throws Exception {
        this.checkErrorAndRethrow();
        if (((Boolean)value.f0).booleanValue()) {
            Put put = this.helper.createPutMutation((Row)value.f1);
            this.mutator.mutate((Mutation)put);
        } else {
            Delete delete = this.helper.createDeleteMutation((Row)value.f1);
            this.mutator.mutate((Mutation)delete);
        }
        if (this.bufferFlushMaxMutations > 0L && this.numPendingRequests.incrementAndGet() >= this.bufferFlushMaxMutations) {
            this.flush();
        }
    }

    private void flush() throws IOException {
        this.mutator.flush();
        this.numPendingRequests.set(0L);
        this.checkErrorAndRethrow();
    }

    public void close() throws Exception {
        this.closed = true;
        if (this.mutator != null) {
            try {
                this.mutator.close();
            }
            catch (IOException e) {
                LOG.warn("Exception occurs while closing HBase BufferedMutator.", (Throwable)e);
            }
            this.mutator = null;
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (IOException e) {
                LOG.warn("Exception occurs while closing HBase Connection.", (Throwable)e);
            }
            this.connection = null;
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        while (this.numPendingRequests.get() != 0L) {
            this.flush();
        }
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
        this.failureThrowable.compareAndSet(null, (Throwable)exception);
    }
}

