/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.AbstractHeapMergingState;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.util.Preconditions;

/**
 * Aggregating state that keeps one accumulator per namespace in a {@link StateTable}
 * and folds incoming values into it with the {@link AggregateFunction} taken from the
 * state descriptor.
 *
 * @param <K> type of the key (see the key serializer passed to the constructor)
 * @param <N> type of the namespace
 * @param <IN> type of the values added to the state
 * @param <ACC> type of the accumulator stored in the state table
 * @param <OUT> type of the result produced from the accumulator
 */
public class HeapAggregatingState<K, N, IN, ACC, OUT>
extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
implements InternalAggregatingState<N, IN, OUT> {
    /** Adapter handed to the state table so that it can apply the aggregate function in place. */
    private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;

    /**
     * Creates a new aggregating state.
     *
     * @param stateDesc descriptor supplying the {@link AggregateFunction}; the function must not be null
     * @param stateTable table that holds the accumulators
     * @param keySerializer serializer for the keys
     * @param namespaceSerializer serializer for the namespaces
     */
    public HeapAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc, StateTable<K, N, ACC> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
        super(stateDesc, stateTable, keySerializer, namespaceSerializer);
        this.aggregateTransformation = new AggregateTransformation<>(stateDesc.getAggregateFunction());
    }

    /**
     * Returns the result computed from the accumulator stored under the current namespace.
     *
     * @return the aggregate function's result for the stored accumulator, or {@code null}
     *         if the state table holds no accumulator for the current namespace
     */
    @Override
    public OUT get() {
        final ACC accumulator = this.stateTable.get(this.currentNamespace);
        return accumulator != null ? this.aggregateTransformation.aggFunction.getResult(accumulator) : null;
    }

    /**
     * Folds {@code value} into the accumulator stored under the current namespace.
     *
     * <p>A {@code null} value is not aggregated; instead {@code clear()} is invoked.
     *
     * @param value the value to add, or {@code null} to trigger {@code clear()}
     * @throws IOException if the state table or the aggregate function throws; the original
     *         exception is preserved as the cause
     */
    @Override
    public void add(IN value) throws IOException {
        if (value == null) {
            this.clear();
            return;
        }
        try {
            this.stateTable.transform(this.currentNamespace, value, this.aggregateTransformation);
        }
        catch (Exception e) {
            throw new IOException("Exception while applying AggregateFunction in aggregating state", e);
        }
    }

    /**
     * Merges two accumulators by delegating to the aggregate function's {@code merge}.
     *
     * @param a first accumulator
     * @param b second accumulator
     * @return the accumulator returned by the aggregate function
     * @throws Exception if the aggregate function throws
     */
    @Override
    protected ACC mergeState(ACC a, ACC b) throws Exception {
        return this.aggregateTransformation.aggFunction.merge(a, b);
    }

    /**
     * Adapts an {@link AggregateFunction} to the {@link StateTransformationFunction}
     * shape expected by {@code StateTable.transform}.
     *
     * @param <IN> type of the values being added
     * @param <ACC> type of the accumulator
     * @param <OUT> type of the aggregate function's result
     */
    static final class AggregateTransformation<IN, ACC, OUT>
    implements StateTransformationFunction<ACC, IN> {
        private final AggregateFunction<IN, ACC, OUT> aggFunction;

        /**
         * @param aggFunction the aggregate function to apply; must not be null
         */
        AggregateTransformation(AggregateFunction<IN, ACC, OUT> aggFunction) {
            this.aggFunction = Preconditions.checkNotNull(aggFunction);
        }

        /**
         * Adds {@code value} to {@code accumulator}, creating a fresh accumulator first
         * when none exists yet.
         *
         * @param accumulator the current accumulator, or {@code null} if there is none
         * @param value the value to add
         * @return the accumulator returned by the aggregate function's {@code add}
         * @throws Exception if the aggregate function throws
         */
        @Override
        public ACC apply(ACC accumulator, IN value) throws Exception {
            final ACC target = accumulator != null ? accumulator : this.aggFunction.createAccumulator();
            return this.aggFunction.add(value, target);
        }
    }
}

