/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client.api.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.FileSystemTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.map.ObjectMapper;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TimelineClientImpl
extends TimelineClient {
    private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
    private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
    private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
    private static final Joiner JOINER = Joiner.on((String)"");
    public static final int DEFAULT_SOCKET_TIMEOUT = 60000;
    private static Options opts = new Options();
    private static final String ENTITY_DATA_TYPE = "entity";
    private static final String DOMAIN_DATA_TYPE = "domain";
    private Client client;
    private ConnectionConfigurator connConfigurator;
    private DelegationTokenAuthenticator authenticator;
    private DelegationTokenAuthenticatedURL.Token token;
    private UserGroupInformation authUgi;
    private String doAsUser;
    private Configuration configuration;
    private float timelineServiceVersion;
    private TimelineWriter timelineWriter;
    private SSLFactory sslFactory;
    private volatile String timelineServiceAddress;
    private int maxServiceRetries;
    private long serviceRetryInterval;
    private boolean timelineServiceV2 = false;
    @InterfaceAudience.Private
    @VisibleForTesting
    TimelineClientConnectionRetry connectionRetry;
    private TimelineEntityDispatcher entityDispatcher;
    private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR;

    public TimelineClientImpl() {
        super(TimelineClientImpl.class.getName(), null);
    }

    public TimelineClientImpl(ApplicationId applicationId) {
        super(TimelineClientImpl.class.getName(), applicationId);
        this.timelineServiceV2 = true;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        this.configuration = conf;
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        UserGroupInformation realUgi = ugi.getRealUser();
        if (realUgi != null) {
            this.authUgi = realUgi;
            this.doAsUser = ugi.getShortUserName();
        } else {
            this.authUgi = ugi;
            this.doAsUser = null;
        }
        DefaultClientConfig cc = new DefaultClientConfig();
        cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
        this.connConfigurator = this.initConnConfigurator(conf);
        this.authenticator = UserGroupInformation.isSecurityEnabled() ? new KerberosDelegationTokenAuthenticator() : new PseudoDelegationTokenAuthenticator();
        this.authenticator.setConnectionConfigurator(this.connConfigurator);
        this.token = new DelegationTokenAuthenticatedURL.Token();
        this.connectionRetry = new TimelineClientConnectionRetry(conf);
        this.client = new Client((ClientHandler)new URLConnectionClientHandler((HttpURLConnectionFactory)new TimelineURLConnectionFactory()), (ClientConfig)cc);
        TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
        if (!this.timelineServiceV2) {
            this.client.addFilter((ClientFilter)retryFilter);
        }
        if (this.timelineServiceV2) {
            this.maxServiceRetries = conf.getInt("yarn.timeline-service.client.max-retries", 30);
            this.serviceRetryInterval = conf.getLong("yarn.timeline-service.client.retry-interval-ms", 1000L);
            this.entityDispatcher = new TimelineEntityDispatcher(conf);
        } else {
            if (YarnConfiguration.useHttps((Configuration)conf)) {
                this.setTimelineServiceAddress(conf.get("yarn.timeline-service.webapp.https.address", "0.0.0.0:8190"));
            } else {
                this.setTimelineServiceAddress(conf.get("yarn.timeline-service.webapp.address", "0.0.0.0:8188"));
            }
            this.timelineServiceVersion = conf.getFloat("yarn.timeline-service.version", 1.0f);
            LOG.info((Object)("Timeline service address: " + this.getTimelineServiceAddress()));
        }
        super.serviceInit(conf);
    }

    protected void serviceStart() throws Exception {
        if (this.timelineServiceV2) {
            this.entityDispatcher.start();
        } else {
            this.timelineWriter = this.createTimelineWriter(this.configuration, this.authUgi, this.client, TimelineClientImpl.constructResURI(this.getConfig(), this.timelineServiceAddress, false));
        }
    }

    protected TimelineWriter createTimelineWriter(Configuration conf, UserGroupInformation ugi, Client webClient, URI uri) throws IOException {
        if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) {
            return new FileSystemTimelineWriter(conf, ugi, webClient, uri);
        }
        return new DirectTimelineWriter(ugi, webClient, uri);
    }

    protected void serviceStop() throws Exception {
        if (this.timelineWriter != null) {
            this.timelineWriter.close();
        }
        if (this.timelineServiceV2) {
            this.entityDispatcher.stop();
        }
        if (this.sslFactory != null) {
            this.sslFactory.destroy();
        }
        super.serviceStop();
    }

    @Override
    public void flush() throws IOException {
        if (this.timelineWriter != null) {
            this.timelineWriter.flush();
        }
    }

    @Override
    public TimelinePutResponse putEntities(TimelineEntity ... entities) throws IOException, YarnException {
        return this.timelineWriter.putEntities(entities);
    }

    @Override
    public void putEntities(org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity ... entities) throws IOException, YarnException {
        if (!this.timelineServiceV2) {
            throw new YarnException("v.2 method is invoked on a v.1.x client");
        }
        this.entityDispatcher.dispatchEntities(true, entities);
    }

    @Override
    public void putEntitiesAsync(org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity ... entities) throws IOException, YarnException {
        if (!this.timelineServiceV2) {
            throw new YarnException("v.2 method is invoked on a v.1.x client");
        }
        this.entityDispatcher.dispatchEntities(false, entities);
    }

    @Override
    public void putDomain(TimelineDomain domain) throws IOException, YarnException {
        this.timelineWriter.putDomain(domain);
    }

    @InterfaceAudience.Private
    protected void putObjects(String path, MultivaluedMap<String, String> params, Object obj) throws IOException, YarnException {
        int retries = this.verifyRestEndPointAvailable();
        boolean needRetry = true;
        while (needRetry) {
            try {
                URI uri = TimelineClientImpl.constructResURI(this.getConfig(), this.timelineServiceAddress, true);
                this.putObjects(uri, path, params, obj);
                needRetry = false;
            }
            catch (IOException e) {
                this.checkRetryWithSleep(retries, e);
                --retries;
            }
        }
    }

    private int verifyRestEndPointAvailable() throws YarnException {
        int retries = this.pollTimelineServiceAddress(this.maxServiceRetries);
        if (this.timelineServiceAddress == null) {
            String errMessage = "TimelineClient has reached to max retry times : " + this.maxServiceRetries + ", but failed to fetch timeline service address. Please verify Timeline Auxillary Service is configured in all the NMs";
            LOG.error((Object)errMessage);
            throw new YarnException(errMessage);
        }
        return retries;
    }

    private void checkRetryWithSleep(int retries, IOException e) throws YarnException, IOException {
        if (retries > 0) {
            try {
                Thread.sleep(this.serviceRetryInterval);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new YarnException("Interrupted while retrying to connect to ATS");
            }
        } else {
            StringBuilder msg = new StringBuilder("TimelineClient has reached to max retry times : ");
            msg.append(this.maxServiceRetries);
            msg.append(" for service address: ");
            msg.append(this.timelineServiceAddress);
            LOG.error((Object)msg.toString());
            throw new IOException(msg.toString(), e);
        }
    }

    protected void putObjects(URI base, String path, MultivaluedMap<String, String> params, Object obj) throws IOException, YarnException {
        ClientResponse resp;
        try {
            resp = (ClientResponse)((WebResource.Builder)this.client.resource(base).path(path).queryParams(params).accept(new String[]{"application/json"}).type("application/json")).put(ClientResponse.class, obj);
        }
        catch (RuntimeException re) {
            String msg = "Failed to get the response from the timeline server.";
            LOG.error((Object)msg, (Throwable)re);
            throw new IOException(re);
        }
        if (resp == null || resp.getStatusInfo().getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
            String msg = "Response from the timeline server is " + (resp == null ? "null" : "not successful, HTTP error code: " + resp.getStatus() + ", Server response:\n" + (String)resp.getEntity(String.class));
            LOG.error((Object)msg);
            throw new YarnException(msg);
        }
    }

    @Override
    public void setTimelineServiceAddress(String address) {
        this.timelineServiceAddress = address;
    }

    private String getTimelineServiceAddress() {
        return this.timelineServiceAddress;
    }

    @Override
    public Token<TimelineDelegationTokenIdentifier> getDelegationToken(final String renewer) throws IOException, YarnException {
        PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction = new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>(){

            @Override
            public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
                DelegationTokenAuthenticatedURL authUrl = new DelegationTokenAuthenticatedURL(TimelineClientImpl.this.authenticator, TimelineClientImpl.this.connConfigurator);
                return authUrl.getDelegationToken(TimelineClientImpl.constructResURI(TimelineClientImpl.this.getConfig(), TimelineClientImpl.this.getTimelineServiceAddress(), false).toURL(), TimelineClientImpl.this.token, renewer, TimelineClientImpl.this.doAsUser);
            }
        };
        return (Token)this.operateDelegationToken(getDTAction);
    }

    @Override
    public long renewDelegationToken(final Token<TimelineDelegationTokenIdentifier> timelineDT) throws IOException, YarnException {
        final boolean isTokenServiceAddrEmpty = timelineDT.getService().toString().isEmpty();
        final String scheme = isTokenServiceAddrEmpty ? null : (YarnConfiguration.useHttps((Configuration)this.getConfig()) ? "https" : "http");
        final InetSocketAddress address = isTokenServiceAddrEmpty ? null : SecurityUtil.getTokenServiceAddr(timelineDT);
        PrivilegedExceptionAction<Long> renewDTAction = new PrivilegedExceptionAction<Long>(){

            @Override
            public Long run() throws Exception {
                if (!timelineDT.equals((Object)TimelineClientImpl.this.token.getDelegationToken())) {
                    TimelineClientImpl.this.token.setDelegationToken(timelineDT);
                }
                DelegationTokenAuthenticatedURL authUrl = new DelegationTokenAuthenticatedURL(TimelineClientImpl.this.authenticator, TimelineClientImpl.this.connConfigurator);
                URI serviceURI = isTokenServiceAddrEmpty ? TimelineClientImpl.constructResURI(TimelineClientImpl.this.getConfig(), TimelineClientImpl.this.getTimelineServiceAddress(), false) : new URI(scheme, null, address.getHostName(), address.getPort(), TimelineClientImpl.RESOURCE_URI_STR_V1, null, null);
                return authUrl.renewDelegationToken(serviceURI.toURL(), TimelineClientImpl.this.token, TimelineClientImpl.this.doAsUser);
            }
        };
        return (Long)this.operateDelegationToken(renewDTAction);
    }

    @Override
    public void cancelDelegationToken(final Token<TimelineDelegationTokenIdentifier> timelineDT) throws IOException, YarnException {
        final boolean isTokenServiceAddrEmpty = timelineDT.getService().toString().isEmpty();
        final String scheme = isTokenServiceAddrEmpty ? null : (YarnConfiguration.useHttps((Configuration)this.getConfig()) ? "https" : "http");
        final InetSocketAddress address = isTokenServiceAddrEmpty ? null : SecurityUtil.getTokenServiceAddr(timelineDT);
        PrivilegedExceptionAction<Void> cancelDTAction = new PrivilegedExceptionAction<Void>(){

            @Override
            public Void run() throws Exception {
                if (!timelineDT.equals((Object)TimelineClientImpl.this.token.getDelegationToken())) {
                    TimelineClientImpl.this.token.setDelegationToken(timelineDT);
                }
                DelegationTokenAuthenticatedURL authUrl = new DelegationTokenAuthenticatedURL(TimelineClientImpl.this.authenticator, TimelineClientImpl.this.connConfigurator);
                URI serviceURI = isTokenServiceAddrEmpty ? TimelineClientImpl.constructResURI(TimelineClientImpl.this.getConfig(), TimelineClientImpl.this.getTimelineServiceAddress(), false) : new URI(scheme, null, address.getHostName(), address.getPort(), TimelineClientImpl.RESOURCE_URI_STR_V1, null, null);
                authUrl.cancelDelegationToken(serviceURI.toURL(), TimelineClientImpl.this.token, TimelineClientImpl.this.doAsUser);
                return null;
            }
        };
        this.operateDelegationToken(cancelDTAction);
    }

    public String toString() {
        return super.toString() + " with timeline server " + TimelineClientImpl.constructResURI(this.getConfig(), this.getTimelineServiceAddress(), false) + " and writer " + this.timelineWriter;
    }

    private Object operateDelegationToken(PrivilegedExceptionAction<?> action) throws IOException, YarnException {
        TimelineClientRetryOp tokenRetryOp = this.createTimelineClientRetryOpForOperateDelegationToken(action);
        return this.connectionRetry.retryOn(tokenRetryOp);
    }

    private int pollTimelineServiceAddress(int retries) throws YarnException {
        while (this.timelineServiceAddress == null && retries > 0) {
            try {
                Thread.sleep(this.serviceRetryInterval);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new YarnException("Interrupted while trying to connect ATS");
            }
            --retries;
        }
        return retries;
    }

    private ConnectionConfigurator initConnConfigurator(Configuration conf) {
        try {
            return this.initSslConnConfigurator(60000, conf);
        }
        catch (Exception e) {
            LOG.debug((Object)"Cannot load customized ssl related configuration. Fallback to system-generic settings.", (Throwable)e);
            return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
        }
    }

    private ConnectionConfigurator initSslConnConfigurator(final int timeout, Configuration conf) throws IOException, GeneralSecurityException {
        this.sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
        this.sslFactory.init();
        final SSLSocketFactory sf = this.sslFactory.createSSLSocketFactory();
        final HostnameVerifier hv = this.sslFactory.getHostnameVerifier();
        return new ConnectionConfigurator(){

            public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
                if (conn instanceof HttpsURLConnection) {
                    HttpsURLConnection c = (HttpsURLConnection)conn;
                    c.setSSLSocketFactory(sf);
                    c.setHostnameVerifier(hv);
                }
                TimelineClientImpl.setTimeouts(conn, timeout);
                return conn;
            }
        };
    }

    private static void setTimeouts(URLConnection connection, int socketTimeout) {
        connection.setConnectTimeout(socketTimeout);
        connection.setReadTimeout(socketTimeout);
    }

    private static URI constructResURI(Configuration conf, String address, boolean v2) {
        return URI.create(JOINER.join((Object)(YarnConfiguration.useHttps((Configuration)conf) ? "https://" : "http://"), (Object)address, new Object[]{v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1}));
    }

    public static void main(String[] argv) throws Exception {
        String path;
        CommandLine cliParser = new GnuParser().parse(opts, argv);
        if (cliParser.hasOption("put") && (path = cliParser.getOptionValue("put")) != null && path.length() > 0) {
            if (cliParser.hasOption(ENTITY_DATA_TYPE)) {
                TimelineClientImpl.putTimelineDataInJSONFile(path, ENTITY_DATA_TYPE);
                return;
            }
            if (cliParser.hasOption(DOMAIN_DATA_TYPE)) {
                TimelineClientImpl.putTimelineDataInJSONFile(path, DOMAIN_DATA_TYPE);
                return;
            }
        }
        TimelineClientImpl.printUsage();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void putTimelineDataInJSONFile(String path, String type) {
        File jsonFile = new File(path);
        if (!jsonFile.exists()) {
            LOG.error((Object)("File [" + jsonFile.getAbsolutePath() + "] doesn't exist"));
            return;
        }
        ObjectMapper mapper = new ObjectMapper();
        YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
        org.apache.hadoop.yarn.api.records.timeline.TimelineEntities entities = null;
        TimelineDomains domains = null;
        try {
            if (type.equals(ENTITY_DATA_TYPE)) {
                entities = (org.apache.hadoop.yarn.api.records.timeline.TimelineEntities)mapper.readValue(jsonFile, org.apache.hadoop.yarn.api.records.timeline.TimelineEntities.class);
            } else if (type.equals(DOMAIN_DATA_TYPE)) {
                domains = (TimelineDomains)mapper.readValue(jsonFile, TimelineDomains.class);
            }
        }
        catch (Exception e) {
            LOG.error((Object)("Error when reading  " + e.getMessage()));
            e.printStackTrace(System.err);
            return;
        }
        YarnConfiguration conf = new YarnConfiguration();
        TimelineClient client = TimelineClient.createTimelineClient();
        client.init((Configuration)conf);
        client.start();
        try {
            if (UserGroupInformation.isSecurityEnabled() && conf.getBoolean("yarn.timeline-service.enabled", false)) {
                Token<TimelineDelegationTokenIdentifier> token = client.getDelegationToken(UserGroupInformation.getCurrentUser().getUserName());
                UserGroupInformation.getCurrentUser().addToken(token);
            }
            if (type.equals(ENTITY_DATA_TYPE)) {
                TimelinePutResponse response = client.putEntities(entities.getEntities().toArray(new TimelineEntity[entities.getEntities().size()]));
                if (response.getErrors().size() == 0) {
                    LOG.info((Object)"Timeline entities are successfully put");
                } else {
                    for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
                        LOG.error((Object)("TimelineEntity [" + error.getEntityType() + ":" + error.getEntityId() + "] is not successfully put. Error code: " + error.getErrorCode()));
                    }
                }
            } else if (type.equals(DOMAIN_DATA_TYPE)) {
                boolean hasError = false;
                for (TimelineDomain domain : domains.getDomains()) {
                    try {
                        client.putDomain(domain);
                    }
                    catch (Exception e) {
                        LOG.error((Object)("Error when putting domain " + domain.getId()), (Throwable)e);
                        hasError = true;
                    }
                }
                if (!hasError) {
                    LOG.info((Object)"Timeline domains are successfully put");
                }
            }
        }
        catch (RuntimeException e) {
            LOG.error((Object)"Error when putting the timeline data", (Throwable)e);
        }
        catch (Exception e) {
            LOG.error((Object)"Error when putting the timeline data", (Throwable)e);
        }
        finally {
            client.stop();
        }
    }

    private static void printUsage() {
        new HelpFormatter().printHelp("TimelineClient", opts);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public UserGroupInformation getUgi() {
        return this.authUgi;
    }

    @Override
    public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, TimelineEntity ... entities) throws IOException, YarnException {
        if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
            throw new YarnException("This API is not supported under current Timeline Service Version: " + this.timelineServiceVersion);
        }
        return this.timelineWriter.putEntities(appAttemptId, groupId, entities);
    }

    @Override
    public void putDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain) throws IOException, YarnException {
        if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
            throw new YarnException("This API is not supported under current Timeline Service Version: " + this.timelineServiceVersion);
        }
        this.timelineWriter.putDomain(appAttemptId, domain);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public void setTimelineWriter(TimelineWriter writer) {
        this.timelineWriter = writer;
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public TimelineClientRetryOp createTimelineClientRetryOpForOperateDelegationToken(PrivilegedExceptionAction<?> action) throws IOException {
        return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi, action);
    }

    static {
        opts.addOption("put", true, "Put the timeline entities/domain in a JSON file");
        opts.getOption("put").setArgName("Path to the JSON file");
        opts.addOption(ENTITY_DATA_TYPE, false, "Specify the JSON file contains the entities");
        opts.addOption(DOMAIN_DATA_TYPE, false, "Specify the JSON file contains the domain");
        opts.addOption("help", false, "Print usage");
        DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator(){

            public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
                TimelineClientImpl.setTimeouts(conn, 60000);
                return conn;
            }
        };
    }

    private class TimelineEntityDispatcher {
        private static final long DRAIN_TIME_PERIOD = 2000L;
        private int numberOfAsyncsToMerge;
        private final BlockingQueue<EntitiesHolder> timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
        private ExecutorService executor;

        TimelineEntityDispatcher(Configuration conf) {
            this.numberOfAsyncsToMerge = conf.getInt("yarn.timeline-service.timeline-client.number-of-async-entities-to-merge", 10);
        }

        Runnable createRunnable() {
            return new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        while (!Thread.currentThread().isInterrupted()) {
                            EntitiesHolder entitiesHolder;
                            try {
                                entitiesHolder = (EntitiesHolder)TimelineEntityDispatcher.this.timelineEntityQueue.take();
                            }
                            catch (InterruptedException ie) {
                                LOG.info((Object)"Timeline dispatcher thread was interrupted ");
                                Thread.currentThread().interrupt();
                                if (!TimelineEntityDispatcher.this.timelineEntityQueue.isEmpty()) {
                                    LOG.info((Object)("Yet to publish " + TimelineEntityDispatcher.this.timelineEntityQueue.size() + " timelineEntities, draining them now. "));
                                }
                                long timeTillweDrain = System.currentTimeMillis() + 2000L;
                                while (!TimelineEntityDispatcher.this.timelineEntityQueue.isEmpty()) {
                                    this.publishWithoutBlockingOnQueue((EntitiesHolder)TimelineEntityDispatcher.this.timelineEntityQueue.poll());
                                    if (System.currentTimeMillis() <= timeTillweDrain) continue;
                                    if (TimelineEntityDispatcher.this.timelineEntityQueue.isEmpty()) break;
                                    LOG.warn((Object)("Time to drain elapsed! Remaining " + TimelineEntityDispatcher.this.timelineEntityQueue.size() + "timelineEntities will not be published"));
                                    EntitiesHolder nextEntityInTheQueue = null;
                                    while ((nextEntityInTheQueue = (EntitiesHolder)TimelineEntityDispatcher.this.timelineEntityQueue.poll()) != null) {
                                        nextEntityInTheQueue.cancel(true);
                                    }
                                    break block7;
                                }
                                return;
                            }
                            if (entitiesHolder == null) continue;
                            this.publishWithoutBlockingOnQueue(entitiesHolder);
                        }
                    }
                    finally {
                        if (!TimelineEntityDispatcher.this.timelineEntityQueue.isEmpty()) {
                            LOG.info((Object)("Yet to publish " + TimelineEntityDispatcher.this.timelineEntityQueue.size() + " timelineEntities, draining them now. "));
                        }
                        long timeTillweDrain = System.currentTimeMillis() + 2000L;
                        while (!TimelineEntityDispatcher.this.timelineEntityQueue.isEmpty()) {
                            this.publishWithoutBlockingOnQueue((EntitiesHolder)TimelineEntityDispatcher.this.timelineEntityQueue.poll());
                            if (System.currentTimeMillis() <= timeTillweDrain) continue;
                            if (TimelineEntityDispatcher.this.timelineEntityQueue.isEmpty()) break;
                            LOG.warn((Object)("Time to drain elapsed! Remaining " + TimelineEntityDispatcher.this.timelineEntityQueue.size() + "timelineEntities will not be published"));
                            EntitiesHolder nextEntityInTheQueue = null;
                            while ((nextEntityInTheQueue = (EntitiesHolder)TimelineEntityDispatcher.this.timelineEntityQueue.poll()) != null) {
                                nextEntityInTheQueue.cancel(true);
                            }
                            break block9;
                        }
                    }
                }

                private void publishWithoutBlockingOnQueue(EntitiesHolder entitiesHolder) {
                    block4: {
                        if (entitiesHolder.isSync()) {
                            entitiesHolder.run();
                            return;
                        }
                        int count = 1;
                        do {
                            EntitiesHolder nextEntityInTheQueue;
                            if ((nextEntityInTheQueue = (EntitiesHolder)TimelineEntityDispatcher.this.timelineEntityQueue.poll()) == null) {
                                entitiesHolder.run();
                                break block4;
                            }
                            if (nextEntityInTheQueue.isSync()) {
                                entitiesHolder.run();
                                nextEntityInTheQueue.run();
                                break block4;
                            }
                            entitiesHolder.getEntities().addEntities(nextEntityInTheQueue.getEntities().getEntities());
                        } while (++count != TimelineEntityDispatcher.this.numberOfAsyncsToMerge);
                        entitiesHolder.run();
                    }
                }
            };
        }

        public void dispatchEntities(boolean sync, org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished) throws YarnException {
            if (this.executor.isShutdown()) {
                throw new YarnException("Timeline client is in the process of stopping, not accepting any more TimelineEntities");
            }
            TimelineEntities entities = new TimelineEntities();
            for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entitiesTobePublished) {
                entities.addEntity(entity);
            }
            EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
            try {
                this.timelineEntityQueue.put(entitiesHolder);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new YarnException("Failed while adding entity to the queue for publishing", (Throwable)e);
            }
            if (sync) {
                try {
                    entitiesHolder.get();
                }
                catch (ExecutionException e) {
                    throw new YarnException("Failed while publishing entity", e.getCause());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new YarnException("Interrupted while publishing entity", (Throwable)e);
                }
            }
        }

        public void start() {
            this.executor = Executors.newSingleThreadExecutor();
            this.executor.execute(this.createRunnable());
        }

        public void stop() {
            LOG.info((Object)"Stopping TimelineClient.");
            this.executor.shutdownNow();
            try {
                this.executor.awaitTermination(2000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    private final class EntitiesHolder
    extends FutureTask<Void> {
        private final TimelineEntities entities;
        private final boolean isSync;

        EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
            super(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    MultivaluedMapImpl params = new MultivaluedMapImpl();
                    params.add((Object)"appid", (Object)TimelineClientImpl.this.getContextAppId().toString());
                    params.add((Object)"async", (Object)Boolean.toString(!isSync));
                    TimelineClientImpl.this.putObjects("entities", (MultivaluedMap<String, String>)params, entities);
                    return null;
                }
            });
            this.entities = entities;
            this.isSync = isSync;
        }

        public boolean isSync() {
            return this.isSync;
        }

        public TimelineEntities getEntities() {
            return this.entities;
        }
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public class TimelineClientRetryOpForOperateDelegationToken
    extends TimelineClientRetryOp {
        private final UserGroupInformation authUgi;
        private final PrivilegedExceptionAction<?> action;

        public TimelineClientRetryOpForOperateDelegationToken(UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
            this.authUgi = authUgi;
            this.action = action;
        }

        @Override
        public Object run() throws IOException {
            this.authUgi.checkTGTAndReloginFromKeytab();
            try {
                return this.authUgi.doAs(this.action);
            }
            catch (UndeclaredThrowableException e) {
                throw new IOException(e.getCause());
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
        }

        @Override
        public boolean shouldRetryOn(Exception e) {
            return e instanceof ConnectException || e instanceof SocketTimeoutException;
        }
    }

    private class TimelineURLConnectionFactory
    implements HttpURLConnectionFactory {
        private TimelineURLConnectionFactory() {
        }

        public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
            TimelineClientImpl.this.authUgi.checkTGTAndReloginFromKeytab();
            try {
                return new DelegationTokenAuthenticatedURL(TimelineClientImpl.this.authenticator, TimelineClientImpl.this.connConfigurator).openConnection(url, TimelineClientImpl.this.token, TimelineClientImpl.this.doAsUser);
            }
            catch (UndeclaredThrowableException e) {
                throw new IOException(e.getCause());
            }
            catch (AuthenticationException ae) {
                throw new IOException(ae);
            }
        }
    }

    private class TimelineJerseyRetryFilter
    extends ClientFilter {
        private TimelineJerseyRetryFilter() {
        }

        public ClientResponse handle(final ClientRequest cr) throws ClientHandlerException {
            TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp(){

                @Override
                public Object run() {
                    return TimelineJerseyRetryFilter.this.getNext().handle(cr);
                }

                @Override
                public boolean shouldRetryOn(Exception e) {
                    return e instanceof ClientHandlerException && e.getCause() instanceof ConnectException;
                }
            };
            try {
                return (ClientResponse)TimelineClientImpl.this.connectionRetry.retryOn(jerseyRetryOp);
            }
            catch (IOException e) {
                throw new ClientHandlerException("Jersey retry failed!\nMessage: " + e.getMessage());
            }
        }
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    static class TimelineClientConnectionRetry {
        @InterfaceAudience.Private
        @VisibleForTesting
        public int maxRetries;
        @InterfaceAudience.Private
        @VisibleForTesting
        public long retryInterval;
        private boolean retried = false;

        @InterfaceAudience.Private
        @VisibleForTesting
        boolean getRetired() {
            return this.retried;
        }

        public TimelineClientConnectionRetry(Configuration conf) {
            Preconditions.checkArgument((conf.getInt("yarn.timeline-service.client.max-retries", 30) >= -1 ? 1 : 0) != 0, (String)"%s property value should be greater than or equal to -1", (Object[])new Object[]{"yarn.timeline-service.client.max-retries"});
            Preconditions.checkArgument((conf.getLong("yarn.timeline-service.client.retry-interval-ms", 1000L) > 0L ? 1 : 0) != 0, (String)"%s property value should be greater than zero", (Object[])new Object[]{"yarn.timeline-service.client.retry-interval-ms"});
            this.maxRetries = conf.getInt("yarn.timeline-service.client.max-retries", 30);
            this.retryInterval = conf.getLong("yarn.timeline-service.client.retry-interval-ms", 1000L);
        }

        public Object retryOn(TimelineClientRetryOp op) throws RuntimeException, IOException {
            int leftRetries = this.maxRetries;
            this.retried = false;
            while (true) {
                try {
                    return op.run();
                }
                catch (IOException | RuntimeException e) {
                    if (leftRetries != 0) {
                        if (!op.shouldRetryOn(e)) {
                            throw e;
                        }
                        this.logException(e, leftRetries);
                        if (leftRetries > 0) {
                            --leftRetries;
                        }
                        this.retried = true;
                        try {
                            Thread.sleep(this.retryInterval);
                        }
                        catch (InterruptedException ie) {
                            LOG.warn((Object)"Client retry sleep interrupted! ");
                        }
                        continue;
                    }
                    throw new RuntimeException("Failed to connect to timeline server. Connection retries limit exceeded. The posted timeline event may be missing");
                }
                break;
            }
        }

        private void logException(Exception e, int leftRetries) {
            if (leftRetries > 0) {
                LOG.info((Object)("Exception caught by TimelineClientConnectionRetry, will try " + leftRetries + " more time(s).\nMessage: " + e.getMessage()));
            } else {
                LOG.info((Object)("ConnectionException caught by TimelineClientConnectionRetry, will keep retrying.\nMessage: " + e.getMessage()));
            }
        }
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public static abstract class TimelineClientRetryOp {
        public abstract Object run() throws IOException;

        public abstract boolean shouldRetryOn(Exception var1);
    }
}

