package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.class */
public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = (Map) Stream.of((Object[]) new Tuple2[]{Tuple2.of(ValueStateDescriptor.class, HeapValueState::create), Tuple2.of(ListStateDescriptor.class, HeapListState::create), Tuple2.of(MapStateDescriptor.class, HeapMapState::create), Tuple2.of(AggregatingStateDescriptor.class, HeapAggregatingState::create), Tuple2.of(ReducingStateDescriptor.class, HeapReducingState::create), Tuple2.of(FoldingStateDescriptor.class, HeapFoldingState::create)}).collect(Collectors.toMap(tuple2 -> {
        return (Class) tuple2.f0;
    }, tuple22 -> {
        return (StateFactory) tuple22.f1;
    }));
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
    private final Map<StateUID, StateMetaInfoSnapshot> restoredStateMetaInfo;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final HeapKeyedStateBackend<K>.HeapSnapshotStrategy snapshotStrategy;
    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$AsyncSnapshotStrategySynchronicityBehavior.class */
    private class AsyncSnapshotStrategySynchronicityBehavior implements SnapshotStrategySynchronicityBehavior<K> {
        private AsyncSnapshotStrategySynchronicityBehavior() {
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public void logOperationCompleted(CheckpointStreamFactory checkpointStreamFactory, long j) {
            HeapKeyedStateBackend.LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{checkpointStreamFactory, Thread.currentThread(), Long.valueOf(System.currentTimeMillis() - j)});
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public boolean isAsynchronous() {
            return true;
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> registeredKeyValueStateBackendMetaInfo) {
            return new CopyOnWriteStateTable(HeapKeyedStateBackend.this, registeredKeyValueStateBackendMetaInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$HeapSnapshotStrategy.class */
    public class HeapSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>>, SnapshotStrategySynchronicityBehavior<K> {
        private final SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait;

        HeapSnapshotStrategy(SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityBehavior) {
            this.snapshotStrategySynchronicityTrait = snapshotStrategySynchronicityBehavior;
        }

        @Override // org.apache.flink.runtime.state.SnapshotStrategy
        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long j, long j2, final CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) {
            if (!HeapKeyedStateBackend.this.hasRegisteredState()) {
                return DoneFuture.of(SnapshotResult.empty());
            }
            long currentTimeMillis = System.currentTimeMillis();
            int size = HeapKeyedStateBackend.this.registeredKVStates.size() + HeapKeyedStateBackend.this.registeredPQStates.size();
            Preconditions.checkState(size <= 32767, "Too many states: " + size + ". Currently at most 32767 states are supported");
            List<StateMetaInfoSnapshot> arrayList = new ArrayList<>(size);
            final Map<StateUID, Integer> hashMap = new HashMap<>(size);
            final Map<StateUID, StateSnapshot> hashMap2 = new HashMap<>(size);
            processSnapshotMetaInfoForAllStates(arrayList, hashMap2, hashMap, HeapKeyedStateBackend.this.registeredKVStates, StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
            processSnapshotMetaInfoForAllStates(arrayList, hashMap2, hashMap, HeapKeyedStateBackend.this.registeredPQStates, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
            final KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(HeapKeyedStateBackend.this.keySerializer, arrayList, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, HeapKeyedStateBackend.this.keyGroupCompressionDecorator));
            final SupplierWithException supplierWithException = HeapKeyedStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled() ? () -> {
                return CheckpointStreamWithResultProvider.createDuplicatingStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory, HeapKeyedStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider());
            } : () -> {
                return CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
            };
            AsyncStoppableTaskWithCallback from = AsyncStoppableTaskWithCallback.from(new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { // from class: org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.HeapSnapshotStrategy.1
                CheckpointStreamWithResultProvider streamAndResultExtractor = null;

                @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
                protected void acquireResources() throws Exception {
                    this.streamAndResultExtractor = (CheckpointStreamWithResultProvider) supplierWithException.get();
                    HeapKeyedStateBackend.this.cancelStreamRegistry.registerCloseable(this.streamAndResultExtractor);
                }

                @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
                protected void releaseResources() {
                    unregisterAndCloseStreamAndResultExtractor();
                    Iterator it = hashMap2.values().iterator();
                    while (it.hasNext()) {
                        ((StateSnapshot) it.next()).release();
                    }
                }

                @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
                protected void stopOperation() {
                    unregisterAndCloseStreamAndResultExtractor();
                }

                private void unregisterAndCloseStreamAndResultExtractor() {
                    if (HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable(this.streamAndResultExtractor)) {
                        IOUtils.closeQuietly(this.streamAndResultExtractor);
                        this.streamAndResultExtractor = null;
                    }
                }

                /* JADX INFO: Access modifiers changed from: protected */
                /* JADX WARN: Can't rename method to resolve collision */
                /* JADX WARN: Multi-variable type inference failed */
                /* JADX WARN: Type inference failed for: r0v3, types: [java.io.OutputStream, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
                @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
                @Nonnull
                public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                    long currentTimeMillis2 = System.currentTimeMillis();
                    ?? checkpointOutputStream = this.streamAndResultExtractor.getCheckpointOutputStream();
                    DataOutputView dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper((OutputStream) checkpointOutputStream);
                    keyedBackendSerializationProxy.write(dataOutputViewStreamWrapper);
                    long[] jArr = new long[HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups()];
                    for (int i = 0; i < HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups(); i++) {
                        int keyGroupId = HeapKeyedStateBackend.this.keyGroupRange.getKeyGroupId(i);
                        jArr[i] = checkpointOutputStream.getPos();
                        dataOutputViewStreamWrapper.writeInt(keyGroupId);
                        for (Map.Entry entry : hashMap2.entrySet()) {
                            StateSnapshot.StateKeyGroupWriter keyGroupWriter = ((StateSnapshot) entry.getValue()).getKeyGroupWriter();
                            OutputStream decorateWithCompression = HeapKeyedStateBackend.this.keyGroupCompressionDecorator.decorateWithCompression((OutputStream) checkpointOutputStream);
                            Throwable th = null;
                            try {
                                try {
                                    DataOutputViewStreamWrapper dataOutputViewStreamWrapper2 = new DataOutputViewStreamWrapper(decorateWithCompression);
                                    dataOutputViewStreamWrapper2.writeShort(((Integer) hashMap.get(entry.getKey())).intValue());
                                    keyGroupWriter.writeStateInKeyGroup(dataOutputViewStreamWrapper2, keyGroupId);
                                    if (decorateWithCompression != null) {
                                        if (0 != 0) {
                                            try {
                                                decorateWithCompression.close();
                                            } catch (Throwable th2) {
                                                th.addSuppressed(th2);
                                            }
                                        } else {
                                            decorateWithCompression.close();
                                        }
                                    }
                                } catch (Throwable th3) {
                                    if (decorateWithCompression != null) {
                                        if (th != null) {
                                            try {
                                                decorateWithCompression.close();
                                            } catch (Throwable th4) {
                                                th.addSuppressed(th4);
                                            }
                                        } else {
                                            decorateWithCompression.close();
                                        }
                                    }
                                    throw th3;
                                }
                            } finally {
                            }
                        }
                    }
                    if (!HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable(this.streamAndResultExtractor)) {
                        return SnapshotResult.empty();
                    }
                    KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(HeapKeyedStateBackend.this.keyGroupRange, jArr);
                    SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult = this.streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
                    this.streamAndResultExtractor = null;
                    HeapSnapshotStrategy.this.logOperationCompleted(checkpointStreamFactory, currentTimeMillis2);
                    return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(closeAndFinalizeCheckpointStreamResult, keyGroupRangeOffsets);
                }
            });
            finalizeSnapshotBeforeReturnHook(from);
            HeapKeyedStateBackend.LOG.info("Heap backend snapshot (" + checkpointStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
            return from;
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
            this.snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable);
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public void logOperationCompleted(CheckpointStreamFactory checkpointStreamFactory, long j) {
            this.snapshotStrategySynchronicityTrait.logOperationCompleted(checkpointStreamFactory, j);
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public boolean isAsynchronous() {
            return this.snapshotStrategySynchronicityTrait.isAsynchronous();
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> registeredKeyValueStateBackendMetaInfo) {
            return this.snapshotStrategySynchronicityTrait.newStateTable(registeredKeyValueStateBackendMetaInfo);
        }

        private void processSnapshotMetaInfoForAllStates(List<StateMetaInfoSnapshot> list, Map<StateUID, StateSnapshot> map, Map<StateUID, Integer> map2, Map<String, ? extends StateSnapshotRestore> map3, StateMetaInfoSnapshot.BackendStateType backendStateType) {
            for (Map.Entry<String, ? extends StateSnapshotRestore> entry : map3.entrySet()) {
                StateUID of = StateUID.of(entry.getKey(), backendStateType);
                map2.put(of, Integer.valueOf(map2.size()));
                StateSnapshotRestore value = entry.getValue();
                if (null != value) {
                    StateSnapshot stateSnapshot = value.stateSnapshot();
                    list.add(stateSnapshot.getMetaInfoSnapshot());
                    map.put(of, stateSnapshot);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$SnapshotStrategySynchronicityBehavior.class */
    public interface SnapshotStrategySynchronicityBehavior<K> {
        default void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
        }

        default void logOperationCompleted(CheckpointStreamFactory checkpointStreamFactory, long j) {
        }

        boolean isAsynchronous();

        <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> registeredKeyValueStateBackendMetaInfo);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$StateFactory.class */
    public interface StateFactory {
        /* JADX WARN: Incorrect return type in method signature: <K:Ljava/lang/Object;N:Ljava/lang/Object;SV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;Lorg/apache/flink/runtime/state/heap/StateTable<TK;TN;TSV;>;Lorg/apache/flink/api/common/typeutils/TypeSerializer<TK;>;)TIS; */
        State createState(StateDescriptor stateDescriptor, StateTable stateTable, TypeSerializer typeSerializer) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$StateUID.class */
    public static final class StateUID {

        @Nonnull
        private final String stateName;

        @Nonnull
        private final StateMetaInfoSnapshot.BackendStateType stateType;

        StateUID(@Nonnull String str, @Nonnull StateMetaInfoSnapshot.BackendStateType backendStateType) {
            this.stateName = str;
            this.stateType = backendStateType;
        }

        @Nonnull
        public String getStateName() {
            return this.stateName;
        }

        @Nonnull
        public StateMetaInfoSnapshot.BackendStateType getStateType() {
            return this.stateType;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            StateUID stateUID = (StateUID) obj;
            return Objects.equals(getStateName(), stateUID.getStateName()) && getStateType() == stateUID.getStateType();
        }

        public int hashCode() {
            return Objects.hash(getStateName(), getStateType());
        }

        public static StateUID of(@Nonnull String str, @Nonnull StateMetaInfoSnapshot.BackendStateType backendStateType) {
            return new StateUID(str, backendStateType);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend$SyncSnapshotStrategySynchronicityBehavior.class */
    private class SyncSnapshotStrategySynchronicityBehavior implements SnapshotStrategySynchronicityBehavior<K> {
        private SyncSnapshotStrategySynchronicityBehavior() {
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
            runnable.run();
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public boolean isAsynchronous() {
            return false;
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.SnapshotStrategySynchronicityBehavior
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> registeredKeyValueStateBackendMetaInfo) {
            return new NestedMapsStateTable(HeapKeyedStateBackend.this, registeredKeyValueStateBackendMetaInfo);
        }
    }

    public HeapKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, int i, KeyGroupRange keyGroupRange, boolean z, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, HeapPriorityQueueSetFactory heapPriorityQueueSetFactory, TtlTimeProvider ttlTimeProvider) {
        super(taskKvStateRegistry, typeSerializer, classLoader, i, keyGroupRange, executionConfig, ttlTimeProvider);
        this.registeredKVStates = new HashMap();
        this.registeredPQStates = new HashMap();
        this.localRecoveryConfig = (LocalRecoveryConfig) Preconditions.checkNotNull(localRecoveryConfig);
        this.snapshotStrategy = new HeapSnapshotStrategy(z ? new AsyncSnapshotStrategySynchronicityBehavior() : new SyncSnapshotStrategySynchronicityBehavior());
        LOG.info("Initializing heap keyed state backend with stream factory.");
        this.restoredStateMetaInfo = new HashMap();
        this.priorityQueueSetFactory = heapPriorityQueueSetFactory;
    }

    @Override // org.apache.flink.runtime.state.PriorityQueueSetFactory
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
        HeapPriorityQueueSnapshotRestoreWrapper heapPriorityQueueSnapshotRestoreWrapper = this.registeredPQStates.get(str);
        if (heapPriorityQueueSnapshotRestoreWrapper == null) {
            return createInternal(new RegisteredPriorityQueueStateBackendMetaInfo<>(str, typeSerializer));
        }
        StateMetaInfoSnapshot stateMetaInfoSnapshot = this.restoredStateMetaInfo.get(StateUID.of(str, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE));
        Preconditions.checkState(stateMetaInfoSnapshot != null, "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
        StateMetaInfoSnapshot.CommonSerializerKeys commonSerializerKeys = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
        if (CompatibilityUtil.resolveCompatibilityResult(stateMetaInfoSnapshot.getTypeSerializer(commonSerializerKeys), (Class) null, stateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(commonSerializerKeys), typeSerializer).isRequiresMigration()) {
            throw new FlinkRuntimeException(StateMigrationException.notSupported());
        }
        this.registeredPQStates.put(str, heapPriorityQueueSnapshotRestoreWrapper.forUpdatedSerializer(typeSerializer));
        return heapPriorityQueueSnapshotRestoreWrapper.getPriorityQueue();
    }

    @Nonnull
    private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> createInternal(RegisteredPriorityQueueStateBackendMetaInfo<T> registeredPriorityQueueStateBackendMetaInfo) {
        String name = registeredPriorityQueueStateBackendMetaInfo.getName();
        HeapPriorityQueueSet<T> create = this.priorityQueueSetFactory.create(name, (TypeSerializer) registeredPriorityQueueStateBackendMetaInfo.getElementSerializer());
        this.registeredPQStates.put(name, new HeapPriorityQueueSnapshotRestoreWrapper(create, registeredPriorityQueueStateBackendMetaInfo, KeyExtractorFunction.forKeyedObjects(), this.keyGroupRange, this.numberOfKeyGroups));
        return create;
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> typeSerializer, StateDescriptor<?, V> stateDescriptor, StateSnapshotTransformer<V> stateSnapshotTransformer) throws StateMigrationException {
        StateTable<K, ?, ?> stateTable = this.registeredKVStates.get(stateDescriptor.getName());
        if (stateTable != null) {
            StateMetaInfoSnapshot stateMetaInfoSnapshot = this.restoredStateMetaInfo.get(StateUID.of(stateDescriptor.getName(), StateMetaInfoSnapshot.BackendStateType.KEY_VALUE));
            Preconditions.checkState(stateMetaInfoSnapshot != null, "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
            stateTable.setMetaInfo(RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(stateMetaInfoSnapshot, typeSerializer, stateDescriptor, stateSnapshotTransformer));
        } else {
            stateTable = this.snapshotStrategy.newStateTable(new RegisteredKeyValueStateBackendMetaInfo<>(stateDescriptor.getType(), stateDescriptor.getName(), typeSerializer, stateDescriptor.getSerializer(), stateSnapshotTransformer));
            this.registeredKVStates.put(stateDescriptor.getName(), stateTable);
        }
        return (StateTable<K, N, V>) stateTable;
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend
    public <N> Stream<K> getKeys(String str, N n) {
        return !this.registeredKVStates.containsKey(str) ? Stream.empty() : this.registeredKVStates.get(str).getKeys(n);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean hasRegisteredState() {
        return (this.registeredKVStates.isEmpty() && this.registeredPQStates.isEmpty()) ? false : true;
    }

    /* JADX WARN: Incorrect return type in method signature: <N:Ljava/lang/Object;SV:Ljava/lang/Object;SEV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/typeutils/TypeSerializer<TN;>;Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;Lorg/apache/flink/runtime/state/StateSnapshotTransformer$StateSnapshotTransformFactory<TSEV;>;)TIS; */
    @Override // org.apache.flink.runtime.state.KeyedStateFactory
    @Nonnull
    public State createInternalState(@Nonnull TypeSerializer typeSerializer, @Nonnull StateDescriptor stateDescriptor, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory stateSnapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
        if (stateFactory == null) {
            throw new FlinkRuntimeException(String.format("State %s is not supported by %s", stateDescriptor.getClass(), getClass()));
        }
        return stateFactory.createState(stateDescriptor, tryRegisterStateTable(typeSerializer, stateDescriptor, getStateSnapshotTransformer(stateDescriptor, stateSnapshotTransformFactory)), this.keySerializer);
    }

    private <SV, SEV> StateSnapshotTransformer<SV> getStateSnapshotTransformer(StateDescriptor<?, SV> stateDescriptor, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> stateSnapshotTransformFactory) {
        Optional<StateSnapshotTransformer<SEV>> createForDeserializedState = stateSnapshotTransformFactory.createForDeserializedState();
        if (createForDeserializedState.isPresent()) {
            return stateDescriptor instanceof ListStateDescriptor ? new StateSnapshotTransformer.ListStateSnapshotTransformer(createForDeserializedState.get()) : stateDescriptor instanceof MapStateDescriptor ? new StateSnapshotTransformer.MapStateSnapshotTransformer(createForDeserializedState.get()) : createForDeserializedState.get();
        }
        return null;
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) {
        return this.snapshotStrategy.performSnapshot(j, j2, checkpointStreamFactory, checkpointOptions);
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<KeyedStateHandle> collection) throws Exception {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        LOG.info("Initializing heap keyed state backend from snapshot.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", collection);
        }
        restorePartitionedState(collection);
    }

    private void restorePartitionedState(Collection<KeyedStateHandle> collection) throws Exception {
        HashMap hashMap = new HashMap();
        this.registeredKVStates.clear();
        this.registeredPQStates.clear();
        boolean z = false;
        for (KeyedStateHandle keyedStateHandle : collection) {
            if (keyedStateHandle != null) {
                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                    throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
                }
                KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
                FSDataInputStream openInputStream = keyGroupsStateHandle.openInputStream();
                this.cancelStreamRegistry.registerCloseable(openInputStream);
                try {
                    DataInputView dataInputViewStreamWrapper = new DataInputViewStreamWrapper(openInputStream);
                    KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader, true);
                    keyedBackendSerializationProxy.read(dataInputViewStreamWrapper);
                    if (!z) {
                        if (CompatibilityUtil.resolveCompatibilityResult(keyedBackendSerializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, keyedBackendSerializationProxy.getKeySerializerConfigSnapshot(), this.keySerializer).isRequiresMigration()) {
                            throw new StateMigrationException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
                        }
                        z = true;
                    }
                    List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = keyedBackendSerializationProxy.getStateMetaInfoSnapshots();
                    createOrCheckStateForMetaInfo(stateMetaInfoSnapshots, hashMap);
                    readStateHandleStateData(openInputStream, dataInputViewStreamWrapper, keyGroupsStateHandle.getGroupRangeOffsets(), hashMap, stateMetaInfoSnapshots.size(), keyedBackendSerializationProxy.getReadVersion(), keyedBackendSerializationProxy.isUsingKeyGroupCompression());
                    if (this.cancelStreamRegistry.unregisterCloseable(openInputStream)) {
                        IOUtils.closeQuietly(openInputStream);
                    }
                } catch (Throwable th) {
                    if (this.cancelStreamRegistry.unregisterCloseable(openInputStream)) {
                        IOUtils.closeQuietly(openInputStream);
                    }
                    throw th;
                }
            }
        }
    }

    private void readStateHandleStateData(FSDataInputStream fSDataInputStream, DataInputViewStreamWrapper dataInputViewStreamWrapper, KeyGroupRangeOffsets keyGroupRangeOffsets, Map<Integer, StateMetaInfoSnapshot> map, int i, int i2, boolean z) throws IOException {
        StreamCompressionDecorator streamCompressionDecorator = z ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
        Iterator<Tuple2<Integer, Long>> it = keyGroupRangeOffsets.iterator();
        while (it.hasNext()) {
            Tuple2<Integer, Long> next = it.next();
            int intValue = ((Integer) next.f0).intValue();
            long longValue = ((Long) next.f1).longValue();
            Preconditions.checkState(this.keyGroupRange.contains(intValue), "The key group must belong to the backend.");
            fSDataInputStream.seek(longValue);
            Preconditions.checkState(dataInputViewStreamWrapper.readInt() == intValue, "Unexpected key-group in restore.");
            InputStream decorateWithCompression = streamCompressionDecorator.decorateWithCompression((InputStream) fSDataInputStream);
            Throwable th = null;
            try {
                try {
                    readKeyGroupStateData(decorateWithCompression, map, intValue, i, i2);
                    if (decorateWithCompression != null) {
                        if (0 != 0) {
                            try {
                                decorateWithCompression.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            decorateWithCompression.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (decorateWithCompression != null) {
                    if (th != null) {
                        try {
                            decorateWithCompression.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        decorateWithCompression.close();
                    }
                }
                throw th3;
            }
        }
    }

    private void readKeyGroupStateData(InputStream inputStream, Map<Integer, StateMetaInfoSnapshot> map, int i, int i2, int i3) throws IOException {
        StateTable<K, ?, ?> stateTable;
        DataInputView dataInputViewStreamWrapper = new DataInputViewStreamWrapper(inputStream);
        for (int i4 = 0; i4 < i2; i4++) {
            StateMetaInfoSnapshot stateMetaInfoSnapshot = map.get(Integer.valueOf(dataInputViewStreamWrapper.readShort()));
            switch (stateMetaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE:
                    stateTable = this.registeredKVStates.get(stateMetaInfoSnapshot.getName());
                    break;
                case PRIORITY_QUEUE:
                    stateTable = (StateTable<K, ?, ?>) this.registeredPQStates.get(stateMetaInfoSnapshot.getName());
                    break;
                default:
                    throw new IllegalStateException("Unexpected state type: " + stateMetaInfoSnapshot.getBackendStateType() + ScopeFormat.SCOPE_SEPARATOR);
            }
            stateTable.keyGroupReader(i3).readMappingsInKeyGroup(dataInputViewStreamWrapper, i);
        }
    }

    private void createOrCheckStateForMetaInfo(List<StateMetaInfoSnapshot> list, Map<Integer, StateMetaInfoSnapshot> map) {
        HeapPriorityQueueSnapshotRestoreWrapper heapPriorityQueueSnapshotRestoreWrapper;
        for (StateMetaInfoSnapshot stateMetaInfoSnapshot : list) {
            this.restoredStateMetaInfo.put(StateUID.of(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot.getBackendStateType()), stateMetaInfoSnapshot);
            switch (stateMetaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE:
                    heapPriorityQueueSnapshotRestoreWrapper = this.registeredKVStates.get(stateMetaInfoSnapshot.getName());
                    if (heapPriorityQueueSnapshotRestoreWrapper == null) {
                        this.registeredKVStates.put(stateMetaInfoSnapshot.getName(), this.snapshotStrategy.newStateTable(new RegisteredKeyValueStateBackendMetaInfo(stateMetaInfoSnapshot)));
                        break;
                    }
                    break;
                case PRIORITY_QUEUE:
                    heapPriorityQueueSnapshotRestoreWrapper = this.registeredPQStates.get(stateMetaInfoSnapshot.getName());
                    if (heapPriorityQueueSnapshotRestoreWrapper == null) {
                        createInternal(new RegisteredPriorityQueueStateBackendMetaInfo<>(stateMetaInfoSnapshot));
                        break;
                    }
                    break;
                default:
                    throw new IllegalStateException("Unexpected state type: " + stateMetaInfoSnapshot.getBackendStateType() + ScopeFormat.SCOPE_SEPARATOR);
            }
            if (heapPriorityQueueSnapshotRestoreWrapper == null) {
                map.put(Integer.valueOf(map.size()), stateMetaInfoSnapshot);
            }
        }
    }

    @Override // org.apache.flink.runtime.state.CheckpointListener
    public void notifyCheckpointComplete(long j) {
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend, org.apache.flink.runtime.state.KeyedStateBackend
    public <N, S extends State, T> void applyToAllKeys(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> keyedStateFunction) throws Exception {
        Stream keys = getKeys(stateDescriptor.getName(), n);
        Throwable th = null;
        try {
            try {
                List list = (List) keys.collect(Collectors.toList());
                State partitionedState = getPartitionedState(n, typeSerializer, stateDescriptor);
                for (Object obj : list) {
                    setCurrentKey(obj);
                    keyedStateFunction.process(obj, partitionedState);
                }
                if (keys != null) {
                    if (0 == 0) {
                        keys.close();
                        return;
                    }
                    try {
                        keys.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (keys != null) {
                if (th != null) {
                    try {
                        keys.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    keys.close();
                }
            }
            throw th4;
        }
    }

    public String toString() {
        return "HeapKeyedStateBackend";
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    @VisibleForTesting
    public int numKeyValueStateEntries() {
        int i = 0;
        Iterator<StateTable<K, ?, ?>> it = this.registeredKVStates.values().iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    @VisibleForTesting
    public int numKeyValueStateEntries(Object obj) {
        int i = 0;
        Iterator<StateTable<K, ?, ?>> it = this.registeredKVStates.values().iterator();
        while (it.hasNext()) {
            i += it.next().sizeOfNamespace(obj);
        }
        return i;
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public boolean supportsAsynchronousSnapshots() {
        return this.snapshotStrategy.isAsynchronous();
    }

    @VisibleForTesting
    public LocalRecoveryConfig getLocalRecoveryConfig() {
        return this.localRecoveryConfig;
    }
}
