package org.apache.doris.flink.lookup;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/lookup/ExecutionPool.class */
public class ExecutionPool implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ExecutionPool.class);
    private ActionWatcher readActionWatcher;
    final ArrayBlockingQueue<Get> queue;
    ExecutorService actionWatcherExecutorService;
    ExecutorService workerExecutorService;
    private Worker[] workers;
    private Semaphore semaphore;
    private final int jdbcReadThreadSize;
    private AtomicBoolean started = new AtomicBoolean(false);
    private AtomicBoolean workerStated = new AtomicBoolean(false);
    ThreadFactory workerThreadFactory = new DefaultThreadFactory("worker");
    ThreadFactory actionWatcherThreadFactory = new DefaultThreadFactory("action-watcher");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/doris/flink/lookup/ExecutionPool$ActionWatcher.class */
    public class ActionWatcher implements Runnable {
        private int batchSize;

        public ActionWatcher(int i) {
            this.batchSize = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            ExecutionPool.LOG.info("action watcher start");
            ArrayList<Get> arrayList = new ArrayList(this.batchSize);
            while (ExecutionPool.this.started.get()) {
                try {
                    arrayList.clear();
                    Get poll = ExecutionPool.this.queue.poll(2L, TimeUnit.SECONDS);
                    if (poll != null) {
                        arrayList.add(poll);
                        ExecutionPool.this.queue.drainTo(arrayList, this.batchSize - 1);
                        HashMap hashMap = new HashMap();
                        for (Get get : arrayList) {
                            ((List) hashMap.computeIfAbsent(get.getRecord().getTableIdentifier(), str -> {
                                return new ArrayList();
                            })).add(get);
                        }
                        Iterator it = hashMap.entrySet().iterator();
                        while (it.hasNext()) {
                            do {
                            } while (!ExecutionPool.this.submit(new GetAction((List) ((Map.Entry) it.next()).getValue())));
                        }
                    }
                } catch (InterruptedException e) {
                    return;
                } catch (Exception e2) {
                    for (Get get2 : arrayList) {
                        if (!get2.getFuture().isDone()) {
                            get2.getFuture().completeExceptionally(e2);
                        }
                    }
                }
            }
        }

        public String toString() {
            return "ActionWatcher{batchSize=" + this.batchSize + '}';
        }
    }

    /* loaded from: input_file:org/apache/doris/flink/lookup/ExecutionPool$DefaultThreadFactory.class */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory(String str) {
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + str + "-";
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, this.namePrefix + this.threadNumber.getAndIncrement());
            thread.setDaemon(false);
            return thread;
        }
    }

    public ExecutionPool(DorisOptions dorisOptions, DorisLookupOptions dorisLookupOptions) {
        this.queue = new ArrayBlockingQueue<>(dorisLookupOptions.getJdbcReadBatchQueueSize());
        this.readActionWatcher = new ActionWatcher(dorisLookupOptions.getJdbcReadBatchSize());
        this.workers = new Worker[dorisLookupOptions.getJdbcReadThreadSize()];
        for (int i = 0; i < this.workers.length; i++) {
            this.workers[i] = new Worker(this.workerStated, dorisOptions, dorisLookupOptions, i);
        }
        this.jdbcReadThreadSize = dorisLookupOptions.getJdbcReadThreadSize();
        start();
    }

    private void start() {
        if (this.started.compareAndSet(false, true)) {
            this.workerStated.set(true);
            this.workerExecutorService = new ThreadPoolExecutor(this.workers.length, this.workers.length, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1), this.workerThreadFactory, new ThreadPoolExecutor.AbortPolicy());
            this.actionWatcherExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1), this.actionWatcherThreadFactory, new ThreadPoolExecutor.AbortPolicy());
            for (int i = 0; i < this.workers.length; i++) {
                this.workerExecutorService.execute(this.workers[i]);
            }
            this.actionWatcherExecutorService.execute(this.readActionWatcher);
            this.semaphore = new Semaphore(this.jdbcReadThreadSize);
        }
    }

    public CompletableFuture<List<Record>> get(Get get) {
        appendGet(get);
        return get.getFuture();
    }

    private void appendGet(Get get) {
        get.setFuture(new CompletableFuture<>());
        try {
            if (!this.queue.offer(get, 10000L, TimeUnit.MILLISECONDS)) {
                get.getFuture().completeExceptionally(new TimeoutException());
            }
        } catch (InterruptedException e) {
            get.getFuture().completeExceptionally(e);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.started.compareAndSet(true, false)) {
            this.actionWatcherExecutorService.shutdown();
            this.workerStated.set(false);
            this.workerExecutorService.shutdown();
            this.semaphore = null;
        }
    }

    public boolean submit(GetAction getAction) {
        if (this.semaphore != null) {
            try {
                if (!this.semaphore.tryAcquire(2000L, TimeUnit.MILLISECONDS)) {
                    return false;
                }
                getAction.setSemaphore(this.semaphore);
            } catch (InterruptedException e) {
                throw new RuntimeException("get semaphore be interrupt");
            }
        }
        for (int i = 0; i < this.workers.length; i++) {
            if (this.workers[i].offer(getAction)) {
                return true;
            }
        }
        if (this.semaphore == null) {
            return false;
        }
        this.semaphore.release();
        return false;
    }
}
