/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.streams;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.coordinator.group.streams.MemberState;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.TasksTupleWithEpochs;

public class CurrentAssignmentBuilder {
    private final StreamsGroupMember member;
    private int targetAssignmentEpoch;
    private TasksTuple targetAssignment;
    private BiFunction<String, Integer, String> currentActiveTaskProcessId;
    private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds;
    private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds;
    private Optional<TasksTuple> ownedTasks = Optional.empty();

    public CurrentAssignmentBuilder(StreamsGroupMember member) {
        this.member = Objects.requireNonNull(member);
    }

    public CurrentAssignmentBuilder withTargetAssignment(int targetAssignmentEpoch, TasksTuple targetAssignment) {
        this.targetAssignmentEpoch = targetAssignmentEpoch;
        this.targetAssignment = Objects.requireNonNull(targetAssignment);
        return this;
    }

    public CurrentAssignmentBuilder withCurrentActiveTaskProcessId(BiFunction<String, Integer, String> currentActiveTaskProcessId) {
        this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId);
        return this;
    }

    public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds(BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds) {
        this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds);
        return this;
    }

    public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds(BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds) {
        this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds);
        return this;
    }

    public CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
        this.ownedTasks = Optional.ofNullable(ownedAssignment);
        return this;
    }

    public StreamsGroupMember build() {
        switch (this.member.state()) {
            case STABLE: {
                if (this.member.memberEpoch() != this.targetAssignmentEpoch) {
                    return this.computeNextAssignment(this.member.memberEpoch(), this.member.assignedTasks());
                }
                return this.member;
            }
            case UNREVOKED_TASKS: {
                if (this.ownedTasks.isEmpty() || this.ownedTasks.get().containsAny(this.member.tasksPendingRevocation())) {
                    return this.member;
                }
                return this.computeNextAssignment(this.member.memberEpoch() + 1, this.member.assignedTasks());
            }
            case UNRELEASED_TASKS: {
                return this.computeNextAssignment(this.member.memberEpoch(), this.member.assignedTasks());
            }
            case UNKNOWN: {
                if (this.ownedTasks.isEmpty() || !this.ownedTasks.get().isEmpty()) {
                    throw new FencedMemberEpochException("The streams group member is in a unknown state. The member must abandon all its tasks and rejoin.");
                }
                return this.computeNextAssignment(this.targetAssignmentEpoch, this.member.assignedTasks());
            }
        }
        return this.member;
    }

    private boolean computeAssignmentDifference(Map<String, Set<Integer>> currentAssignment, Map<String, Set<Integer>> targetAssignment, Map<String, Set<Integer>> resultAssignedTasks, Map<String, Set<Integer>> resultTasksPendingRevocation, Map<String, Set<Integer>> resultTasksPendingAssignment, BiPredicate<String, Integer> isUnreleasedTask) {
        boolean hasUnreleasedTasks = false;
        HashSet<String> allSubtopologyIds = new HashSet<String>(targetAssignment.keySet());
        allSubtopologyIds.addAll(currentAssignment.keySet());
        for (String subtopologyId : allSubtopologyIds) {
            hasUnreleasedTasks |= CurrentAssignmentBuilder.computeAssignmentDifferenceForOneSubtopology(subtopologyId, currentAssignment.getOrDefault(subtopologyId, Set.of()), targetAssignment.getOrDefault(subtopologyId, Set.of()), resultAssignedTasks, resultTasksPendingRevocation, resultTasksPendingAssignment, isUnreleasedTask);
        }
        return hasUnreleasedTasks;
    }

    private static boolean computeAssignmentDifferenceForOneSubtopology(String subtopologyId, Set<Integer> currentTasksForThisSubtopology, Set<Integer> targetTasksForThisSubtopology, Map<String, Set<Integer>> resultAssignedTasks, Map<String, Set<Integer>> resultTasksPendingRevocation, Map<String, Set<Integer>> resultTasksPendingAssignment, BiPredicate<String, Integer> isUnreleasedTask) {
        HashSet<Integer> resultAssignedTasksForThisSubtopology = new HashSet<Integer>(currentTasksForThisSubtopology);
        resultAssignedTasksForThisSubtopology.retainAll(targetTasksForThisSubtopology);
        HashSet<Integer> resultTasksPendingRevocationForThisSubtopology = new HashSet<Integer>(currentTasksForThisSubtopology);
        resultTasksPendingRevocationForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
        HashSet<Integer> resultTasksPendingAssignmentForThisSubtopology = new HashSet<Integer>(targetTasksForThisSubtopology);
        resultTasksPendingAssignmentForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
        boolean hasUnreleasedTasks = resultTasksPendingAssignmentForThisSubtopology.removeIf(taskId -> isUnreleasedTask.test(subtopologyId, (Integer)taskId));
        if (!resultAssignedTasksForThisSubtopology.isEmpty()) {
            resultAssignedTasks.put(subtopologyId, resultAssignedTasksForThisSubtopology);
        }
        if (!resultTasksPendingRevocationForThisSubtopology.isEmpty()) {
            resultTasksPendingRevocation.put(subtopologyId, resultTasksPendingRevocationForThisSubtopology);
        }
        if (!resultTasksPendingAssignmentForThisSubtopology.isEmpty()) {
            resultTasksPendingAssignment.put(subtopologyId, resultTasksPendingAssignmentForThisSubtopology);
        }
        return hasUnreleasedTasks;
    }

    private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, Integer>> currentAssignment, Map<String, Set<Integer>> targetAssignment, int targetAssignmentEpoch, Map<String, Map<Integer, Integer>> resultAssignedTasks, Map<String, Map<Integer, Integer>> resultTasksPendingRevocation, Map<String, Map<Integer, Integer>> resultTasksPendingAssignment, BiPredicate<String, Integer> isUnreleasedTask) {
        boolean hasUnreleasedTasks = false;
        HashSet<String> allSubtopologyIds = new HashSet<String>(targetAssignment.keySet());
        allSubtopologyIds.addAll(currentAssignment.keySet());
        for (String subtopologyId : allSubtopologyIds) {
            hasUnreleasedTasks |= CurrentAssignmentBuilder.computeAssignmentDifferenceForOneSubtopologyWithEpoch(subtopologyId, currentAssignment.getOrDefault(subtopologyId, Map.of()), targetAssignment.getOrDefault(subtopologyId, Set.of()), targetAssignmentEpoch, resultAssignedTasks, resultTasksPendingRevocation, resultTasksPendingAssignment, isUnreleasedTask);
        }
        return hasUnreleasedTasks;
    }

    private static boolean computeAssignmentDifferenceForOneSubtopologyWithEpoch(String subtopologyId, Map<Integer, Integer> currentTasksForThisSubtopology, Set<Integer> targetTasksForThisSubtopology, int targetAssignmentEpoch, Map<String, Map<Integer, Integer>> resultAssignedTasks, Map<String, Map<Integer, Integer>> resultTasksPendingRevocation, Map<String, Map<Integer, Integer>> resultTasksPendingAssignment, BiPredicate<String, Integer> isUnreleasedTask) {
        HashMap<Integer, Integer> resultAssignedTasksForThisSubtopology = new HashMap<Integer, Integer>();
        for (Map.Entry<Integer, Integer> entry : currentTasksForThisSubtopology.entrySet()) {
            if (!targetTasksForThisSubtopology.contains(entry.getKey())) continue;
            resultAssignedTasksForThisSubtopology.put(entry.getKey(), entry.getValue());
        }
        HashMap<Integer, Integer> resultTasksPendingRevocationForThisSubtopology = new HashMap<Integer, Integer>(currentTasksForThisSubtopology);
        resultTasksPendingRevocationForThisSubtopology.keySet().removeAll(resultAssignedTasksForThisSubtopology.keySet());
        HashMap<Integer, Integer> resultTasksPendingAssignmentForThisSubtopology = new HashMap<Integer, Integer>();
        for (Integer taskId2 : targetTasksForThisSubtopology) {
            if (resultAssignedTasksForThisSubtopology.containsKey(taskId2)) continue;
            resultTasksPendingAssignmentForThisSubtopology.put(taskId2, targetAssignmentEpoch);
        }
        boolean hasUnreleasedTasks = resultTasksPendingAssignmentForThisSubtopology.keySet().removeIf(taskId -> isUnreleasedTask.test(subtopologyId, (Integer)taskId));
        if (!resultAssignedTasksForThisSubtopology.isEmpty()) {
            resultAssignedTasks.put(subtopologyId, resultAssignedTasksForThisSubtopology);
        }
        if (!resultTasksPendingRevocationForThisSubtopology.isEmpty()) {
            resultTasksPendingRevocation.put(subtopologyId, resultTasksPendingRevocationForThisSubtopology);
        }
        if (!resultTasksPendingAssignmentForThisSubtopology.isEmpty()) {
            resultTasksPendingAssignment.put(subtopologyId, resultTasksPendingAssignmentForThisSubtopology);
        }
        return hasUnreleasedTasks;
    }

    private StreamsGroupMember computeNextAssignment(int memberEpoch, TasksTupleWithEpochs memberAssignedTasks) {
        HashMap<String, Map<Integer, Integer>> newActiveAssignedTasks = new HashMap<String, Map<Integer, Integer>>();
        HashMap<String, Map<Integer, Integer>> newActiveTasksPendingRevocation = new HashMap<String, Map<Integer, Integer>>();
        HashMap<String, Map<Integer, Integer>> newActiveTasksPendingAssignment = new HashMap<String, Map<Integer, Integer>>();
        HashMap<String, Set<Integer>> newStandbyAssignedTasks = new HashMap<String, Set<Integer>>();
        HashMap<String, Set<Integer>> newStandbyTasksPendingRevocation = new HashMap<String, Set<Integer>>();
        HashMap<String, Set<Integer>> newStandbyTasksPendingAssignment = new HashMap<String, Set<Integer>>();
        HashMap<String, Set<Integer>> newWarmupAssignedTasks = new HashMap<String, Set<Integer>>();
        HashMap<String, Set<Integer>> newWarmupTasksPendingRevocation = new HashMap<String, Set<Integer>>();
        HashMap<String, Set<Integer>> newWarmupTasksPendingAssignment = new HashMap<String, Set<Integer>>();
        boolean hasUnreleasedActiveTasks = this.computeAssignmentDifferenceWithEpoch(memberAssignedTasks.activeTasksWithEpochs(), this.targetAssignment.activeTasks(), this.targetAssignmentEpoch, newActiveAssignedTasks, newActiveTasksPendingRevocation, newActiveTasksPendingAssignment, (subtopologyId, partitionId) -> this.currentActiveTaskProcessId.apply((String)subtopologyId, (Integer)partitionId) != null || this.currentStandbyTaskProcessIds.apply((String)subtopologyId, (Integer)partitionId).contains(this.member.processId()) || this.currentWarmupTaskProcessIds.apply((String)subtopologyId, (Integer)partitionId).contains(this.member.processId()));
        boolean hasUnreleasedStandbyTasks = this.computeAssignmentDifference(memberAssignedTasks.standbyTasks(), this.targetAssignment.standbyTasks(), newStandbyAssignedTasks, newStandbyTasksPendingRevocation, newStandbyTasksPendingAssignment, (subtopologyId, partitionId) -> Objects.equals(this.currentActiveTaskProcessId.apply((String)subtopologyId, (Integer)partitionId), this.member.processId()) || this.currentStandbyTaskProcessIds.apply((String)subtopologyId, (Integer)partitionId).contains(this.member.processId()) || this.currentWarmupTaskProcessIds.apply((String)subtopologyId, (Integer)partitionId).contains(this.member.processId()));
        boolean hasUnreleasedWarmupTasks = this.computeAssignmentDifference(memberAssignedTasks.warmupTasks(), this.targetAssignment.warmupTasks(), newWarmupAssignedTasks, newWarmupTasksPendingRevocation, newWarmupTasksPendingAssignment, (subtopologyId, partitionId) -> Objects.equals(this.currentActiveTaskProcessId.apply((String)subtopologyId, (Integer)partitionId), this.member.processId()) || this.currentStandbyTaskProcessIds.apply((String)subtopologyId, (Integer)partitionId).contains(this.member.processId()) || this.currentWarmupTaskProcessIds.apply((String)subtopologyId, (Integer)partitionId).contains(this.member.processId()));
        return this.buildNewMember(memberEpoch, new TasksTupleWithEpochs(newActiveTasksPendingRevocation, newStandbyTasksPendingRevocation, newWarmupTasksPendingRevocation), new TasksTupleWithEpochs(newActiveAssignedTasks, newStandbyAssignedTasks, newWarmupAssignedTasks), new TasksTupleWithEpochs(newActiveTasksPendingAssignment, newStandbyTasksPendingAssignment, newWarmupTasksPendingAssignment), hasUnreleasedActiveTasks || hasUnreleasedStandbyTasks || hasUnreleasedWarmupTasks);
    }

    private StreamsGroupMember buildNewMember(int memberEpoch, TasksTupleWithEpochs newTasksPendingRevocation, TasksTupleWithEpochs newAssignedTasks, TasksTupleWithEpochs newTasksPendingAssignment, boolean hasUnreleasedTasks) {
        boolean hasTasksToBeRevoked;
        boolean bl = hasTasksToBeRevoked = !newTasksPendingRevocation.isEmpty() && (this.ownedTasks.isEmpty() || this.ownedTasks.get().containsAny(newTasksPendingRevocation));
        if (hasTasksToBeRevoked) {
            return new StreamsGroupMember.Builder(this.member).setState(MemberState.UNREVOKED_TASKS).updateMemberEpoch(memberEpoch).setAssignedTasks(newAssignedTasks).setTasksPendingRevocation(newTasksPendingRevocation).build();
        }
        if (!newTasksPendingAssignment.isEmpty()) {
            MemberState newState = hasUnreleasedTasks ? MemberState.UNRELEASED_TASKS : MemberState.STABLE;
            return new StreamsGroupMember.Builder(this.member).setState(newState).updateMemberEpoch(this.targetAssignmentEpoch).setAssignedTasks(newAssignedTasks.merge(newTasksPendingAssignment)).setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY).build();
        }
        if (hasUnreleasedTasks) {
            return new StreamsGroupMember.Builder(this.member).setState(MemberState.UNRELEASED_TASKS).updateMemberEpoch(this.targetAssignmentEpoch).setAssignedTasks(newAssignedTasks).setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY).build();
        }
        return new StreamsGroupMember.Builder(this.member).setState(MemberState.STABLE).updateMemberEpoch(this.targetAssignmentEpoch).setAssignedTasks(newAssignedTasks).setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY).build();
    }
}

