package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackend.class */
/**
 * State backend that forwards checkpoint-storage operations to a wrapped {@link StateBackend} and
 * creates keyed state backends through {@link RocksDBKeyedStateBackendBuilder}, giving each one its
 * own instance directory underneath one of the local base directories.
 *
 * <p>Options can be set programmatically through the setters. Calling
 * {@link #m26configure(Configuration, ClassLoader)} returns a copy in which every option that was
 * left unset on this instance is read from the supplied {@link Configuration}.
 */
public class RocksDBStateBackend extends AbstractStateBackend implements ConfigurableStateBackend {
    private static final long serialVersionUID = 1;

    /** How many times {@link #ensureRocksDBIsLoaded(String)} tries to load the native library. */
    private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;

    /** Sentinel stored in {@link #numberOfTransferingThreads} while no explicit value was set. */
    private static final int UNDEFINED_NUMBER_OF_TRANSFERING_THREADS = -1;

    /** Backend that {@link #resolveCheckpoint} and {@link #createCheckpointStorage} delegate to. */
    private final StateBackend checkpointStreamBackend;

    /** Explicitly configured base directories; {@code null} means "use the I/O manager's spilling directories". */
    @Nullable
    private File[] localRocksDbDirectories;

    /** Predefined option profile; {@code null} until set or until {@link #getPredefinedOptions()} defaults it. */
    @Nullable
    private PredefinedOptions predefinedOptions;

    /** Optional factory that post-processes the options created from {@link #predefinedOptions}. */
    @Nullable
    private OptionsFactory optionsFactory;

    /** Tri-state flag handed to the keyed backend builder via {@link #isIncrementalCheckpointsEnabled()}. */
    private final TernaryBoolean enableIncrementalCheckpointing;

    /** Thread count handed to the keyed backend builder, or the undefined sentinel. */
    private int numberOfTransferingThreads;

    /** Tri-state flag handed to the keyed backend builder via {@link #isTtlCompactionFilterEnabled()}. */
    private TernaryBoolean enableTtlCompactionFilter;

    /** Priority queue state type handed to the keyed backend builder. */
    private final PriorityQueueStateType priorityQueueStateType;

    /** Metric options used as the starting point in {@link #getMemoryWatcherOptions()}. */
    private final RocksDBNativeMetricOptions defaultMetricOptions;

    /** Base directories chosen by {@link #lazyInitializeForJob}; not serialized. */
    private transient File[] initializedDbBasePaths;

    /** Job id captured by {@link #lazyInitializeForJob}; not serialized. */
    private transient JobID jobId;

    /** Index into {@link #initializedDbBasePaths} of the most recently used base directory. */
    private transient int nextDirectory;

    /** Whether {@link #lazyInitializeForJob} has already completed on this instance. */
    private transient boolean isInitialized;

    private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);

    /** Set once the native library was loaded; read and written only while holding the class lock. */
    private static boolean rocksDbInitialized = false;

    /** The priority queue state type that is passed on to {@link RocksDBKeyedStateBackendBuilder}. */
    public enum PriorityQueueStateType {
        HEAP,
        ROCKSDB
    }

    /**
     * Creates a backend whose checkpoint handling is delegated to a new {@link FsStateBackend}.
     *
     * @param str checkpoint location; converted to a URI through {@link Path}
     * @throws IOException if the delegated constructor fails
     */
    public RocksDBStateBackend(String str) throws IOException {
        this(new Path(str).toUri());
    }

    /**
     * Creates a backend whose checkpoint handling is delegated to a new {@link FsStateBackend}.
     *
     * @param str checkpoint location; converted to a URI through {@link Path}
     * @param z value for the incremental-checkpointing flag
     * @throws IOException if the delegated constructor fails
     */
    public RocksDBStateBackend(String str, boolean z) throws IOException {
        this(new Path(str).toUri(), z);
    }

    /**
     * Creates a backend whose checkpoint handling is delegated to a new {@link FsStateBackend}.
     *
     * @param uri location handed to the {@link FsStateBackend} constructor
     * @throws IOException declared by this constructor
     */
    public RocksDBStateBackend(URI uri) throws IOException {
        // The cast selects the AbstractStateBackend overload, exactly as before.
        this((AbstractStateBackend) new FsStateBackend(uri));
    }

    /**
     * Creates a backend whose checkpoint handling is delegated to a new {@link FsStateBackend}.
     *
     * @param uri location handed to the {@link FsStateBackend} constructor
     * @param z value for the incremental-checkpointing flag
     * @throws IOException declared by this constructor
     */
    public RocksDBStateBackend(URI uri, boolean z) throws IOException {
        // The cast selects the (AbstractStateBackend, boolean) overload, exactly as before.
        this((AbstractStateBackend) new FsStateBackend(uri), z);
    }

    /**
     * Creates a backend around the given checkpoint backend, leaving the incremental-checkpointing
     * flag undefined.
     *
     * @param stateBackend backend used for checkpoint storage; must not be {@code null}
     */
    public RocksDBStateBackend(StateBackend stateBackend) {
        this(stateBackend, TernaryBoolean.UNDEFINED);
    }

    /**
     * Creates a backend around the given checkpoint backend. The thread count and the TTL compaction
     * filter flag start out undefined, and the priority queue type starts as
     * {@link PriorityQueueStateType#HEAP}.
     *
     * @param stateBackend backend used for checkpoint storage; must not be {@code null}
     * @param ternaryBoolean tri-state value for the incremental-checkpointing flag
     */
    public RocksDBStateBackend(StateBackend stateBackend, TernaryBoolean ternaryBoolean) {
        this.checkpointStreamBackend = (StateBackend) Preconditions.checkNotNull(stateBackend);
        this.enableIncrementalCheckpointing = ternaryBoolean;
        this.numberOfTransferingThreads = UNDEFINED_NUMBER_OF_TRANSFERING_THREADS;
        this.priorityQueueStateType = PriorityQueueStateType.HEAP;
        this.defaultMetricOptions = new RocksDBNativeMetricOptions();
        this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
    }

    /**
     * @param abstractStateBackend backend used for checkpoint storage
     * @deprecated delegates to {@link #RocksDBStateBackend(StateBackend, TernaryBoolean)} with an
     *     undefined flag; use {@link #RocksDBStateBackend(StateBackend)} instead.
     */
    @Deprecated
    public RocksDBStateBackend(AbstractStateBackend abstractStateBackend) {
        this((StateBackend) abstractStateBackend, TernaryBoolean.UNDEFINED);
    }

    /**
     * @param abstractStateBackend backend used for checkpoint storage
     * @param z value for the incremental-checkpointing flag
     * @deprecated delegates to {@link #RocksDBStateBackend(StateBackend, TernaryBoolean)}; use that
     *     constructor instead.
     */
    @Deprecated
    public RocksDBStateBackend(AbstractStateBackend abstractStateBackend, boolean z) {
        this((StateBackend) abstractStateBackend, TernaryBoolean.fromBoolean(z));
    }

    /**
     * Copy constructor used by {@link #m26configure(Configuration, ClassLoader)}. Values that were
     * set on {@code original} win; everything else is read from {@code config}.
     *
     * @param original backend whose explicitly set options are carried over
     * @param config configuration that supplies the options not set on {@code original}
     * @param classLoader class loader used to configure the wrapped backend and to load a
     *     configured options factory class
     * @throws IllegalConfigurationException if the configured local directories are rejected by
     *     {@link #setDbStoragePaths(String...)}
     * @throws FlinkRuntimeException if the configured options factory cannot be loaded
     */
    private RocksDBStateBackend(RocksDBStateBackend original, Configuration config, ClassLoader classLoader) {
        final StateBackend originalStreamBackend = original.checkpointStreamBackend;
        this.checkpointStreamBackend = originalStreamBackend instanceof ConfigurableStateBackend
                ? ((ConfigurableStateBackend) originalStreamBackend).configure(config, classLoader)
                : originalStreamBackend;

        this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
                config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));

        if (original.numberOfTransferingThreads == UNDEFINED_NUMBER_OF_TRANSFERING_THREADS) {
            this.numberOfTransferingThreads = config.getInteger(RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM);
        } else {
            this.numberOfTransferingThreads = original.numberOfTransferingThreads;
        }

        this.enableTtlCompactionFilter = original.enableTtlCompactionFilter.resolveUndefined(
                config.getBoolean(RocksDBOptions.TTL_COMPACT_FILTER_ENABLED));

        final String priorityQueueTypeString = config.getString(RocksDBOptions.TIMER_SERVICE_FACTORY);
        // Locale.ROOT keeps the enum lookup independent of the JVM's default locale.
        this.priorityQueueStateType = priorityQueueTypeString.length() > 0
                ? PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase(Locale.ROOT))
                : original.priorityQueueStateType;

        if (original.localRocksDbDirectories != null) {
            this.localRocksDbDirectories = original.localRocksDbDirectories;
        } else {
            final String configuredDirectories = config.getString(RocksDBOptions.LOCAL_DIRECTORIES);
            if (configuredDirectories != null) {
                try {
                    // Entries may be separated by ',' or by the platform's path separator.
                    setDbStoragePaths(configuredDirectories.split(",|" + File.pathSeparator));
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException("Invalid configuration for RocksDB state backend's local storage directories: " + e.getMessage(), e);
                }
            }
        }

        this.defaultMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);

        this.predefinedOptions = original.predefinedOptions == null
                ? PredefinedOptions.valueOf(config.getString(RocksDBOptions.PREDEFINED_OPTIONS))
                : original.predefinedOptions;
        LOG.info("Using predefined options: {}.", this.predefinedOptions.name());

        try {
            this.optionsFactory = configureOptionsFactory(
                    original.optionsFactory,
                    config.getString(RocksDBOptions.OPTIONS_FACTORY),
                    config,
                    classLoader);
        } catch (DynamicCodeLoadingException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Returns a new backend that combines the options set on this instance with the values found in
     * the given configuration. This instance is not modified.
     *
     * <p>The method name is decompiler output and is kept unchanged so that existing references
     * keep resolving.
     *
     * @param configuration configuration supplying options that are not set on this instance
     * @param classLoader class loader passed on to the copy constructor
     * @return the newly configured backend
     */
    /* renamed from: configure, reason: merged with bridge method [inline-methods] */
    public RocksDBStateBackend m26configure(Configuration configuration, ClassLoader classLoader) {
        return new RocksDBStateBackend(this, configuration, classLoader);
    }

    /** Returns the backend that checkpoint-storage operations are delegated to. */
    public StateBackend getCheckpointBackend() {
        return this.checkpointStreamBackend;
    }

    /**
     * Determines the usable local base directories once per instance and picks a random starting
     * index for the rotation done by {@link #getNextStoragePath()}.
     *
     * <p>When directories were configured explicitly, each one is probed by creating (and then
     * deleting) a uniquely named sub-directory; directories in which that fails are logged and
     * skipped. Otherwise the I/O manager's spilling directories are used.
     *
     * @param env environment providing the job id and the I/O manager
     * @param operatorIdentifier currently unused
     * @throws IOException if no usable base directory is available
     */
    private void lazyInitializeForJob(Environment env, String operatorIdentifier) throws IOException {
        if (this.isInitialized) {
            return;
        }
        this.jobId = env.getJobID();

        if (this.localRocksDbDirectories == null) {
            this.initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
        } else {
            final List<File> usableDirectories = new ArrayList<>(this.localRocksDbDirectories.length);
            final StringBuilder errors = new StringBuilder();
            for (File configuredDirectory : this.localRocksDbDirectories) {
                final File probe = new File(configuredDirectory, UUID.randomUUID().toString());
                if (probe.mkdirs()) {
                    usableDirectories.add(configuredDirectory);
                } else {
                    final String message = "Local DB files directory '" + configuredDirectory + "' does not exist and cannot be created. ";
                    LOG.error(message);
                    errors.append(message);
                }
                // Best-effort cleanup of the probe directory; the result is intentionally not checked.
                probe.delete();
            }
            if (usableDirectories.isEmpty()) {
                throw new IOException("No local storage directories available. " + errors);
            }
            this.initializedDbBasePaths = usableDirectories.toArray(new File[0]);
        }

        // Random.nextInt requires a positive bound; fail with a descriptive error instead.
        if (this.initializedDbBasePaths == null || this.initializedDbBasePaths.length == 0) {
            throw new IOException("No local storage directories available. The I/O manager returned no spilling directories.");
        }
        this.nextDirectory = new Random().nextInt(this.initializedDbBasePaths.length);
        this.isInitialized = true;
    }

    /**
     * Advances the rotation index, wrapping to 0 after the last entry, and returns the base
     * directory at the new index.
     */
    private File getNextStoragePath() {
        final int candidate = this.nextDirectory + 1;
        final int next = candidate >= this.initializedDbBasePaths.length ? 0 : candidate;
        this.nextDirectory = next;
        return this.initializedDbBasePaths[next];
    }

    /**
     * Delegates to the wrapped checkpoint backend.
     *
     * @param str pointer passed through unchanged
     * @return whatever the wrapped backend returns
     * @throws IOException if the wrapped backend throws it
     */
    public CompletedCheckpointStorageLocation resolveCheckpoint(String str) throws IOException {
        return this.checkpointStreamBackend.resolveCheckpoint(str);
    }

    /**
     * Delegates to the wrapped checkpoint backend.
     *
     * @param jobID job id passed through unchanged
     * @return whatever the wrapped backend returns
     * @throws IOException if the wrapped backend throws it
     */
    public CheckpointStorage createCheckpointStorage(JobID jobID) throws IOException {
        return this.checkpointStreamBackend.createCheckpointStorage(jobID);
    }

    /**
     * Builds a keyed state backend through {@link RocksDBKeyedStateBackendBuilder}.
     *
     * <p>Steps: make sure the native library is loaded (using the first temp directory reported by
     * the task manager), initialize the local base directories if needed, and pick the instance
     * directory {@code job_<jobId>_op_<sanitized str>_uuid_<random UUID>} under the next base
     * directory. In the sanitized identifier every character other than an ASCII letter, digit or
     * {@code '-'} is replaced by {@code '_'}. The builder itself receives the unsanitized
     * {@code str}.
     *
     * @param environment source of temp directories, job id, class loader, execution config and
     *     local recovery config
     * @param jobID not read by this method; the job id is taken from {@code environment}
     * @param str operator identifier
     * @param typeSerializer passed to the builder
     * @param i passed to the builder
     * @param keyGroupRange passed to the builder
     * @param taskKvStateRegistry passed to the builder
     * @param ttlTimeProvider passed to the builder
     * @param metricGroup passed to the builder
     * @param collection state handles passed to the builder
     * @param closeableRegistry passed to the builder
     * @return the backend produced by the builder
     * @throws IOException if the native library cannot be loaded or no local directory is usable
     */
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, CloseableRegistry closeableRegistry) throws IOException {
        ensureRocksDBIsLoaded(environment.getTaskManagerInfo().getTmpDirectories()[0]);

        final String fileCompatibleIdentifier = str.replaceAll("[^a-zA-Z0-9\\-]", "_");
        lazyInitializeForJob(environment, fileCompatibleIdentifier);

        final File instanceBasePath = new File(
                getNextStoragePath(),
                "job_" + this.jobId + "_op_" + fileCompatibleIdentifier + "_uuid_" + UUID.randomUUID());
        final LocalRecoveryConfig localRecoveryConfig = environment.getTaskStateManager().createLocalRecoveryConfig();
        final ExecutionConfig executionConfig = environment.getExecutionConfig();

        return new RocksDBKeyedStateBackendBuilder(
                        str,
                        environment.getUserClassLoader(),
                        instanceBasePath,
                        getDbOptions(),
                        unused -> getColumnOptions(),
                        taskKvStateRegistry,
                        typeSerializer,
                        i,
                        keyGroupRange,
                        executionConfig,
                        localRecoveryConfig,
                        this.priorityQueueStateType,
                        ttlTimeProvider,
                        metricGroup,
                        collection,
                        getCompressionDecorator(executionConfig),
                        closeableRegistry)
                .setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
                .setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled())
                .setNumberOfTransferingThreads(getNumberOfTransferingThreads())
                .setNativeMetricOptions(getMemoryWatcherOptions())
                .m12build();
    }

    /**
     * Builds an operator state backend through {@link DefaultOperatorStateBackendBuilder}.
     *
     * @param environment source of the user class loader and execution config
     * @param str not read by this method
     * @param collection state handles passed to the builder
     * @param closeableRegistry passed to the builder
     * @return the backend produced by the builder
     * @throws Exception if the builder fails
     */
    public OperatorStateBackend createOperatorStateBackend(Environment environment, String str, @Nonnull Collection<OperatorStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
        return new DefaultOperatorStateBackendBuilder(
                        environment.getUserClassLoader(),
                        environment.getExecutionConfig(),
                        true,
                        collection,
                        closeableRegistry)
                .build();
    }

    /**
     * Chooses the options factory for a configured copy of this backend.
     *
     * <p>Precedence: an application-defined factory, then {@link DefaultConfigurableOptionsFactory}
     * if its class name is configured (compared case-insensitively), then the configured class
     * loaded reflectively through its no-argument constructor. Factories that implement
     * {@link ConfigurableOptionsFactory} are configured with {@code config}.
     *
     * @param originalOptionsFactory factory set on the original backend, or {@code null}
     * @param factoryClassName class name read from the configuration
     * @param config configuration handed to configurable factories
     * @param classLoader class loader used to load {@code factoryClassName}
     * @return the factory to use
     * @throws DynamicCodeLoadingException if the configured class cannot be found, is not an
     *     {@link OptionsFactory}, or cannot be instantiated
     */
    private OptionsFactory configureOptionsFactory(@Nullable OptionsFactory originalOptionsFactory, String factoryClassName, Configuration config, ClassLoader classLoader) throws DynamicCodeLoadingException {
        if (originalOptionsFactory != null) {
            OptionsFactory applicationFactory = originalOptionsFactory;
            if (applicationFactory instanceof ConfigurableOptionsFactory) {
                applicationFactory = ((ConfigurableOptionsFactory) applicationFactory).configure(config);
            }
            LOG.info("Using application-defined options factory: {}.", applicationFactory);
            return applicationFactory;
        }

        if (factoryClassName.equalsIgnoreCase(DefaultConfigurableOptionsFactory.class.getName())) {
            final DefaultConfigurableOptionsFactory defaultFactory = new DefaultConfigurableOptionsFactory();
            defaultFactory.configure(config);
            LOG.info("Using default options factory: {}.", defaultFactory);
            return defaultFactory;
        }

        try {
            OptionsFactory configuredFactory =
                    Class.forName(factoryClassName, false, classLoader).asSubclass(OptionsFactory.class).newInstance();
            if (configuredFactory instanceof ConfigurableOptionsFactory) {
                configuredFactory = ((ConfigurableOptionsFactory) configuredFactory).configure(config);
            }
            LOG.info("Using configured options factory: {}.", configuredFactory);
            return configuredFactory;
        } catch (ClassCastException | IllegalAccessException | InstantiationException e) {
            throw new DynamicCodeLoadingException("The class configured under '" + RocksDBOptions.OPTIONS_FACTORY.key() + "' is not a valid options factory (" + factoryClassName + ')', e);
        } catch (ClassNotFoundException e) {
            throw new DynamicCodeLoadingException("Cannot find configured options factory class: " + factoryClassName, e);
        }
    }

    /**
     * Single-directory convenience form of {@link #setDbStoragePaths(String...)}.
     *
     * @param str the directory, or {@code null} to clear the explicit setting
     */
    public void setDbStoragePath(String str) {
        setDbStoragePaths(str == null ? null : new String[]{str});
    }

    /**
     * Sets the local base directories.
     *
     * <p>Each entry may be a plain path or a URI with the {@code file} scheme (compared
     * case-insensitively); for the latter only the URI's path component is kept. Nothing is stored
     * if any entry is rejected.
     *
     * @param strArr the directories, or {@code null} to clear the explicit setting
     * @throws IllegalArgumentException if the array is empty, an entry is {@code null}, an entry
     *     has a scheme other than {@code file}, or an entry is not an absolute path
     */
    public void setDbStoragePaths(String... strArr) {
        if (strArr == null) {
            this.localRocksDbDirectories = null;
            return;
        }
        if (strArr.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }

        final File[] directories = new File[strArr.length];
        for (int index = 0; index < strArr.length; index++) {
            final String rawPath = strArr[index];
            if (rawPath == null) {
                throw new IllegalArgumentException("null path");
            }

            URI parsed = null;
            try {
                parsed = new Path(rawPath).toUri();
            } catch (Exception ignored) {
                // Deliberate best effort: a string that cannot be parsed as a Path is treated
                // as a plain local file name below.
            }

            final String localPath;
            if (parsed == null || parsed.getScheme() == null) {
                localPath = rawPath;
            } else if ("file".equalsIgnoreCase(parsed.getScheme())) {
                localPath = parsed.getPath();
            } else {
                throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
            }

            directories[index] = new File(localPath);
            if (!directories[index].isAbsolute()) {
                throw new IllegalArgumentException("Relative paths are not supported");
            }
        }
        this.localRocksDbDirectories = directories;
    }

    /**
     * Returns the explicitly configured base directories as strings.
     *
     * @return a new array of directory strings, or {@code null} if none were configured
     */
    public String[] getDbStoragePaths() {
        if (this.localRocksDbDirectories == null) {
            return null;
        }
        final String[] paths = new String[this.localRocksDbDirectories.length];
        for (int index = 0; index < paths.length; index++) {
            paths[index] = this.localRocksDbDirectories[index].toString();
        }
        return paths;
    }

    /** Returns the incremental-checkpointing flag, or the option's default value while it is undefined. */
    public boolean isIncrementalCheckpointsEnabled() {
        return this.enableIncrementalCheckpointing.getOrDefault(((Boolean) CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue()).booleanValue());
    }

    /** Returns the TTL compaction filter flag, or the option's default value while it is undefined. */
    public boolean isTtlCompactionFilterEnabled() {
        return this.enableTtlCompactionFilter.getOrDefault(((Boolean) RocksDBOptions.TTL_COMPACT_FILTER_ENABLED.defaultValue()).booleanValue());
    }

    /** Sets the TTL compaction filter flag to {@link TernaryBoolean#TRUE}. */
    public void enableTtlCompactionFilter() {
        this.enableTtlCompactionFilter = TernaryBoolean.TRUE;
    }

    /**
     * Sets the predefined option profile.
     *
     * @param predefinedOptions the profile; must not be {@code null}
     */
    public void setPredefinedOptions(PredefinedOptions predefinedOptions) {
        this.predefinedOptions = (PredefinedOptions) Preconditions.checkNotNull(predefinedOptions);
    }

    /**
     * Returns the predefined option profile. If none was set, {@link PredefinedOptions#DEFAULT} is
     * stored in this instance and returned.
     */
    public PredefinedOptions getPredefinedOptions() {
        if (this.predefinedOptions == null) {
            this.predefinedOptions = PredefinedOptions.DEFAULT;
        }
        return this.predefinedOptions;
    }

    /**
     * Sets the options factory applied on top of the predefined options.
     *
     * @param optionsFactory the factory; {@code null} is stored as-is and disables the extra step
     */
    public void setOptions(OptionsFactory optionsFactory) {
        this.optionsFactory = optionsFactory;
    }

    /** Returns the options factory, or {@code null} if none is set. */
    public OptionsFactory getOptions() {
        return this.optionsFactory;
    }

    /**
     * Creates DB options from the predefined profile, passes them through the options factory when
     * one is set, and finally calls {@code setCreateIfMissing(true)} on the result.
     */
    public DBOptions getDbOptions() {
        DBOptions options = getPredefinedOptions().createDBOptions();
        if (this.optionsFactory != null) {
            options = this.optionsFactory.createDBOptions(options);
        }
        return options.setCreateIfMissing(true);
    }

    /**
     * Creates column family options from the predefined profile and passes them through the options
     * factory when one is set.
     */
    public ColumnFamilyOptions getColumnOptions() {
        ColumnFamilyOptions options = getPredefinedOptions().createColumnOptions();
        if (this.optionsFactory != null) {
            options = this.optionsFactory.createColumnOptions(options);
        }
        return options;
    }

    /**
     * Returns the default metric options, passed through the options factory when one is set.
     */
    public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
        RocksDBNativeMetricOptions metricOptions = this.defaultMetricOptions;
        if (this.optionsFactory != null) {
            metricOptions = this.optionsFactory.createNativeMetricsOptions(metricOptions);
        }
        return metricOptions;
    }

    /** Returns the configured thread count, or the option's default value while it is undefined. */
    public int getNumberOfTransferingThreads() {
        return this.numberOfTransferingThreads == UNDEFINED_NUMBER_OF_TRANSFERING_THREADS
                ? ((Integer) RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()).intValue()
                : this.numberOfTransferingThreads;
    }

    /**
     * Sets the thread count passed to the keyed backend builder.
     *
     * @param i the thread count; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive (as raised by
     *     {@link Preconditions#checkArgument})
     */
    public void setNumberOfTransferingThreads(int i) {
        Preconditions.checkArgument(i > 0, "The number of threads used to transfer files in RocksDBStateBackend should be greater than zero.");
        this.numberOfTransferingThreads = i;
    }

    /**
     * Describes the checkpoint backend, the local directories, the incremental-checkpointing flag
     * and the raw thread count (which may be the undefined sentinel).
     */
    public String toString() {
        return "RocksDBStateBackend{checkpointStreamBackend=" + this.checkpointStreamBackend
                + ", localRocksDbDirectories=" + Arrays.toString(this.localRocksDbDirectories)
                + ", enableIncrementalCheckpointing=" + this.enableIncrementalCheckpointing
                + ", numberOfTransferingThreads=" + this.numberOfTransferingThreads + '}';
    }

    /**
     * Loads the RocksDB native library once per class, retrying up to
     * {@link #ROCKSDB_LIB_LOADING_ATTEMPTS} times. Every attempt uses a fresh
     * {@code rocksdb-lib-<id>} folder under the given directory. After a failed attempt the
     * loader's {@code initialized} flag is reset so that the next attempt is not skipped.
     *
     * @param tempDirectory directory under which the per-attempt library folders are created
     * @throws IOException if every attempt failed; the last failure is attached as the cause
     */
    private void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
        synchronized (RocksDBStateBackend.class) {
            if (rocksDbInitialized) {
                return;
            }

            final File tempDirParent = new File(tempDirectory).getAbsoluteFile();
            LOG.info("Attempting to load RocksDB native library and store it under '{}'", tempDirParent);

            Throwable lastFailure = null;
            for (int attempt = 1; attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS; attempt++) {
                try {
                    final File libraryFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());
                    LOG.debug("Attempting to create RocksDB native library folder {}", libraryFolder);
                    // Result intentionally unchecked: a missing folder makes the load below fail,
                    // which is handled by the retry logic.
                    libraryFolder.mkdirs();
                    NativeLibraryLoader.getInstance().loadLibrary(libraryFolder.getAbsolutePath());
                    RocksDB.loadLibrary();
                    LOG.info("Successfully loaded RocksDB native library");
                    rocksDbInitialized = true;
                    return;
                } catch (Throwable failure) {
                    lastFailure = failure;
                    LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, failure);
                    try {
                        resetRocksDBLoadedFlag();
                    } catch (Throwable resetFailure) {
                        LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", resetFailure);
                    }
                }
            }
            throw new IOException("Could not load the native RocksDB library", lastFailure);
        }
    }

    /**
     * Reflectively sets the static {@code initialized} field of {@link NativeLibraryLoader} to
     * {@code false}.
     *
     * @throws Exception if the field cannot be found or written
     */
    @VisibleForTesting
    static void resetRocksDBLoadedFlag() throws Exception {
        final Field initializedField = NativeLibraryLoader.class.getDeclaredField("initialized");
        initializedField.setAccessible(true);
        initializedField.setBoolean(null, false);
    }
}
