package org.apache.crunch.impl.mr.plan;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.crunch.Target;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.impl.dist.collect.MRCollection;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.mr.collect.DoTable;
import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
import org.apache.crunch.impl.mr.run.CrunchCombiner;
import org.apache.crunch.impl.mr.run.CrunchInputFormat;
import org.apache.crunch.impl.mr.run.CrunchMapper;
import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
import org.apache.crunch.impl.mr.run.CrunchReducer;
import org.apache.crunch.impl.mr.run.NodeContext;
import org.apache.crunch.io.impl.FileTargetImpl;
import org.apache.crunch.types.PType;
import org.apache.crunch.util.DistCache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/crunch/impl/mr/plan/JobPrototype.class */
/**
 * Planning-time description of a single MapReduce job.
 *
 * <p>A prototype is either a <em>map-reduce</em> job (created with
 * {@link #createMapReduceJob}, it has a non-null grouping table) or a
 * <em>map-only</em> job (created with {@link #createMapOnlyJob}, it has no
 * grouping table). Output targets and their {@link NodePath}s are attached to
 * the prototype, and {@link #getCrunchJob} lazily turns it, together with its
 * dependencies, into a {@link CrunchControlledJob}.
 *
 * <p>Instances are mutable and perform no synchronization.
 */
public class JobPrototype {
    private static final Logger LOG = LoggerFactory.getLogger(JobPrototype.class);

    private static final String DFS_REPLICATION = "dfs.replication";
    private static final String DFS_REPLICATION_INITIAL = "dfs.replication.initial";
    private static final String CRUNCH_TMP_DIR_REPLICATION = "crunch.tmp.dir.replication";

    /** Replication factor used when none can be obtained from the job or filesystem configuration. */
    private static final String DEFAULT_REPLICATION = "3";

    private final int jobID;
    /** Paths leading into the grouping; {@code null} for a map-only job. */
    private final Set<NodePath> mapNodePaths;
    /** The grouping performed by this job; {@code null} for a map-only job. */
    private final PGroupedTableImpl<?, ?> group;
    private final Set<JobPrototype> dependencies = Sets.newHashSet();
    /** Cache of the DoNode created for each collection visited by {@link #walkPath}. */
    private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
    private final Path workingPath;
    /** Outputs written directly by the map phase of a map-reduce job; may be {@code null}. */
    private HashMultimap<Target, NodePath> mapSideNodePaths;
    /** Outputs of the reduce phase (map-reduce job) or of the map phase (map-only job). */
    private HashMultimap<Target, NodePath> targetsToNodePaths;
    /** Most recently seen table with a combine function; maintained by {@link #walkPath}. */
    private DoTable<?, ?> combineFnTable;
    /** The built job, created on the first call to {@link #getCrunchJob}. */
    private CrunchControlledJob job;

    /**
     * Creates a prototype for a job with a shuffle/reduce phase.
     *
     * @param jobID identifier of the job
     * @param group the grouping performed by the job
     * @param inputs node paths feeding the grouping; copied defensively
     * @param workingPath working directory of the job
     * @return the new prototype; reduce outputs must be added with {@link #addReducePaths}
     */
    public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group, Set<NodePath> inputs, Path workingPath) {
        return new JobPrototype(jobID, inputs, group, workingPath);
    }

    /**
     * Creates a prototype for a job without a reduce phase.
     *
     * @param jobID identifier of the job
     * @param mapNodePaths output targets and the node paths that lead to them
     * @param workingPath working directory of the job
     * @return the new prototype
     */
    public static JobPrototype createMapOnlyJob(int jobID, HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
        return new JobPrototype(jobID, mapNodePaths, workingPath);
    }

    private JobPrototype(int jobID, Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
        this.jobID = jobID;
        this.mapNodePaths = ImmutableSet.copyOf(inputs);
        this.group = group;
        this.workingPath = workingPath;
        this.targetsToNodePaths = null;
    }

    @VisibleForTesting
    private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
        this.jobID = jobID;
        this.group = null;
        this.mapNodePaths = null;
        this.workingPath = workingPath;
        this.targetsToNodePaths = outputPaths;
    }

    /** @return the identifier given to this prototype at creation */
    public int getJobID() {
        return this.jobID;
    }

    /** @return {@code true} if this prototype has no grouping, i.e. no reduce phase */
    public boolean isMapOnly() {
        return this.group == null;
    }

    /** @return the node paths feeding the grouping, or {@code null} for a map-only job */
    public Set<NodePath> getMapNodePaths() {
        return this.mapNodePaths;
    }

    /** @return the map-side outputs, or {@code null} if none have been added */
    public HashMultimap<Target, NodePath> getMapSideNodePaths() {
        return this.mapSideNodePaths;
    }

    /** @return the output targets and their node paths, or {@code null} if not yet set */
    public HashMultimap<Target, NodePath> getTargetsToNodePaths() {
        return this.targetsToNodePaths;
    }

    /**
     * Sets the outputs that are written directly from the map phase of a map-reduce job.
     *
     * @param mapSideNodePaths targets and the node paths leading to them
     * @throws IllegalStateException if this is a map-only job
     */
    public void addMapSideOutputs(HashMultimap<Target, NodePath> mapSideNodePaths) {
        if (this.group == null) {
            throw new IllegalStateException("Cannot side-outputs to a map-only job");
        }
        this.mapSideNodePaths = mapSideNodePaths;
    }

    /**
     * Sets the outputs of the reduce phase.
     *
     * @param outputPaths targets and the node paths leading to them
     * @throws IllegalStateException if this is a map-only job
     */
    public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
        if (this.group == null) {
            throw new IllegalStateException("Cannot add a reduce phase to a map-only job");
        }
        this.targetsToNodePaths = outputPaths;
    }

    /**
     * Registers a prototype whose job is added as a depending job of this one
     * when {@link #getCrunchJob} builds it.
     */
    public void addDependency(JobPrototype dependency) {
        this.dependencies.add(dependency);
    }

    /**
     * Returns the job for this prototype, building it (and, recursively, the
     * jobs of all registered dependencies) on the first call. Later calls
     * return the same instance and ignore their arguments.
     *
     * @param jarClass class passed to {@code Job.setJarByClass}
     * @param conf base configuration the job is created from
     * @param pipeline pipeline supplying the job name and the prepare/completion hooks
     * @param numOfJobs forwarded to {@link JobNameBuilder}; presumably the total
     *        number of jobs in the plan (confirm against the caller)
     * @return the built job
     * @throws IOException if building the job fails
     */
    public CrunchControlledJob getCrunchJob(Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException {
        if (this.job == null) {
            // Assign before resolving dependencies so the field is already set
            // while the dependency jobs are being built.
            this.job = build(jarClass, conf, pipeline, numOfJobs);
            for (JobPrototype dependency : this.dependencies) {
                this.job.addDependingJob(dependency.getCrunchJob(jarClass, conf, pipeline, numOfJobs));
            }
        }
        return this.job;
    }

    /**
     * Assembles the Hadoop job: configures outputs, replication, mapper /
     * combiner / reducer classes, serializes the node trees for each phase
     * and configures the input sources.
     *
     * <p>The order of the {@link #walkPath} calls matters because each call
     * updates {@link #combineFnTable} and the shared {@link #nodes} cache.
     */
    private CrunchControlledJob build(Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException {
        if (this.targetsToNodePaths == null) {
            throw new IllegalStateException("No output paths have been configured for job " + this.jobID);
        }

        Job hadoopJob = new Job(conf);
        LOG.debug("Replication factor: {}", hadoopJob.getConfiguration().get(DFS_REPLICATION));
        Configuration jobConf = hadoopJob.getConfiguration();
        jobConf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, this.workingPath.toString());
        hadoopJob.setJarByClass(jarClass);

        Set<DoNode> outputNodes = Sets.newHashSet();
        Set<Target> targets = Sets.newHashSet();
        Path outputPath = new Path(this.workingPath, "output");
        MSCROutputHandler outputHandler = new MSCROutputHandler(hadoopJob, outputPath, this.group == null);

        // Stays true only if every target of this phase is a temporary directory.
        boolean onlyHasTemporaryOutput = true;
        for (Target target : this.targetsToNodePaths.keySet()) {
            DoNode outputNode = null;
            LOG.debug("Target path: {}", target);
            for (NodePath nodePath : this.targetsToNodePaths.get(target)) {
                if (outputNode == null) {
                    // One output node per target, created from the first path seen.
                    PType<?> ptype = nodePath.tail().getPType();
                    outputNode = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
                    outputHandler.configureNode(outputNode, target);
                    onlyHasTemporaryOutput &= DistributedPipeline.isTempDir(hadoopJob, target.toString());
                }
                outputNodes.add(walkPath(nodePath.descendingIterator(), outputNode));
            }
            targets.add(target);
        }
        setJobReplication(hadoopJob.getConfiguration(), onlyHasTemporaryOutput);

        Set<DoNode> mapSideNodes = Sets.newHashSet();
        if (this.mapSideNodePaths != null) {
            for (Target target : this.mapSideNodePaths.keySet()) {
                DoNode outputNode = null;
                for (NodePath nodePath : this.mapSideNodePaths.get(target)) {
                    if (outputNode == null) {
                        PType<?> ptype = nodePath.tail().getPType();
                        outputNode = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
                        outputHandler.configureNode(outputNode, target);
                    }
                    mapSideNodes.add(walkPath(nodePath.descendingIterator(), outputNode));
                }
                targets.add(target);
            }
        }

        hadoopJob.setMapperClass(CrunchMapper.class);
        List<DoNode> inputNodes;
        DoNode reduceNode = null;
        if (this.group != null) {
            hadoopJob.setReducerClass(CrunchReducer.class);
            List<DoNode> reduceNodes = Lists.newArrayList(outputNodes);
            serialize(reduceNodes, jobConf, this.workingPath, NodeContext.REDUCE);
            reduceNode = reduceNodes.get(0);

            // combineFnTable reflects the walks performed so far (reduce and
            // map-side outputs); it must be read before the map paths are walked.
            if (this.combineFnTable != null) {
                hadoopJob.setCombinerClass(CrunchCombiner.class);
                DoNode combinerInputNode = this.group.createDoNode();
                DoNode combineNode = this.combineFnTable.createCombineNode();
                combineNode.addChild(this.group.getGroupingNode());
                combinerInputNode.addChild(combineNode);
                serialize(ImmutableList.of(combinerInputNode), jobConf, this.workingPath, NodeContext.COMBINE);
            }
            this.group.configureShuffle(hadoopJob);

            DoNode mapOutputNode = this.group.getGroupingNode();
            Set<DoNode> mapNodes = Sets.newHashSet(mapSideNodes);
            for (NodePath nodePath : this.mapNodePaths) {
                // Skip the first element of each path: the grouping node stands
                // in for it (presumably it is the grouped table itself).
                Iterator<PCollectionImpl<?>> path = nodePath.descendingIterator();
                path.next();
                mapNodes.add(walkPath(path, mapOutputNode));
            }
            inputNodes = Lists.newArrayList(mapNodes);
        } else {
            hadoopJob.setNumReduceTasks(0);
            inputNodes = Lists.newArrayList(outputNodes);
        }

        hadoopJob.setOutputFormatClass(CrunchOutputFormat.class);
        serialize(inputNodes, jobConf, this.workingPath, NodeContext.MAP);
        if (inputNodes.size() == 1) {
            // A single input is configured with index -1 and without CrunchInputFormat.
            inputNodes.get(0).getSource().configureSource(hadoopJob, -1);
        } else {
            for (int index = 0; index < inputNodes.size(); index++) {
                inputNodes.get(index).getSource().configureSource(hadoopJob, index);
            }
            hadoopJob.setInputFormatClass(CrunchInputFormat.class);
        }

        JobNameBuilder nameBuilder = createJobNameBuilder(jobConf, pipeline.getName(), inputNodes, reduceNode, numOfJobs);
        CrunchControlledJob.Hook prepareHook = getHook(new CrunchJobHooks.PrepareHook(), pipeline.getPrepareHooks());
        CrunchControlledJob.Hook completionHook = getHook(
                new CrunchJobHooks.CompletionHook(outputPath, outputHandler.getMultiPaths()),
                pipeline.getCompletionHooks());
        return new CrunchControlledJob(this.jobID, hadoopJob, nameBuilder, targets, prepareHook, completionHook);
    }

    /**
     * Chooses the value of {@code dfs.replication} for the job. Does nothing
     * unless {@code crunch.tmp.dir.replication} is configured.
     *
     * @param configuration the job configuration to update
     * @param onlyHasTemporaryOutput if {@code true} the temp-dir replication is
     *        applied; otherwise the remembered initial replication is restored
     */
    @VisibleForTesting
    protected void setJobReplication(Configuration configuration, boolean onlyHasTemporaryOutput) {
        String tmpReplication = configuration.get(CRUNCH_TMP_DIR_REPLICATION);
        if (tmpReplication == null) {
            return;
        }
        handleInitialReplication(configuration);
        if (onlyHasTemporaryOutput) {
            LOG.debug("Setting replication factor to: {} ", tmpReplication);
            configuration.set(DFS_REPLICATION, tmpReplication);
            return;
        }
        String initialReplication = configuration.get(DFS_REPLICATION_INITIAL);
        LOG.debug("Using initial replication factor ({})", initialReplication);
        if (initialReplication != null) {
            // Never write a null value: Configuration rejects it. If no initial
            // value is recorded, leave dfs.replication as it is.
            configuration.set(DFS_REPLICATION, initialReplication);
        }
    }

    /**
     * Records the replication factor to restore for non-temporary outputs in
     * {@code dfs.replication.initial}, unless it has been recorded already.
     * The value is taken, in order, from {@code dfs.replication} in the given
     * configuration, from the filesystem configuration of the first target
     * (only if it is a {@link FileTargetImpl}), or from the built-in default.
     *
     * @param configuration the job configuration to read and update
     */
    @VisibleForTesting
    protected void handleInitialReplication(Configuration configuration) {
        String initialReplication = configuration.get(DFS_REPLICATION_INITIAL);
        if (initialReplication != null) {
            LOG.debug("Initial replication has been already set ({}); nothing to do.", initialReplication);
            return;
        }

        String replication = configuration.get(DFS_REPLICATION);
        if (replication != null) {
            LOG.debug("Using dfs.replication ({}) set by user as initial replication.", replication);
            setInitialJobReplicationConfig(configuration, replication);
            return;
        }

        if (this.targetsToNodePaths != null && !this.targetsToNodePaths.isEmpty()) {
            Target firstTarget = this.targetsToNodePaths.keySet().iterator().next();
            if (firstTarget instanceof FileTargetImpl) {
                Path targetPath = ((FileTargetImpl) firstTarget).getPath();
                replication = tryGetDefaultReplicationFromFileSystem(configuration, targetPath, DEFAULT_REPLICATION);
            }
        }
        if (replication == null) {
            // No targets, or a target that is not file based: fall back instead
            // of storing null.
            LOG.debug("No replication factor available; using default ({}) as initial replication.", DEFAULT_REPLICATION);
            replication = DEFAULT_REPLICATION;
        }
        setInitialJobReplicationConfig(configuration, replication);
    }

    /**
     * Reads {@code dfs.replication} from the configuration of the filesystem
     * that owns {@code path}.
     *
     * @return the filesystem's value, or {@code defaultReplication} if the
     *         filesystem cannot be obtained or does not define the property
     */
    private String tryGetDefaultReplicationFromFileSystem(Configuration configuration, Path path, String defaultReplication) {
        try {
            String replication = path.getFileSystem(configuration).getConf().get(DFS_REPLICATION);
            if (replication != null) {
                LOG.debug("Using dfs.replication ({}) retrieved from remote filesystem as initial replication.", replication);
                return replication;
            }
            LOG.debug("Remote filesystem does not define dfs.replication; using default ({}).", defaultReplication);
        } catch (IOException e) {
            LOG.warn(String.format("Cannot read job's config. Setting initial replication to %s.", defaultReplication), e);
        }
        return defaultReplication;
    }

    private void setInitialJobReplicationConfig(Configuration configuration, String replication) {
        configuration.set(DFS_REPLICATION_INITIAL, replication);
    }

    /**
     * Combines the built-in hook with the user supplied ones.
     *
     * @return {@code base} itself when there are no extra hooks, otherwise a
     *         composite that lists {@code base} first, followed by {@code extraHooks}
     */
    private static CrunchControlledJob.Hook getHook(CrunchControlledJob.Hook base, List<CrunchControlledJob.Hook> extraHooks) {
        if (extraHooks.isEmpty()) {
            return base;
        }
        List<CrunchControlledJob.Hook> hooks = Lists.newArrayList();
        hooks.add(base);
        hooks.addAll(extraHooks);
        return new CrunchJobHooks.CompositeHook(hooks);
    }

    /**
     * Converts the given nodes to their runtime form and writes the resulting
     * list through {@link DistCache} to a file named after {@code context}
     * inside {@code workingPath}.
     */
    private void serialize(List<DoNode> doNodes, Configuration conf, Path workingPath, NodeContext context) throws IOException {
        // Element type left as Object: only the list as a whole is handed on.
        List<Object> runtimeNodes = Lists.newArrayList();
        for (DoNode node : doNodes) {
            runtimeNodes.add(node.toRTNode(true, conf, context));
        }
        DistCache.write(conf, new Path(workingPath, context.toString()), runtimeNodes);
    }

    /**
     * Creates the name builder for this job and lets it visit the map-side
     * nodes and, when present, the reduce node.
     */
    private JobNameBuilder createJobNameBuilder(Configuration conf, String pipelineName, List<DoNode> mapNodes, DoNode reduceNode, int numOfJobs) {
        JobNameBuilder builder = new JobNameBuilder(conf, pipelineName, this.jobID, numOfJobs);
        builder.visit(mapNodes);
        if (reduceNode != null) {
            builder.visit(reduceNode);
        }
        return builder;
    }

    /**
     * Walks the remaining collections of a path, chaining their DoNodes so
     * that each visited collection's node becomes the parent of the node
     * produced by the previous step.
     *
     * <p>Side effects: creates and caches a DoNode per collection in
     * {@link #nodes}, and updates {@link #combineFnTable}. A remembered combine
     * table is dropped as soon as a collection other than a
     * {@link PGroupedTableImpl} is visited; otherwise a visited {@link DoTable}
     * with a combine function is remembered.
     *
     * @param iter the collections still to visit, in walking order
     * @param working the node to attach as child of the first visited collection
     * @return the node of the last visited collection, or {@code working} if
     *         the iterator was already exhausted
     */
    private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
        while (iter.hasNext()) {
            PCollectionImpl<?> collect = iter.next();
            if (this.combineFnTable != null && !(collect instanceof PGroupedTableImpl)) {
                this.combineFnTable = null;
            } else if (collect instanceof DoTable && ((DoTable<?, ?>) collect).hasCombineFn()) {
                this.combineFnTable = (DoTable<?, ?>) collect;
            }
            if (!this.nodes.containsKey(collect)) {
                this.nodes.put(collect, ((MRCollection) collect).createDoNode());
            }
            DoNode parent = this.nodes.get(collect);
            parent.addChild(working);
            working = parent;
        }
        return working;
    }
}
