/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.pull;

import akka.actor.ActorSelection;
import akka.actor.Address;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.pull.BlockingContainerQueue;
import com.alibaba.schedulerx.worker.pull.PullManager;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class PullThread
extends Thread {
    private final long jobInstanceId;
    private final int pageSize;
    private final BlockingContainerQueue queue;
    private final ActorSelection masterActorSelection;
    private volatile boolean running = true;
    private final String workerIdAddr;
    private static final Logger LOGGER = LogFactory.getLogger(PullThread.class);

    public PullThread(long jobInstanceId, int pageSize, String taskMasterAkkaPath, BlockingContainerQueue queue) {
        super("Schedulerx-PullThread-" + jobInstanceId);
        this.jobInstanceId = jobInstanceId;
        this.pageSize = pageSize;
        this.queue = queue;
        this.masterActorSelection = SchedulerxWorker.actorSystem.actorSelection(taskMasterAkkaPath);
        Address address = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
        this.workerIdAddr = address.system() + "@" + (String)address.host().get() + ":" + address.port().get();
    }

    @Override
    public void run() {
        while (this.running) {
            Worker.PullTaskFromMasterRequest request = Worker.PullTaskFromMasterRequest.newBuilder().setJobInstanceId(this.jobInstanceId).setPageSize(this.pageSize).setWorkerIdAddr(this.workerIdAddr).build();
            try {
                Worker.PullTaskFromMasterResponse response = (Worker.PullTaskFromMasterResponse)FutureUtils.awaitResult((ActorSelection)this.masterActorSelection, (Object)request, (long)30L);
                if (response.getSuccess()) {
                    List containerRequests = response.getRequestList();
                    if (containerRequests != null && !containerRequests.isEmpty()) {
                        for (Worker.MasterStartContainerRequest containerRequest : containerRequests) {
                            boolean insertSuccess = false;
                            while (this.running && !insertSuccess) {
                                insertSuccess = this.queue.put(containerRequest);
                            }
                        }
                        continue;
                    }
                    Thread.sleep(3000L);
                    continue;
                }
                LOGGER.error("pull container error, " + response.getMessage());
                PullManager.INSTANCE.stop(this.jobInstanceId);
            }
            catch (TimeoutException e) {
                LOGGER.error("pull task timeout, stop PullManager");
                PullManager.INSTANCE.crash(this.jobInstanceId);
            }
            catch (Throwable e) {
                LOGGER.error("", e);
            }
        }
    }

    public void stopRunning() {
        this.running = false;
    }
}

