/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.flink.util.OptionalConsumer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public abstract class RedirectHandler<T extends RestfulGateway>
extends SimpleChannelInboundHandler<Routed> {
    protected final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    protected final CompletableFuture<String> localAddressFuture;
    protected final GatewayRetriever<? extends T> leaderRetriever;
    protected final Time timeout;
    private String localAddress;

    protected RedirectHandler(@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends T> leaderRetriever, @Nonnull Time timeout) {
        this.localAddressFuture = (CompletableFuture)Preconditions.checkNotNull(localAddressFuture);
        this.leaderRetriever = (GatewayRetriever)Preconditions.checkNotNull(leaderRetriever);
        this.timeout = (Time)Preconditions.checkNotNull((Object)timeout);
        this.localAddress = null;
    }

    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Routed routed) throws Exception {
        if (this.localAddressFuture.isDone()) {
            if (this.localAddress == null) {
                try {
                    this.localAddress = this.localAddressFuture.get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                }
                catch (Exception e) {
                    this.logger.error("Could not obtain local address.", (Throwable)e);
                    HandlerUtils.sendErrorResponse(channelHandlerContext, routed.request(), new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    return;
                }
            }
            try {
                OptionalConsumer optLeaderConsumer = OptionalConsumer.of(this.leaderRetriever.getNow());
                optLeaderConsumer.ifPresent(gateway -> {
                    CompletableFuture<Optional<String>> optRedirectAddressFuture = HandlerRedirectUtils.getRedirectAddress(this.localAddress, gateway, this.timeout);
                    ReferenceCountUtil.retain((Object)routed);
                    optRedirectAddressFuture.whenCompleteAsync((optRedirectAddress, throwable) -> {
                        try {
                            if (throwable != null) {
                                this.logger.error("Could not retrieve the redirect address.", throwable);
                                HandlerUtils.sendErrorResponse(channelHandlerContext, routed.request(), new ErrorResponseBody("Could not retrieve the redirect address of the current leader. Please try to refresh."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                            } else if (optRedirectAddress.isPresent()) {
                                HttpResponse response = HandlerRedirectUtils.getRedirectResponse((String)optRedirectAddress.get(), routed.path());
                                KeepAliveWrite.flush((ChannelHandlerContext)channelHandlerContext, (HttpRequest)routed.request(), (HttpResponse)response);
                            } else {
                                try {
                                    this.respondAsLeader(channelHandlerContext, routed, gateway);
                                }
                                catch (Exception e) {
                                    this.logger.error("Error while responding as leader.", (Throwable)e);
                                    HandlerUtils.sendErrorResponse(channelHandlerContext, routed.request(), new ErrorResponseBody("Error while responding to the request."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                                }
                            }
                        }
                        finally {
                            ReferenceCountUtil.release((Object)routed);
                        }
                    }, (Executor)channelHandlerContext.executor());
                }).ifNotPresent(() -> HandlerUtils.sendErrorResponse(channelHandlerContext, routed.request(), new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."), HttpResponseStatus.SERVICE_UNAVAILABLE));
            }
            catch (Throwable throwable) {
                this.logger.warn("Error occurred while processing web request.", throwable);
                HandlerUtils.sendErrorResponse(channelHandlerContext, routed.request(), new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'), HttpResponseStatus.INTERNAL_SERVER_ERROR);
            }
        } else {
            HandlerUtils.sendErrorResponse(channelHandlerContext, routed.request(), new ErrorResponseBody("Local address has not been resolved. This indicates an internal error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }

    protected abstract void respondAsLeader(ChannelHandlerContext var1, Routed var2, T var3) throws Exception;
}

