/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Daemon thread that takes starved applications from
 * {@code context.getStarvedApps()}, selects running containers whose
 * resources would satisfy the applications' starved resource requests, marks
 * those containers for preemption and schedules a timer task that kills them
 * after {@code warnTimeBeforeKill} milliseconds.
 */
class FSPreemptionThread extends Thread {
    private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class);
    protected final FSContext context;
    private final FairScheduler scheduler;
    /** Delay (ms) between flagging a container for preemption and killing it. */
    private final long warnTimeBeforeKill;
    /** Delay (ms) handed to the starved app after a preemption round. */
    private final long delayBeforeNextStarvationCheck;
    private final Timer preemptionTimer;
    private final Lock schedulerReadLock;

    /**
     * Creates the preemption thread and enables preemption on the scheduler's
     * context.
     *
     * @param scheduler the fair scheduler providing the context,
     *                  configuration, node tracker and read lock used here
     */
    FSPreemptionThread(FairScheduler scheduler) {
        this.setDaemon(true);
        this.setName("FSPreemptionThread");
        this.scheduler = scheduler;
        this.context = scheduler.getContext();
        FairSchedulerConfiguration fsConf = scheduler.getConf();
        this.context.setPreemptionEnabled();
        this.context.setPreemptionUtilizationThreshold(fsConf.getPreemptionUtilizationThreshold());
        this.preemptionTimer = new Timer("Preemption Timer", true);
        this.warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
        // Allow ten continuous-scheduling runs or four NM heartbeats for the
        // freed resources to be allocated. The long literals force 64-bit
        // arithmetic so the product cannot overflow before it is widened.
        long allocDelay = fsConf.isContinuousSchedulingEnabled()
                ? 10L * fsConf.getContinuousSchedulingSleepMs()
                : 4L * scheduler.getNMHeartbeatInterval();
        this.delayBeforeNextStarvationCheck = this.warnTimeBeforeKill + allocDelay
                + fsConf.getWaitTimeBeforeNextStarvationCheck();
        this.schedulerReadLock = scheduler.getSchedulerReadLock();
    }

    /**
     * Processes starved applications one at a time until the thread is
     * interrupted. Container identification and flagging happen while the
     * scheduler read lock is held.
     */
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                FSAppAttempt starvedApp = this.context.getStarvedApps().take();
                this.schedulerReadLock.lock();
                try {
                    this.preemptContainers(this.identifyContainersToPreempt(starvedApp));
                } finally {
                    this.schedulerReadLock.unlock();
                }
                starvedApp.preemptionTriggered(this.delayBeforeNextStarvationCheck);
            } catch (InterruptedException e) {
                LOG.info("Preemption thread interrupted! Exiting.");
                // Restore the flag so the loop condition observes it and exits.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Selects containers to preempt for every container asked for by the
     * starved application's starved resource requests.
     *
     * @param starvedApp the application to free resources for
     * @return the containers chosen for preemption; empty if none were found
     */
    private List<RMContainer> identifyContainersToPreempt(FSAppAttempt starvedApp) {
        List<RMContainer> containersToPreempt = new ArrayList<RMContainer>();
        for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
            for (int i = 0; i < rr.getNumContainers(); ++i) {
                PreemptableContainers bestContainers = null;
                // Typed explicitly: a raw List cannot be iterated as
                // FSSchedulerNode. The assignment is unchecked if the node
                // tracker is exposed as a raw type.
                @SuppressWarnings("unchecked")
                List<FSSchedulerNode> potentialNodes =
                        this.scheduler.getNodeTracker().getNodesByResourceName(rr.getResourceName());
                int maxAMContainers = Integer.MAX_VALUE;
                for (FSSchedulerNode node : potentialNodes) {
                    PreemptableContainers preemptableContainers =
                            this.identifyContainersToPreemptOnNode(rr.getCapability(), node, maxAMContainers);
                    if (preemptableContainers == null) {
                        continue;
                    }
                    // A non-null result preempts fewer AM containers than the
                    // previous best, because maxAMContainers is tightened to
                    // the best count found so far.
                    bestContainers = preemptableContainers;
                    maxAMContainers = bestContainers.numAMContainers;
                    if (maxAMContainers == 0) {
                        // No AM container is affected; nothing can beat this.
                        break;
                    }
                }
                if (bestContainers != null && !bestContainers.containers.isEmpty()) {
                    containersToPreempt.addAll(bestContainers.containers);
                    this.trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
                }
            }
        }
        return containersToPreempt;
    }

    /**
     * Collects preemptable containers on one node until the request fits into
     * the node's unallocated-minus-reserved resources plus the resources of
     * the collected containers.
     *
     * @param request         the resource capability that must fit
     * @param node            the node to inspect
     * @param maxAMContainers exclusive upper bound on the number of AM
     *                        containers the result may contain
     * @return the collected containers, or {@code null} if the request cannot
     *         be satisfied on this node within the AM-container bound
     */
    private PreemptableContainers identifyContainersToPreemptOnNode(Resource request, FSSchedulerNode node, int maxAMContainers) {
        PreemptableContainers preemptableContainers = new PreemptableContainers(maxAMContainers);
        List<RMContainer> containersToCheck = node.getRunningContainersWithAMsAtTheEnd();
        // Skip containers that are already marked for preemption on this node.
        containersToCheck.removeAll(node.getContainersForPreemption());
        // Start from the resources that are unallocated and not reserved.
        Resource potential = Resources.subtractFromNonNegative(
                Resources.clone(node.getUnallocatedResource()), node.getTotalReserved());
        for (RMContainer container : containersToCheck) {
            FSAppAttempt app = this.scheduler.getSchedulerApp(container.getApplicationAttemptId());
            if (app.canContainerBePreempted(container)) {
                if (!preemptableContainers.addContainer(container)) {
                    // AM-container bound reached: this node cannot do better.
                    return null;
                }
                Resources.addTo(potential, container.getAllocatedResource());
            }
            if (Resources.fitsIn(request, potential)) {
                return preemptableContainers;
            }
        }
        return null;
    }

    /**
     * Registers the containers for preemption, on behalf of {@code app}, with
     * the node of the first container in the list.
     *
     * @param containers non-empty list of containers; the node is looked up
     *                   from the first element
     * @param app        the starved application the containers are freed for
     */
    private void trackPreemptionsAgainstNode(List<RMContainer> containers, FSAppAttempt app) {
        FSSchedulerNode node = (FSSchedulerNode)this.scheduler.getNodeTracker().getNode(containers.get(0).getNodeId());
        node.addContainersForPreemption(containers, app);
    }

    /**
     * Flags each container for preemption with its owning application and
     * schedules a task that kills all of them after
     * {@code warnTimeBeforeKill} milliseconds. Does nothing for an empty list.
     *
     * @param containers the containers to preempt
     */
    private void preemptContainers(List<RMContainer> containers) {
        if (containers.isEmpty()) {
            // Nothing to flag or kill; avoid queueing a no-op timer task.
            return;
        }
        for (RMContainer container : containers) {
            ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
            FSAppAttempt app = this.scheduler.getSchedulerApp(appAttemptId);
            LOG.info("Preempting container " + container + " from queue " + app.getQueueName());
            app.trackContainerForPreemption(container);
        }
        this.preemptionTimer.schedule(new PreemptContainersTask(containers), this.warnTimeBeforeKill);
    }

    /**
     * A set of containers selected on one node, together with the number of
     * AM containers seen while building it.
     */
    private static class PreemptableContainers {
        final List<RMContainer> containers = new ArrayList<RMContainer>();
        int numAMContainers = 0;
        final int maxAMContainers;

        PreemptableContainers(int maxAMContainers) {
            this.maxAMContainers = maxAMContainers;
        }

        /**
         * Adds the container unless doing so would bring the number of AM
         * containers up to {@code maxAMContainers}.
         *
         * @param container the container to add
         * @return {@code true} if the container was added, {@code false} if
         *         it was rejected because of the AM-container bound
         */
        private boolean addContainer(RMContainer container) {
            if (container.isAMContainer()) {
                // The counter is incremented even when the container is
                // rejected; callers discard this set in that case.
                ++this.numAMContainers;
                if (this.numAMContainers >= this.maxAMContainers) {
                    return false;
                }
            }
            this.containers.add(container);
            return true;
        }
    }

    /**
     * Timer task that reports each of its containers to the scheduler as
     * completed with a preempted status and a KILL event.
     */
    private class PreemptContainersTask extends TimerTask {
        private final List<RMContainer> containers;

        PreemptContainersTask(List<RMContainer> containers) {
            this.containers = containers;
        }

        @Override
        public void run() {
            for (RMContainer container : this.containers) {
                ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus(container.getContainerId(), "Container preempted by scheduler");
                LOG.info("Killing container " + container);
                FSPreemptionThread.this.scheduler.completedContainer(container, status, RMContainerEventType.KILL);
            }
        }
    }
}

