/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.metrics.RebalanceListenerMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;

/**
 * {@link StreamsRebalanceListener} that forwards task revocations, assignments and losses to the
 * {@link TaskManager} and {@link StreamThread}, and records how long each callback took in the
 * corresponding rebalance-listener sensor.
 */
public class DefaultStreamsRebalanceListener implements StreamsRebalanceListener {
    private final Logger log;
    private final Time time;
    private final StreamsRebalanceData streamsRebalanceData;
    private final TaskManager taskManager;
    private final StreamThread streamThread;
    private final Sensor tasksRevokedSensor;
    private final Sensor tasksAssignedSensor;
    private final Sensor tasksLostSensor;

    /**
     * Creates the listener and obtains its three latency sensors from {@link RebalanceListenerMetrics}.
     *
     * @param log                  logger used for the rebalance messages
     * @param time                 clock used to measure callback latency in milliseconds
     * @param streamsRebalanceData source of the subtopology definitions; also receives the reconciled assignment
     * @param streamThread         thread whose state is updated by the callbacks
     * @param taskManager          task manager that performs the actual revocation / assignment handling
     * @param streamsMetrics       metrics registry the sensors are obtained from
     * @param threadId             thread id the sensors are obtained for
     */
    public DefaultStreamsRebalanceListener(Logger log, Time time, StreamsRebalanceData streamsRebalanceData, StreamThread streamThread, TaskManager taskManager, StreamsMetricsImpl streamsMetrics, String threadId) {
        this.log = log;
        this.time = time;
        this.streamsRebalanceData = streamsRebalanceData;
        this.streamThread = streamThread;
        this.taskManager = taskManager;
        this.tasksRevokedSensor = RebalanceListenerMetrics.tasksRevokedSensor(threadId, streamsMetrics);
        this.tasksAssignedSensor = RebalanceListenerMetrics.tasksAssignedSensor(threadId, streamsMetrics);
        this.tasksLostSensor = RebalanceListenerMetrics.tasksLostSensor(threadId, streamsMetrics);
    }

    /**
     * Revokes the input partitions of the given tasks through the task manager.
     *
     * <p>The latency is recorded and logged even when the revocation throws. The thread is moved to
     * {@code PARTITIONS_REVOKED} only when the revocation completed normally and the thread is not
     * already in {@code PENDING_SHUTDOWN}.
     *
     * @param tasks tasks whose partitions are to be revoked
     */
    @Override
    public void onTasksRevoked(Set<StreamsRebalanceData.TaskId> tasks) {
        Map<TaskId, Set<TopicPartition>> activeTasksToRevokeWithPartitions = pairWithTopicPartitions(tasks.stream());
        Set<TopicPartition> partitionsToRevoke = activeTasksToRevokeWithPartitions.values().stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toSet());

        long start = time.milliseconds();
        try {
            log.info("Revoking active tasks {}.", tasks);
            taskManager.handleRevocation(partitionsToRevoke);
        } finally {
            long latency = time.milliseconds() - start;
            tasksRevokedSensor.record(latency);
            log.info("partition revocation took {} ms.", latency);
        }
        if (streamThread.state() != StreamThread.State.PENDING_SHUTDOWN) {
            streamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
        }
    }

    /**
     * Applies a new assignment: hands the active tasks and the standby plus warm-up tasks to the task
     * manager, moves the thread to {@code PARTITIONS_ASSIGNED}, completes the rebalance and finally
     * stores the assignment as the reconciled one.
     *
     * <p>The elapsed time, which includes resolving the tasks' topic partitions, is recorded even when
     * one of the steps throws; in that case the remaining steps are skipped and the exception propagates.
     *
     * @param assignment assignment to apply
     */
    @Override
    public void onTasksAssigned(StreamsRebalanceData.Assignment assignment) {
        long start = time.milliseconds();
        Map<TaskId, Set<TopicPartition>> activeTasksWithPartitions = pairWithTopicPartitions(assignment.activeTasks().stream());
        // Standby and warm-up tasks are passed to the task manager together as standby tasks.
        Map<TaskId, Set<TopicPartition>> standbyTasksWithPartitions = pairWithTopicPartitions(
            Stream.concat(assignment.standbyTasks().stream(), assignment.warmupTasks().stream()));
        log.info("Processing new assignment {} from Streams Rebalance Protocol", assignment);
        try {
            streamThread.setStreamsGroupReady(assignment.isGroupReady());
            taskManager.handleAssignment(activeTasksWithPartitions, standbyTasksWithPartitions);
            streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
            taskManager.handleRebalanceComplete();
            streamsRebalanceData.setReconciledAssignment(assignment);
        } finally {
            tasksAssignedSensor.record(time.milliseconds() - start);
        }
    }

    /**
     * Tells the task manager that all tasks were lost and resets the reconciled assignment to
     * {@code Assignment.EMPTY}. The elapsed time is recorded even when handling throws.
     */
    @Override
    public void onAllTasksLost() {
        long start = time.milliseconds();
        try {
            taskManager.handleLostAll();
            streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
        } finally {
            tasksLostSensor.record(time.milliseconds() - start);
        }
    }

    /**
     * Maps every task of the stream to the topic partitions it reads from, keyed by the Streams
     * {@link TaskId}.
     *
     * @throws NullPointerException if a task refers to a subtopology id that is not present in
     *                              {@code streamsRebalanceData.subtopologies()}
     */
    private Map<TaskId, Set<TopicPartition>> pairWithTopicPartitions(Stream<StreamsRebalanceData.TaskId> taskIdStream) {
        return taskIdStream.collect(Collectors.toMap(this::toTaskId, this::topicPartitionsOf));
    }

    /** Looks up the task's subtopology and returns the topic partitions the task reads from. */
    private Set<TopicPartition> topicPartitionsOf(StreamsRebalanceData.TaskId task) {
        // Fail with a message naming the task instead of a bare NPE further down.
        StreamsRebalanceData.Subtopology subtopology = Objects.requireNonNull(
            streamsRebalanceData.subtopologies().get(task.subtopologyId()),
            () -> "No subtopology found for task " + task);
        return toTopicPartitions(task, subtopology);
    }

    /**
     * Converts a rebalance-protocol task id into a Streams {@link TaskId}.
     *
     * @throws NumberFormatException if the subtopology id is not a parsable integer
     */
    private TaskId toTaskId(StreamsRebalanceData.TaskId task) {
        return new TaskId(Integer.parseInt(task.subtopologyId()), task.partitionId());
    }

    /**
     * Builds one {@link TopicPartition} per source topic and per repartition source topic of the
     * subtopology, each using the task's partition id.
     */
    private Set<TopicPartition> toTopicPartitions(StreamsRebalanceData.TaskId task, StreamsRebalanceData.Subtopology subTopology) {
        return Stream.concat(subTopology.sourceTopics().stream(), subTopology.repartitionSourceTopics().keySet().stream())
            .map(topic -> new TopicPartition(topic, task.partitionId()))
            .collect(Collectors.toSet());
    }
}

