/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.SubpartitionIndexRange;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.types.Either;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

public class TaskDeploymentDescriptorFactory {
    /** Execution attempt ID placed into the descriptor; its subtask index drives the consumed subpartition range. */
    private final ExecutionAttemptID executionId;
    /** Job information, either carried inline (serialized value) or offloaded and referenced by a blob key. */
    private final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
    /** Task information, either carried inline (serialized value) or offloaded and referenced by a blob key. */
    private final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInfo;
    /** Job ID placed into the descriptor and passed to the blob writer when offloading shuffle descriptors. */
    private final JobID jobID;
    /** Decides whether a consumed partition without a usable descriptor may be represented as unknown. */
    private final PartitionLocationConstraint partitionDeploymentConstraint;
    /** Consumed partition groups; one input gate descriptor is created per group. */
    private final List<ConsumedPartitionGroup> consumedPartitionGroups;
    /** Looks up an intermediate result partition by its ID. */
    private final Function<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionRetriever;
    /** Blob writer handed to {@code BlobWriter.tryOffload} for serialized shuffle descriptors. */
    private final BlobWriter blobWriter;
    /** Shuffle descriptors keyed by consumed data set ID; each entry becomes an extra BLOCKING_PERSISTENT input gate. */
    private final Map<IntermediateDataSetID, ShuffleDescriptor[]> consumedClusterPartitionShuffleDescriptors;

    /**
     * Stores the given collaborators without copying or validating them.
     *
     * <p>Instances are obtained through {@code fromExecution}.
     */
    private TaskDeploymentDescriptorFactory(
            ExecutionAttemptID executionId,
            TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation,
            TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInfo,
            JobID jobID,
            PartitionLocationConstraint partitionDeploymentConstraint,
            List<ConsumedPartitionGroup> consumedPartitionGroups,
            Function<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionRetriever,
            BlobWriter blobWriter,
            Map<IntermediateDataSetID, ShuffleDescriptor[]> consumedClusterPartitionShuffleDescriptors) {
        // Identity of the job and of the execution attempt.
        this.jobID = jobID;
        this.executionId = executionId;

        // Job and task payloads, possibly offloaded.
        this.serializedJobInformation = serializedJobInformation;
        this.taskInfo = taskInfo;

        // Inputs required to build the input gate descriptors.
        this.consumedPartitionGroups = consumedPartitionGroups;
        this.consumedClusterPartitionShuffleDescriptors = consumedClusterPartitionShuffleDescriptors;
        this.resultPartitionRetriever = resultPartitionRetriever;
        this.partitionDeploymentConstraint = partitionDeploymentConstraint;
        this.blobWriter = blobWriter;
    }

    /**
     * Assembles the deployment descriptor for this factory's execution attempt.
     *
     * @param allocationID allocation ID forwarded to the descriptor
     * @param taskRestore restore information forwarded to the descriptor, may be {@code null}
     * @param producedPartitions produced partition descriptors; copied into a new list
     * @return the assembled descriptor, including freshly created input gate descriptors
     * @throws IOException if creating the input gate descriptors fails
     */
    public TaskDeploymentDescriptor createDeploymentDescriptor(
            AllocationID allocationID,
            @Nullable JobManagerTaskRestore taskRestore,
            Collection<ResultPartitionDeploymentDescriptor> producedPartitions) throws IOException {
        // Copy first so the descriptor does not share the caller's collection.
        List<ResultPartitionDeploymentDescriptor> producedPartitionsCopy =
                new ArrayList<ResultPartitionDeploymentDescriptor>(producedPartitions);
        return new TaskDeploymentDescriptor(
                this.jobID,
                this.serializedJobInformation,
                this.taskInfo,
                this.executionId,
                allocationID,
                taskRestore,
                producedPartitionsCopy,
                this.createInputGateDeploymentDescriptors());
    }

    /**
     * Builds the input gate descriptors for this task.
     *
     * <p>One gate is created per consumed partition group, followed by one gate per consumed
     * cluster data set (in the iteration order of the backing map).
     *
     * @return the input gate descriptors, partition-group gates first
     * @throws IOException propagated from obtaining the shuffle descriptors of a partition group
     */
    private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors() throws IOException {
        List<InputGateDeploymentDescriptor> inputGates = new ArrayList<>(this.consumedPartitionGroups.size());
        for (ConsumedPartitionGroup consumedPartitionGroup : this.consumedPartitionGroups) {
            // The first partition of the group stands in for the whole group when looking up
            // the intermediate result and the subpartition count.
            IntermediateResultPartition resultPartition = this.resultPartitionRetriever.apply(consumedPartitionGroup.getFirst());
            IntermediateResult consumedIntermediateResult = resultPartition.getIntermediateResult();
            SubpartitionIndexRange consumedSubpartitionRange = TaskDeploymentDescriptorFactory.computeConsumedSubpartitionRange(consumedPartitionGroup.getNumConsumers(), resultPartition, this.executionId.getSubtaskIndex());
            IntermediateDataSetID resultId = consumedIntermediateResult.getId();
            ResultPartitionType partitionType = consumedIntermediateResult.getResultType();
            inputGates.add(new InputGateDeploymentDescriptor(resultId, partitionType, consumedSubpartitionRange, this.getConsumedPartitionShuffleDescriptors(consumedIntermediateResult, consumedPartitionGroup)));
        }
        // Typed entries make the key/value casts unnecessary.
        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptor[]> entry : this.consumedClusterPartitionShuffleDescriptors.entrySet()) {
            // NOTE(review): the literal 0 is presumably the consumed subpartition index - confirm
            // against the InputGateDeploymentDescriptor constructor.
            inputGates.add(new InputGateDeploymentDescriptor(entry.getKey(), ResultPartitionType.BLOCKING_PERSISTENT, 0, entry.getValue()));
        }
        return inputGates;
    }

    /**
     * Computes the subpartition range a consumer subtask reads from the given partition.
     *
     * <p>The consumer's index is its subtask index modulo {@code numConsumers}; the remaining
     * inputs (subpartition count, dynamic-graph flag, broadcast flag) are read from the partition
     * and its intermediate result before delegating to the five-argument overload.
     *
     * @param numConsumers number of consumers; a value of zero causes an {@link ArithmeticException}
     * @param resultPartition the consumed partition
     * @param consumerSubtaskIndex subtask index of the consumer
     * @return the range of subpartition indices to consume
     */
    public static SubpartitionIndexRange computeConsumedSubpartitionRange(int numConsumers, IntermediateResultPartition resultPartition, int consumerSubtaskIndex) {
        int indexAmongConsumers = consumerSubtaskIndex % numConsumers;
        IntermediateResult producedResult = resultPartition.getIntermediateResult();
        int subpartitionCount = resultPartition.getNumberOfSubpartitions();
        boolean dynamicGraph = producedResult.getProducer().getGraph().isDynamic();
        boolean broadcast = producedResult.isBroadcast();
        return TaskDeploymentDescriptorFactory.computeConsumedSubpartitionRange(
                indexAmongConsumers, numConsumers, subpartitionCount, dynamicGraph, broadcast);
    }

    /**
     * Computes the inclusive range of subpartition indices assigned to one consumer.
     *
     * <ul>
     *   <li>Non-dynamic graph: consumers and subpartitions must match in number and the consumer
     *       reads exactly the subpartition with its own index.
     *   <li>Dynamic graph, broadcast: there must be exactly one subpartition and the range is
     *       {@code [0, 0]}.
     *   <li>Dynamic graph, otherwise: the subpartitions are split into {@code numConsumers}
     *       contiguous ranges and the range at {@code consumerIndex} is returned.
     * </ul>
     *
     * @param consumerIndex index of the consumer among {@code numConsumers}
     * @param numConsumers number of consumers sharing the subpartitions
     * @param numSubpartitions number of subpartitions of the consumed partition
     * @param isDynamicGraph whether the graph is dynamic
     * @param isBroadcast whether the consumed result is a broadcast result
     * @return the inclusive start/end subpartition indices
     * @throws IllegalArgumentException if the arguments violate the constraints of the selected case
     */
    @VisibleForTesting
    static SubpartitionIndexRange computeConsumedSubpartitionRange(int consumerIndex, int numConsumers, int numSubpartitions, boolean isDynamicGraph, boolean isBroadcast) {
        if (!isDynamicGraph) {
            Preconditions.checkArgument(numConsumers == numSubpartitions, "The number of consumers (%s) must equal the number of subpartitions (%s) in a non-dynamic graph.", numConsumers, numSubpartitions);
            return new SubpartitionIndexRange(consumerIndex, consumerIndex);
        }
        if (isBroadcast) {
            Preconditions.checkArgument(numSubpartitions == 1, "A broadcast result in a dynamic graph must have exactly one subpartition, but has %s.", numSubpartitions);
            return new SubpartitionIndexRange(0, 0);
        }
        Preconditions.checkArgument(consumerIndex < numConsumers, "The consumer index (%s) must be smaller than the number of consumers (%s).", consumerIndex, numConsumers);
        Preconditions.checkArgument(numConsumers <= numSubpartitions, "The number of consumers (%s) must not exceed the number of subpartitions (%s).", numConsumers, numSubpartitions);
        // Multiply in long: the int product consumerIndex * numSubpartitions can overflow for
        // large inputs. After the checks above both quotients are at most numSubpartitions, so
        // narrowing back to int is lossless.
        int start = (int) ((long) consumerIndex * numSubpartitions / numConsumers);
        int nextStart = (int) (((long) consumerIndex + 1L) * numSubpartitions / numConsumers);
        return new SubpartitionIndexRange(start, nextStart - 1);
    }

    /**
     * Returns the serialized shuffle descriptors of a consumed partition group, using the cache
     * kept on the intermediate result.
     *
     * <p>On a cache miss the descriptors are computed and stored in that cache before being
     * returned.
     *
     * @param intermediateResult the result that owns the cache
     * @param consumedPartitionGroup the group whose descriptors are requested
     * @return the cached or newly computed descriptors
     * @throws IOException if computing the descriptors fails
     */
    private TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> getConsumedPartitionShuffleDescriptors(IntermediateResult intermediateResult, ConsumedPartitionGroup consumedPartitionGroup) throws IOException {
        TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> cached = intermediateResult.getCachedShuffleDescriptors(consumedPartitionGroup);
        if (cached != null) {
            return cached;
        }
        TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> computed = this.computeConsumedPartitionShuffleDescriptors(consumedPartitionGroup);
        intermediateResult.cacheShuffleDescriptors(consumedPartitionGroup, computed);
        return computed;
    }

    /**
     * Resolves the shuffle descriptor of every partition in the group, in iteration order, and
     * hands the resulting array to serialization/offloading.
     *
     * @param consumedPartitionGroup the group of consumed partitions
     * @return the serialized (and possibly offloaded) shuffle descriptors
     * @throws IOException if serialization or offloading fails
     */
    private TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> computeConsumedPartitionShuffleDescriptors(ConsumedPartitionGroup consumedPartitionGroup) throws IOException {
        int groupSize = consumedPartitionGroup.size();
        ShuffleDescriptor[] descriptors = new ShuffleDescriptor[groupSize];
        int slot = 0;
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            descriptors[slot] = TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor(
                    this.resultPartitionRetriever.apply(partitionId), this.partitionDeploymentConstraint);
            slot++;
        }
        return this.serializeAndTryOffloadShuffleDescriptors(descriptors);
    }

    /**
     * Serializes the shuffle descriptors in compressed form and tries to offload them through the
     * blob writer.
     *
     * @param shuffleDescriptors the descriptors to serialize
     * @return a non-offloaded wrapper when {@code tryOffload} yields the serialized value (left),
     *     otherwise an offloaded wrapper around the blob key (right)
     * @throws IOException if serialization fails
     */
    private TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> serializeAndTryOffloadShuffleDescriptors(ShuffleDescriptor[] shuffleDescriptors) throws IOException {
        CompressedSerializedValue<ShuffleDescriptor[]> compressed = CompressedSerializedValue.fromObject(shuffleDescriptors);
        Either<SerializedValue<ShuffleDescriptor[]>, PermanentBlobKey> inlineOrBlobKey = BlobWriter.tryOffload(compressed, this.jobID, this.blobWriter);
        return inlineOrBlobKey.isLeft()
                ? new TaskDeploymentDescriptor.NonOffloaded<ShuffleDescriptor[]>(inlineOrBlobKey.left())
                : new TaskDeploymentDescriptor.Offloaded<ShuffleDescriptor[]>(inlineOrBlobKey.right());
    }

    /**
     * Creates a factory for the given execution from its vertex and execution graph accessor.
     *
     * @param execution the execution to create the factory for
     * @return a factory bound to the execution's attempt ID
     * @throws ClusterDatasetCorruptedException if collecting the cluster partition shuffle
     *     descriptors fails for any reason; the original failure is passed along as the cause
     * @throws IOException declared by the signature; the visible body does not throw it directly
     */
    public static TaskDeploymentDescriptorFactory fromExecution(Execution execution) throws IOException, ClusterDatasetCorruptedException {
        ExecutionVertex vertex = execution.getVertex();
        InternalExecutionGraphAccessor graph = vertex.getExecutionGraphAccessor();

        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionDescriptors;
        try {
            clusterPartitionDescriptors = TaskDeploymentDescriptorFactory.getClusterPartitionShuffleDescriptors(vertex);
        }
        catch (Throwable cause) {
            // NOTE(review): this also wraps Errors (e.g. OutOfMemoryError) - confirm that is intended.
            throw new ClusterDatasetCorruptedException(cause, vertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume());
        }

        return new TaskDeploymentDescriptorFactory(
                execution.getAttemptId(),
                TaskDeploymentDescriptorFactory.getSerializedJobInformation(graph),
                TaskDeploymentDescriptorFactory.getSerializedTaskInformation(vertex.getJobVertex().getTaskInformationOrBlobKey()),
                graph.getJobID(),
                graph.getPartitionLocationConstraint(),
                vertex.getAllConsumedPartitionGroups(),
                graph::getResultPartitionOrThrow,
                graph.getBlobWriter(),
                clusterPartitionDescriptors);
    }

    /**
     * Collects, for every data set the job vertex consumes, the single shuffle descriptor that
     * belongs to this vertex's subtask index.
     *
     * @param executionVertex the consuming execution vertex
     * @return a map from data set ID to a one-element array holding the subtask's descriptor
     * @throws IllegalStateException if the number of shuffle descriptors of a data set differs
     *     from the vertex's total number of parallel subtasks
     */
    private static Map<IntermediateDataSetID, ShuffleDescriptor[]> getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
        InternalExecutionGraphAccessor graph = executionVertex.getExecutionGraphAccessor();
        List<IntermediateDataSetID> dataSetIds = executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
        Map<IntermediateDataSetID, ShuffleDescriptor[]> descriptorsByDataSet = new HashMap<>();
        for (IntermediateDataSetID dataSetId : dataSetIds) {
            List<ShuffleDescriptor> allDescriptors = graph.getClusterPartitionShuffleDescriptors(dataSetId);
            int parallelism = executionVertex.getTotalNumberOfParallelSubtasks();
            int descriptorCount = allDescriptors.size();
            Preconditions.checkState(parallelism == descriptorCount, "The parallelism (%s) of the cache consuming job vertex is different from the number of shuffle descriptors (%s) of the intermediate data set", parallelism, descriptorCount);
            // Each subtask consumes only the descriptor at its own index.
            ShuffleDescriptor ownDescriptor = allDescriptors.get(executionVertex.getParallelSubtaskIndex());
            descriptorsByDataSet.put(dataSetId, new ShuffleDescriptor[]{ownDescriptor});
        }
        return descriptorsByDataSet;
    }

    /**
     * Wraps the job information of the execution graph for deployment.
     *
     * @param internalExecutionGraphAccessor accessor providing the job information or its blob key
     * @return a non-offloaded wrapper for an inline serialized value (left), otherwise an
     *     offloaded wrapper around the blob key (right)
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> getSerializedJobInformation(InternalExecutionGraphAccessor internalExecutionGraphAccessor) {
        Either<SerializedValue<JobInformation>, PermanentBlobKey> inlineOrBlobKey = internalExecutionGraphAccessor.getJobInformationOrBlobKey();
        return inlineOrBlobKey.isLeft()
                ? new TaskDeploymentDescriptor.NonOffloaded<JobInformation>(inlineOrBlobKey.left())
                : new TaskDeploymentDescriptor.Offloaded<JobInformation>(inlineOrBlobKey.right());
    }

    /**
     * Wraps the task information for deployment.
     *
     * @param taskInfo either the inline serialized task information (left) or the key of the blob
     *     it was offloaded to (right)
     * @return a non-offloaded wrapper for the left case, an offloaded wrapper for the right case
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> getSerializedTaskInformation(Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInfo) {
        if (taskInfo.isLeft()) {
            return new TaskDeploymentDescriptor.NonOffloaded<TaskInformation>(taskInfo.left());
        }
        // Parameterized instead of the raw Offloaded type, so no unchecked conversion is needed.
        return new TaskDeploymentDescriptor.Offloaded<TaskInformation>(taskInfo.right());
    }

    /**
     * Resolves the shuffle descriptor for a consumed partition from the state of its producer.
     *
     * <p>Gathers the producer's state, its deployment descriptor for the partition (if any) and
     * the partition ID for the producer's attempt, then delegates to the six-argument overload.
     *
     * @param consumedPartition the partition to be consumed
     * @param partitionDeploymentConstraint whether an unknown partition location is acceptable
     * @return the shuffle descriptor to deploy for the partition
     */
    public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(IntermediateResultPartition consumedPartition, PartitionLocationConstraint partitionDeploymentConstraint) {
        Execution producerExecution = consumedPartition.getProducer().getPartitionProducer();
        ExecutionState stateOfProducer = producerExecution.getState();
        // Absent descriptor is represented as null for the overload below.
        ResultPartitionDeploymentDescriptor descriptorOrNull =
                producerExecution.getResultPartitionDeploymentDescriptor(consumedPartition.getPartitionId()).orElse(null);
        ResultPartitionID partitionIdForAttempt = new ResultPartitionID(consumedPartition.getPartitionId(), producerExecution.getAttemptId());
        return TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor(
                partitionIdForAttempt,
                consumedPartition.getResultType(),
                consumedPartition.isConsumable(),
                stateOfProducer,
                partitionDeploymentConstraint,
                descriptorOrNull);
    }

    /**
     * Chooses the shuffle descriptor for a consumed partition.
     *
     * <p>The producer's descriptor is used when the partition can be consumed in a pipelined
     * fashion or is already consumable, a descriptor exists, and the producer is in an available
     * state. Otherwise an unknown descriptor is returned if the constraint permits it, and an
     * exception is thrown if it does not.
     *
     * @param consumedPartitionId ID of the consumed partition
     * @param resultPartitionType type of the consumed partition
     * @param isConsumable whether the partition is consumable
     * @param producerState state of the producing execution
     * @param partitionDeploymentConstraint whether an unknown partition location is acceptable
     * @param consumedPartitionDescriptor the producer's descriptor for the partition, or {@code null}
     * @return the producer's shuffle descriptor or an unknown shuffle descriptor
     * @throws IllegalStateException if no descriptor can be used and the location must be known
     */
    @VisibleForTesting
    static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(ResultPartitionID consumedPartitionId, ResultPartitionType resultPartitionType, boolean isConsumable, ExecutionState producerState, PartitionLocationConstraint partitionDeploymentConstraint, @Nullable ResultPartitionDeploymentDescriptor consumedPartitionDescriptor) {
        boolean dataAccessible = resultPartitionType.canBePipelinedConsumed() || isConsumable;
        if (dataAccessible && consumedPartitionDescriptor != null && TaskDeploymentDescriptorFactory.isProducerAvailable(producerState)) {
            return consumedPartitionDescriptor.getShuffleDescriptor();
        }
        if (partitionDeploymentConstraint != PartitionLocationConstraint.CAN_BE_UNKNOWN) {
            throw TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(consumedPartitionId, resultPartitionType, isConsumable, producerState);
        }
        return new UnknownShuffleDescriptor(consumedPartitionId);
    }

    /**
     * Builds (but does not throw) the exception describing why a consumed partition has no usable
     * shuffle descriptor.
     *
     * @param consumedPartitionId ID of the consumed partition
     * @param resultPartitionType type of the consumed partition
     * @param isConsumable whether the partition is consumable
     * @param producerState state of the producing execution
     * @return an {@link IllegalStateException} whose message depends on whether the producer
     *     failed/was canceled or is merely not ready
     */
    private static RuntimeException handleConsumedPartitionShuffleDescriptorErrors(ResultPartitionID consumedPartitionId, ResultPartitionType resultPartitionType, boolean isConsumable, ExecutionState producerState) {
        if (TaskDeploymentDescriptorFactory.isProducerFailedOrCanceled(producerState)) {
            return new IllegalStateException("Trying to consume an input partition whose producer has been canceled or failed. The producer is in state " + producerState + ".");
        }
        return new IllegalStateException(String.format("Trying to consume an input partition whose producer is not ready (result type: %s, partition consumable: %s, producer state: %s, partition id: %s).", resultPartitionType, isConsumable, producerState, consumedPartitionId));
    }

    /**
     * Tells whether the producer is in one of the states SCHEDULED, DEPLOYING, INITIALIZING,
     * RUNNING or FINISHED.
     *
     * @param producerState state of the producing execution; {@code null} yields {@code false}
     * @return {@code true} for the states listed above
     */
    private static boolean isProducerAvailable(ExecutionState producerState) {
        if (producerState == ExecutionState.SCHEDULED || producerState == ExecutionState.DEPLOYING) {
            return true;
        }
        if (producerState == ExecutionState.INITIALIZING || producerState == ExecutionState.RUNNING) {
            return true;
        }
        return producerState == ExecutionState.FINISHED;
    }

    /**
     * Tells whether the producer is in one of the states CANCELING, CANCELED or FAILED.
     *
     * @param producerState state of the producing execution; {@code null} yields {@code false}
     * @return {@code true} for the states listed above
     */
    private static boolean isProducerFailedOrCanceled(ExecutionState producerState) {
        if (producerState == ExecutionState.FAILED) {
            return true;
        }
        return producerState == ExecutionState.CANCELED || producerState == ExecutionState.CANCELING;
    }

    public static enum PartitionLocationConstraint {
        MUST_BE_KNOWN,
        CAN_BE_UNKNOWN;


        public static PartitionLocationConstraint fromJobType(JobType jobType) {
            switch (jobType) {
                case BATCH: {
                    return CAN_BE_UNKNOWN;
                }
                case STREAMING: {
                    return MUST_BE_KNOWN;
                }
            }
            throw new IllegalArgumentException(String.format("Unknown JobType %s. Cannot derive partition location constraint for it.", new Object[]{jobType}));
        }
    }
}

