/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task assignor that tries to keep tasks on the clients that held them before.
 *
 * <p>Active tasks are assigned in three passes: first to the client that previously ran the
 * task as active, then to a client that previously held it as a standby, and finally to
 * whichever client {@link #findClient} selects. Standby replicas are then placed on clients
 * that do not yet have the task assigned.
 *
 * <p>The {@code clients} map and {@code taskIds} set passed to the constructor are retained by
 * reference; assignment results are recorded by calling {@code ClientState.assign}.
 *
 * @param <ID> type used to identify a client
 */
public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
    private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);
    private final Map<ID, ClientState> clients;
    private final Set<TaskId> taskIds;
    /** Task id -> client that reported it in {@code previousActiveTasks()}. */
    private final Map<TaskId, ID> previousActiveTaskAssignment = new HashMap<>();
    /** Task id -> clients that reported it in {@code previousStandbyTasks()}. */
    private final Map<TaskId, Set<ID>> previousStandbyTaskAssignment = new HashMap<>();
    /** Pairs of tasks that have already been placed together on some client. */
    private final TaskPairs taskPairs;

    /**
     * Creates an assignor and indexes the previous assignment reported by each client.
     *
     * @param clients client id to client state; stored by reference
     * @param taskIds the tasks to assign; stored by reference
     */
    public StickyTaskAssignor(Map<ID, ClientState> clients, Set<TaskId> taskIds) {
        this.clients = clients;
        this.taskIds = taskIds;
        // Number of unordered pairs that can be formed from the tasks: n * (n - 1) / 2.
        // NOTE(review): computed in int arithmetic, so this overflows for very large task
        // counts (n > ~46k) -- confirm whether that range is reachable.
        this.taskPairs = new TaskPairs(taskIds.size() * (taskIds.size() - 1) / 2);
        this.mapPreviousTaskAssignment(clients);
    }

    /**
     * Assigns every task as active, then assigns up to {@code numStandbyReplicas} standby
     * copies of each task.
     *
     * @param numStandbyReplicas requested number of standby replicas per task
     * @throws IllegalStateException if the summed capacity of all clients is zero
     */
    @Override
    public void assign(int numStandbyReplicas) {
        this.assignActive();
        this.assignStandby(numStandbyReplicas);
    }

    /**
     * Places standby replicas for each task on clients that do not already have that task.
     * When no such client is left, a warning is logged and the remaining replicas for that
     * task are skipped.
     */
    private void assignStandby(int numStandbyReplicas) {
        for (TaskId taskId : this.taskIds) {
            for (int i = 0; i < numStandbyReplicas; ++i) {
                Set<ID> ids = this.findClientsWithoutAssignedTask(taskId);
                if (ids.isEmpty()) {
                    log.warn("Unable to assign {} of {} standby tasks for task [{}]. There is not enough available capacity. You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.", new Object[]{numStandbyReplicas - i, numStandbyReplicas, taskId});
                    // No candidate left for this task; move on to the next task.
                    break;
                }
                this.allocateTaskWithClientCandidates(taskId, ids, false);
            }
        }
    }

    /**
     * Assigns each task in {@code taskIds} as an active task to exactly one client.
     *
     * @throws IllegalStateException if the summed capacity of all clients is zero
     */
    private void assignActive() {
        int totalCapacity = this.sumCapacity(this.clients.values());
        if (totalCapacity == 0) {
            // Fail with a descriptive error instead of an ArithmeticException from the division below.
            throw new IllegalStateException("Cannot assign tasks: the total capacity of all clients is zero.");
        }
        int tasksPerThread = this.taskIds.size() / totalCapacity;
        Set<TaskId> assigned = new HashSet<>();

        // Pass 1: give a task back to the client that previously had it as active, provided
        // that client reports an unfulfilled quota.
        for (Map.Entry<TaskId, ID> entry : this.previousActiveTaskAssignment.entrySet()) {
            TaskId taskId = entry.getKey();
            if (!this.taskIds.contains(taskId)) {
                continue;
            }
            ClientState client = this.clients.get(entry.getValue());
            if (client.hasUnfulfilledQuota(tasksPerThread)) {
                this.assignTaskToClient(assigned, taskId, client);
            }
        }

        Set<TaskId> unassigned = new HashSet<>(this.taskIds);
        unassigned.removeAll(assigned);

        // Pass 2: try the clients that previously held the task as a standby. An explicit
        // iterator is used so the task can be removed from `unassigned` while iterating.
        for (Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext(); ) {
            TaskId taskId = iterator.next();
            Set<ID> clientIds = this.previousStandbyTaskAssignment.get(taskId);
            if (clientIds == null) {
                continue;
            }
            for (ID clientId : clientIds) {
                ClientState client = this.clients.get(clientId);
                if (client.hasUnfulfilledQuota(tasksPerThread)) {
                    this.assignTaskToClient(assigned, taskId, client);
                    iterator.remove();
                    break;
                }
            }
        }

        // Pass 3: everything still unassigned goes to whichever client findClient picks
        // among all clients.
        for (TaskId taskId : unassigned) {
            this.allocateTaskWithClientCandidates(taskId, this.clients.keySet(), true);
        }
    }

    /**
     * Picks a client from {@code clientsWithin} via {@link #findClient}, records the new task
     * pairs, and assigns the task to it.
     *
     * @param taskId        task to assign
     * @param clientsWithin candidate client ids; callers in this class never pass an empty set
     * @param active        forwarded to {@code ClientState.assign}
     */
    private void allocateTaskWithClientCandidates(TaskId taskId, Set<ID> clientsWithin, boolean active) {
        ClientState client = this.findClient(taskId, clientsWithin);
        // Pairs must be recorded before assign() so the task is not paired with itself.
        this.taskPairs.addPairs(taskId, client.assignedTasks());
        client.assign(taskId, active);
    }

    /** Assigns {@code taskId} as active to {@code client} and adds it to {@code assigned}. */
    private void assignTaskToClient(Set<TaskId> assigned, TaskId taskId, ClientState client) {
        this.taskPairs.addPairs(taskId, client.assignedTasks());
        client.assign(taskId, true);
        assigned.add(taskId);
    }

    /** Returns the ids of all clients for which {@code hasAssignedTask(taskId)} is false. */
    private Set<ID> findClientsWithoutAssignedTask(TaskId taskId) {
        Set<ID> clientIds = new HashSet<>();
        for (Map.Entry<ID, ClientState> client : this.clients.entrySet()) {
            if (!client.getValue().hasAssignedTask(taskId)) {
                clientIds.add(client.getKey());
            }
        }
        return clientIds;
    }

    /**
     * Chooses the client that should receive {@code taskId}.
     *
     * <p>A single candidate is returned as-is. Otherwise the client that previously held the
     * task (active first, then standby) is preferred unless {@link #shouldBalanceLoad} says it
     * should be skipped, in which case a previous standby holder or the least-loaded candidate
     * is used.
     *
     * @return the chosen client, or {@code null} if {@code clientsWithin} is empty
     */
    private ClientState findClient(TaskId taskId, Set<ID> clientsWithin) {
        if (clientsWithin.size() == 1) {
            return this.clients.get(clientsWithin.iterator().next());
        }
        ClientState previous = this.findClientsWithPreviousAssignedTask(taskId, clientsWithin);
        if (previous == null) {
            return this.leastLoaded(taskId, clientsWithin);
        }
        if (!this.shouldBalanceLoad(previous)) {
            return previous;
        }
        ClientState standby = this.findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
        if (standby == null || this.shouldBalanceLoad(standby)) {
            return this.leastLoaded(taskId, clientsWithin);
        }
        return standby;
    }

    /**
     * Returns true when {@code client.reachedCapacity()} holds and some client reports more
     * available capacity than it.
     */
    private boolean shouldBalanceLoad(ClientState client) {
        return client.reachedCapacity() && this.hasClientsWithMoreAvailableCapacity(client);
    }

    /** Returns true if any known client (not only the current candidates) has more available capacity. */
    private boolean hasClientsWithMoreAvailableCapacity(ClientState client) {
        for (ClientState clientState : this.clients.values()) {
            if (clientState.hasMoreAvailableCapacityThan(client)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the candidate that previously had the task as active; failing that, the
     * least-loaded candidate that previously had it as standby; or {@code null} if neither exists.
     */
    private ClientState findClientsWithPreviousAssignedTask(TaskId taskId, Set<ID> clientsWithin) {
        ID previous = this.previousActiveTaskAssignment.get(taskId);
        if (previous != null && clientsWithin.contains(previous)) {
            return this.clients.get(previous);
        }
        return this.findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
    }

    /**
     * Returns the least-loaded client among the candidates that previously held the task as a
     * standby, or {@code null} if no client previously held it or none of them is a candidate.
     */
    private ClientState findLeastLoadedClientWithPreviousStandByTask(TaskId taskId, Set<ID> clientsWithin) {
        Set<ID> ids = this.previousStandbyTaskAssignment.get(taskId);
        if (ids == null) {
            return null;
        }
        // Copy before intersecting so the recorded previous assignment is not mutated.
        Set<ID> constrainTo = new HashSet<>(ids);
        constrainTo.retainAll(clientsWithin);
        return this.leastLoaded(taskId, constrainTo);
    }

    /**
     * Finds the least-loaded client, first honouring the task-pair check and, if that yields
     * nothing, ignoring it.
     *
     * @return the selected client, or {@code null} if {@code clientIds} is empty
     */
    private ClientState leastLoaded(TaskId taskId, Set<ID> clientIds) {
        ClientState leastLoaded = this.findLeastLoaded(taskId, clientIds, true);
        if (leastLoaded == null) {
            return this.findLeastLoaded(taskId, clientIds, false);
        }
        return leastLoaded;
    }

    /**
     * Scans {@code clientIds} for the client with the most available capacity.
     *
     * <p>A client with no assigned tasks is returned immediately. Otherwise a client replaces
     * the current best when there is no best yet or it has more available capacity than the
     * best; with {@code checkTaskPairs} set it must additionally offer a new task pair.
     *
     * @return the selected client, or {@code null} if no client qualified
     */
    private ClientState findLeastLoaded(TaskId taskId, Set<ID> clientIds, boolean checkTaskPairs) {
        ClientState leastLoaded = null;
        for (ID id : clientIds) {
            ClientState client = this.clients.get(id);
            if (client.assignedTaskCount() == 0) {
                return client;
            }
            boolean betterCandidate = leastLoaded == null || client.hasMoreAvailableCapacityThan(leastLoaded);
            if (betterCandidate
                    && (!checkTaskPairs || this.taskPairs.hasNewPair(taskId, client.assignedTasks()))) {
                leastLoaded = client;
            }
        }
        return leastLoaded;
    }

    /**
     * Builds the reverse indexes from task id to the client(s) that previously held the task,
     * using each client's {@code previousActiveTasks()} and {@code previousStandbyTasks()}.
     */
    private void mapPreviousTaskAssignment(Map<ID, ClientState> clients) {
        for (Map.Entry<ID, ClientState> clientState : clients.entrySet()) {
            ID clientId = clientState.getKey();
            for (TaskId activeTask : clientState.getValue().previousActiveTasks()) {
                // If several clients report the same active task, the last one iterated wins.
                this.previousActiveTaskAssignment.put(activeTask, clientId);
            }
            for (TaskId prevAssignedTask : clientState.getValue().previousStandbyTasks()) {
                Set<ID> standbyClients = this.previousStandbyTaskAssignment.get(prevAssignedTask);
                if (standbyClients == null) {
                    standbyClients = new HashSet<>();
                    this.previousStandbyTaskAssignment.put(prevAssignedTask, standbyClients);
                }
                standbyClients.add(clientId);
            }
        }
    }

    /** Returns the sum of {@code capacity()} over the given client states. */
    private int sumCapacity(Collection<ClientState> values) {
        int capacity = 0;
        for (ClientState client : values) {
            capacity += client.capacity();
        }
        return capacity;
    }

    /**
     * Tracks which unordered pairs of tasks have already been co-located on a client, so the
     * assignor can prefer placements that create a pair not seen before.
     */
    private static class TaskPairs {
        private final Set<Pair> pairs;
        private final int maxPairs;

        /**
         * @param maxPairs total number of distinct pairs that can exist; also used as the
         *                 initial capacity of the backing set
         */
        TaskPairs(int maxPairs) {
            this.maxPairs = maxPairs;
            // NOTE(review): pre-sizes the set for every possible pair, which may be large for
            // many tasks -- confirm this is intended.
            this.pairs = new HashSet<>(maxPairs);
        }

        /**
         * Returns true if pairing {@code task1} with at least one task in {@code taskIds} would
         * create a pair that has not been recorded yet. Always false once every possible pair
         * has been recorded, or when {@code taskIds} is empty.
         */
        boolean hasNewPair(TaskId task1, Set<TaskId> taskIds) {
            if (this.pairs.size() == this.maxPairs) {
                return false;
            }
            for (TaskId taskId : taskIds) {
                if (!this.pairs.contains(this.pair(task1, taskId))) {
                    return true;
                }
            }
            return false;
        }

        /** Records the pair formed by {@code taskId} and each task in {@code assigned}. */
        void addPairs(TaskId taskId, Set<TaskId> assigned) {
            for (TaskId id : assigned) {
                this.pairs.add(this.pair(id, taskId));
            }
        }

        /** Builds a pair with its members ordered by {@code compareTo}, so (a, b) equals (b, a). */
        Pair pair(TaskId task1, TaskId task2) {
            if (task1.compareTo(task2) < 0) {
                return new Pair(task1, task2);
            }
            return new Pair(task2, task1);
        }

        /** Immutable two-task value object with value-based equality. */
        private static class Pair {
            private final TaskId task1;
            private final TaskId task2;

            Pair(TaskId task1, TaskId task2) {
                this.task1 = task1;
                this.task2 = task2;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) {
                    return true;
                }
                if (o == null || this.getClass() != o.getClass()) {
                    return false;
                }
                Pair pair = (Pair) o;
                return Objects.equals(this.task1, pair.task1) && Objects.equals(this.task2, pair.task2);
            }

            @Override
            public int hashCode() {
                return Objects.hash(this.task1, this.task2);
            }
        }
    }
}

