/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import java.util.function.Consumer;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.function.FunctionCatalogWrapper;
import org.springframework.cloud.stream.function.FunctionInvoker;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class IntegrationFlowFunctionSupport {
    private final FunctionCatalogWrapper functionCatalog;
    private final FunctionInspector functionInspector;
    private final CompositeMessageConverterFactory messageConverterFactory;
    private final StreamFunctionProperties functionProperties;
    private final BindingServiceProperties bindingServiceProperties;
    @Autowired
    private MessageChannel errorChannel;

    IntegrationFlowFunctionSupport(FunctionCatalogWrapper functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory messageConverterFactory, StreamFunctionProperties functionProperties, BindingServiceProperties bindingServiceProperties) {
        Assert.notNull((Object)functionCatalog, (String)"'functionCatalog' must not be null");
        Assert.notNull((Object)functionInspector, (String)"'functionInspector' must not be null");
        Assert.notNull((Object)messageConverterFactory, (String)"'messageConverterFactory' must not be null");
        Assert.notNull((Object)functionProperties, (String)"'functionProperties' must not be null");
        this.functionCatalog = functionCatalog;
        this.functionInspector = functionInspector;
        this.messageConverterFactory = messageConverterFactory;
        this.functionProperties = functionProperties;
        this.bindingServiceProperties = bindingServiceProperties;
    }

    public <T> boolean containsFunction(Class<T> typeOfFunction) {
        return StringUtils.hasText((String)this.functionProperties.getDefinition()) && this.functionCatalog.contains(typeOfFunction, this.functionProperties.getDefinition());
    }

    public <T> boolean containsFunction(Class<T> typeOfFunction, String functionName) {
        return StringUtils.hasText((String)functionName) && this.functionCatalog.contains(typeOfFunction, functionName);
    }

    public FunctionType getCurrentFunctionType() {
        FunctionType functionType = this.functionInspector.getRegistration(this.functionCatalog.lookup(this.functionProperties.getDefinition())).getType();
        return functionType;
    }

    public IntegrationFlowBuilder integrationFlowFromNamedSupplier() {
        if (StringUtils.hasText((String)this.functionProperties.getDefinition())) {
            Supplier supplier = this.functionCatalog.lookup(Supplier.class, this.functionProperties.getDefinition());
            if (supplier instanceof FluxSupplier) {
                supplier = ((FluxSupplier)supplier).getTarget();
            }
            return (IntegrationFlowBuilder)this.integrationFlowFromProvidedSupplier(supplier).split();
        }
        throw new IllegalStateException("A Supplier is not specified in the 'spring.cloud.stream.function.definition' property.");
    }

    public IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier) {
        return IntegrationFlows.from(supplier);
    }

    public <O> IntegrationFlowBuilder integrationFlowFromChannel(SubscribableChannel inputChannel) {
        IntegrationFlowBuilder flowBuilder = (IntegrationFlowBuilder)IntegrationFlows.from((MessageChannel)inputChannel).bridge();
        return flowBuilder;
    }

    public <O> IntegrationFlowBuilder integrationFlowForFunction(SubscribableChannel inputChannel, MessageChannel outputChannel) {
        IntegrationFlowBuilder flowBuilder;
        String outputBindingName;
        String inputBindingName;
        if (inputChannel instanceof IntegrationObjectSupport && StringUtils.hasText((String)(inputBindingName = ((IntegrationObjectSupport)inputChannel).getComponentName()))) {
            this.functionProperties.setConsumerProperties(this.bindingServiceProperties.getConsumerProperties(inputBindingName));
        }
        if (outputChannel instanceof IntegrationObjectSupport && StringUtils.hasText((String)(outputBindingName = ((IntegrationObjectSupport)outputChannel).getComponentName()))) {
            this.functionProperties.setProducerProperties(this.bindingServiceProperties.getProducerProperties(outputBindingName));
        }
        if (!this.andThenFunction(flowBuilder = (IntegrationFlowBuilder)IntegrationFlows.from((MessageChannel)inputChannel).bridge(), outputChannel, this.functionProperties)) {
            flowBuilder = (IntegrationFlowBuilder)flowBuilder.channel(outputChannel);
        }
        return flowBuilder;
    }

    public <I, O> boolean andThenFunction(IntegrationFlowBuilder flowBuilder, MessageChannel outputChannel, StreamFunctionProperties functionProperties) {
        return this.andThenFunction(flowBuilder.toReactivePublisher(), outputChannel, functionProperties);
    }

    public <I, O> boolean andThenFunction(Publisher<?> publisher, MessageChannel outputChannel, StreamFunctionProperties functionProperties) {
        if (!StringUtils.hasText((String)functionProperties.getDefinition())) {
            return false;
        }
        FunctionInvoker functionInvoker = new FunctionInvoker(functionProperties, this.functionCatalog, this.functionInspector, this.messageConverterFactory, this.errorChannel);
        if (outputChannel != null) {
            this.subscribeToInput(functionInvoker, publisher, arg_0 -> ((MessageChannel)outputChannel).send(arg_0));
        } else {
            this.subscribeToInput(functionInvoker, publisher, null);
        }
        return true;
    }

    private <O> Mono<Void> subscribeToOutput(Consumer<Message<O>> outputProcessor, Publisher<Message<O>> outputPublisher) {
        Flux output = outputProcessor == null ? Flux.from(outputPublisher) : Flux.from(outputPublisher).doOnNext(outputProcessor);
        return output.then();
    }

    private <I, O> void subscribeToInput(FunctionInvoker<I, O> functionInvoker, Publisher<?> publisher, Consumer<Message<O>> outputProcessor) {
        Flux inputPublisher = Flux.from(publisher);
        this.subscribeToOutput(outputProcessor, (Publisher<Message<O>>)functionInvoker.apply(inputPublisher)).subscribe();
    }
}

