/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerPreemptionTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Preemption tests for the capacity scheduler that drive a {@link MockRM}
 * by hand: node-update events are fed to the scheduler explicitly and the
 * preemption policy is invoked through {@code editSchedule()}.
 *
 * <p>The queue layout ("a", "b", "c" under "root"), the {@code conf} and
 * {@code mgr} fields and the {@code wait...}/{@code check...} helpers are
 * inherited from {@link CapacitySchedulerPreemptionTestBase}; their exact
 * behaviour is not visible in this file.
 *
 * <p>Every test closes its {@link MockRM} in a {@code finally} block so a
 * failed assertion does not leave a started resource manager behind.
 */
public class TestCapacitySchedulerSurgicalPreemption
extends CapacitySchedulerPreemptionTestBase {
    /**
     * Runs the base-class setup and then switches on candidate selection
     * based on reserved containers for every test in this class.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.conf.setBoolean("yarn.resourcemanager.monitor.capacity.preemption.select_based_on_reserved_containers", true);
    }

    /** Looks up the {@link RMNode} the resource manager tracks for the given mock node manager. */
    private static RMNode rmNodeOf(MockRM rm, MockNM nm) {
        return rm.getRMContext().getRMNodes().get(nm.getNodeId());
    }

    /** Delivers one node-update event per given node to the scheduler, in argument order. */
    private static void fireNodeUpdates(CapacityScheduler cs, RMNode... nodes) {
        for (RMNode node : nodes) {
            cs.handle(new NodeUpdateSchedulerEvent(node));
        }
    }

    /** Delivers one node-update event for each of {@code nodes[from]} .. {@code nodes[to - 1]}. */
    private static void fireNodeUpdatesInRange(CapacityScheduler cs, RMNode[] nodes, int from, int to) {
        for (int idx = from; idx < to; idx++) {
            cs.handle(new NodeUpdateSchedulerEvent(nodes[idx]));
        }
    }

    /**
     * Delivers {@code count} node-update events, cycling through {@code nms}
     * in order and resolving each node through the scheduler's RM context.
     */
    private static void fireNodeUpdatesRoundRobin(CapacityScheduler cs, ArrayList<MockNM> nms, int count) {
        for (int idx = 0; idx < count; idx++) {
            MockNM nm = nms.get(idx % nms.size());
            RMNode rmNode = cs.getRMContext().getRMNodes().get(nm.getNodeId());
            cs.handle(new NodeUpdateSchedulerEvent(rmNode));
        }
    }

    /**
     * An app in queue "a" fills most of two nodes; an app in queue "c" then
     * reserves a 6144-sized container on node 1. After two policy runs, four
     * of the first app's containers on node 1 are expected to be gone and the
     * root queue's preempted-container metric is expected to read 4.
     */
    @Test(timeout=60000L)
    public void testSimpleSurgicalPreemption() throws Exception {
        MockRM rm1 = new MockRM(this.conf);
        try {
            rm1.getRMContext().setNodeLabelManager(this.mgr);
            rm1.start();
            MockNM nm1 = rm1.registerNode("h1:1234", 20480);
            MockNM nm2 = rm1.registerNode("h2:1234", 20480);
            CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
            RMNode rmNode1 = rmNodeOf(rm1, nm1);
            RMNode rmNode2 = rmNodeOf(rm1, nm2);

            // app1 in queue "a": AM on nm1 plus 32 requested containers.
            RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
            am1.allocate("*", 1024, 32, new ArrayList<ContainerId>());
            for (int round = 0; round < 32; round++) {
                fireNodeUpdates(cs, rmNode1, rmNode2);
            }

            // AM + 32 containers: 17 on node 1, 16 on node 2.
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(33, schedulerApp1.getLiveContainers().size());
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 17);
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 16);

            // app2 in queue "c": its AM is launched on nm1.
            RMApp app2 = rm1.submitApp(1024, "app", "user", null, "c");
            MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
            Assert.assertEquals(2048L, cs.getNode(nm1.getNodeId()).getUnallocatedResource().getMemorySize());
            Assert.assertEquals(4096L, cs.getNode(nm2.getNodeId()).getUnallocatedResource().getMemorySize());

            // Neither node has 6144 free, so the request must end up reserved on node 1.
            am2.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(6144), 1)), null);
            fireNodeUpdates(cs, rmNode1);
            Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());

            // Two policy runs, then the container counts are checked.
            SchedulingEditPolicy editPolicy = this.getSchedulingEditPolicy(rm1);
            editPolicy.editSchedule();
            editPolicy.editSchedule();
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 29);
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 13);
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 16);
            Assert.assertEquals("Number of preempted containers incorrectly recorded:", 4L, cs.getQueue("root").getMetrics().getAggregatePreemptedContainers());
        } finally {
            rm1.close();
        }
    }

    /**
     * An app in queue "a" takes 39 containers across two nodes; an app with a
     * 4096-sized AM is then submitted to queue "c". The test expects three
     * preemption candidates, then a reservation for the second app, and
     * finally that the reservation disappears after further policy runs and
     * node updates.
     */
    @Test(timeout=60000L)
    public void testSurgicalPreemptionWithAvailableResource() throws Exception {
        MockRM rm1 = new MockRM(this.conf);
        try {
            rm1.getRMContext().setNodeLabelManager(this.mgr);
            rm1.start();
            MockNM nm1 = rm1.registerNode("h1:1234", 20480);
            MockNM nm2 = rm1.registerNode("h2:1234", 20480);
            CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
            RMNode rmNode1 = rmNodeOf(rm1, nm1);
            RMNode rmNode2 = rmNodeOf(rm1, nm2);

            // app1 in queue "a": AM on nm1 plus 38 requested containers.
            RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
            am1.allocate("*", 1024, 38, new ArrayList<ContainerId>());
            for (int round = 0; round < 38; round++) {
                fireNodeUpdates(cs, rmNode1, rmNode2);
            }

            // AM + 38 containers: 20 on node 1, 19 on node 2.
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 20);
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 19);

            // app2 is only submitted (its AM is not launched here); attempt id 1 is looked up directly.
            RMApp app2 = rm1.submitApp(4096, "app", "user", null, "c");
            FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));

            // First run selects candidates, second run is expected to remove them.
            ProportionalCapacityPreemptionPolicy editPolicy = (ProportionalCapacityPreemptionPolicy) this.getSchedulingEditPolicy(rm1);
            editPolicy.editSchedule();
            Assert.assertEquals(3, editPolicy.getToPreemptContainers().size());
            editPolicy.editSchedule();
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 36);

            fireNodeUpdates(cs, rmNode1, rmNode2);
            this.waitNumberOfReservedContainersFromApp(schedulerApp2, 1);

            editPolicy.editSchedule();
            editPolicy.editSchedule();

            // Keep sending node updates (at most 10 rounds) until app2 owns one live container.
            for (int tick = 0; schedulerApp2.getLiveContainers().size() != 1 && tick < 10; tick++) {
                fireNodeUpdates(cs, rmNode1, rmNode2);
                Thread.sleep(100L);
            }
            this.waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
        } finally {
            rm1.close();
        }
    }

    /**
     * With the priority-utilization ordering policy and under-utilized
     * preemption enabled (delay 1000), queue "c" gets priority 1. An app in
     * queue "b" holds seven containers; an app with an 18432-sized AM is
     * submitted to "c". No candidate is expected on the first policy run, two
     * after sleeping for the delay, and eventually the second app gets its
     * container.
     */
    @Test(timeout=60000L)
    public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities() throws Exception {
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000L);
        this.conf.setQueueOrderingPolicy("root", "priority-utilization");
        this.conf.setQueuePriority("root.c", 1);
        MockRM rm1 = new MockRM(this.conf);
        try {
            rm1.getRMContext().setNodeLabelManager(this.mgr);
            rm1.start();
            MockNM nm1 = rm1.registerNode("h1:1234", 20480);
            MockNM nm2 = rm1.registerNode("h2:1234", 20480);
            CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
            RMNode rmNode1 = rmNodeOf(rm1, nm1);
            RMNode rmNode2 = rmNodeOf(rm1, nm2);

            // app1 in queue "b": AM on nm1 plus 6 requested containers (3 rounds x 2 nodes).
            RMApp app1 = rm1.submitApp(1024, "app", "user", null, "b");
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
            am1.allocate("*", 1024, 6, new ArrayList<ContainerId>());
            for (int round = 0; round < 3; round++) {
                fireNodeUpdates(cs, rmNode1, rmNode2);
            }
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 4);
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 3);

            RMApp app2 = rm1.submitApp(18432, "app", "user", null, "c");
            FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));

            // Poll until node 1 carries a reservation; bounded only by the test timeout.
            while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
                fireNodeUpdates(cs, rmNode1);
                Thread.sleep(10L);
            }

            ProportionalCapacityPreemptionPolicy editPolicy = (ProportionalCapacityPreemptionPolicy) this.getSchedulingEditPolicy(rm1);
            editPolicy.editSchedule();
            Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());

            // Sleep for the configured preemption delay before expecting candidates.
            Thread.sleep(1000L);
            editPolicy.editSchedule();
            Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
            editPolicy.editSchedule();

            // Poll until the reservation on node 1 is gone; bounded only by the test timeout.
            while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
                fireNodeUpdates(cs, rmNode1);
                Thread.sleep(10L);
            }
            this.waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
        } finally {
            rm1.close();
        }
    }

    /**
     * Same policy settings as the previous test plus the move-reservation
     * option. A 9216-sized request from queue "c" is first reserved on node 3;
     * after the delay the reservation is expected to have moved to node 2,
     * two containers are expected to be selected for preemption, and the
     * second app eventually owns two live containers.
     */
    @Test(timeout=300000L)
    public void testPriorityPreemptionRequiresMoveReservation() throws Exception {
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000L);
        this.conf.setQueueOrderingPolicy("root", "priority-utilization");
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true);
        this.conf.setQueuePriority("root.c", 1);
        MockRM rm1 = new MockRM(this.conf);
        try {
            rm1.getRMContext().setNodeLabelManager(this.mgr);
            rm1.start();
            MockNM nm1 = rm1.registerNode("h1:1234", 10240);
            MockNM nm2 = rm1.registerNode("h2:1234", 10240);
            MockNM nm3 = rm1.registerNode("h3:1234", 10240);
            CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
            RMNode rmNode1 = rmNodeOf(rm1, nm1);
            RMNode rmNode2 = rmNodeOf(rm1, nm2);
            RMNode rmNode3 = rmNodeOf(rm1, nm3);

            // app1 in queue "b": AM on nm1, two extra containers requested.
            RMApp app1 = rm1.submitApp(2048, "app", "user", null, "b");
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
            am1.allocate("*", 2048, 2, new ArrayList<ContainerId>());

            // Only node 2 receives updates here (four in total), which matches the
            // expectation below that both requested containers land on node 2.
            for (int round = 0; round < 2; round++) {
                fireNodeUpdates(cs, rmNode2, rmNode2);
            }
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), am1.getApplicationAttemptId(), 1);
            this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), am1.getApplicationAttemptId(), 2);

            // app2 in queue "c": AM on nm3, then one 9216-sized request.
            RMApp app2 = rm1.submitApp(2048, "app", "user", null, "c");
            MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
            FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
            am2.allocate("*", 9216, 1, new ArrayList<ContainerId>());
            fireNodeUpdates(cs, rmNode3);
            Assert.assertNotNull(cs.getNode(rmNode3.getNodeID()).getReservedContainer());

            // Before the delay elapses the reservation must still sit on node 3.
            ProportionalCapacityPreemptionPolicy editPolicy = (ProportionalCapacityPreemptionPolicy) this.getSchedulingEditPolicy(rm1);
            editPolicy.editSchedule();
            Assert.assertNotNull(cs.getNode(rmNode3.getNodeID()).getReservedContainer());

            // After the delay the reservation is expected on node 2, owned by app2.
            Thread.sleep(1000L);
            editPolicy.editSchedule();
            Assert.assertNull(cs.getNode(rmNode3.getNodeID()).getReservedContainer());
            Assert.assertNotNull(cs.getNode(rmNode2.getNodeID()).getReservedContainer());
            Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode(rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId());

            editPolicy.editSchedule();
            Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
            editPolicy.editSchedule();

            // Poll until app2 owns two live containers; bounded only by the test timeout.
            while (schedulerApp2.getLiveContainers().size() < 2) {
                fireNodeUpdates(cs, rmNode2);
                Thread.sleep(200L);
            }
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
        } finally {
            rm1.close();
        }
    }

    /**
     * Ten nodes; an app in queue "b" holds one container on each of nodes
     * 0-8. An app in queue "c" (priority 1) asks for ten 10240-sized
     * containers and reserves on nodes 1-8. The test expects six candidates
     * from the first app, the first app dropping to three live containers and
     * the second app reaching seven.
     */
    @Test(timeout=60000L)
    public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied() throws Exception {
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000L);
        this.conf.setQueueOrderingPolicy("root", "priority-utilization");
        this.conf.setQueuePriority("root.c", 1);
        MockRM rm1 = new MockRM(this.conf);
        try {
            rm1.getRMContext().setNodeLabelManager(this.mgr);
            rm1.start();
            MockNM[] mockNMs = new MockNM[10];
            for (int n = 0; n < 10; n++) {
                mockNMs[n] = rm1.registerNode("h" + n + ":1234", 10240);
            }
            CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
            RMNode[] rmNodes = new RMNode[10];
            for (int n = 0; n < 10; n++) {
                rmNodes[n] = rmNodeOf(rm1, mockNMs[n]);
            }

            // app1 in queue "b": AM on node 0, eight containers allocated via nodes 1-8.
            RMApp app1 = rm1.submitApp(1024, "app", "user", null, "b");
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]);
            am1.allocate("*", 1024, 8, new ArrayList<ContainerId>());
            fireNodeUpdatesInRange(cs, rmNodes, 1, 9);
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(9, schedulerApp1.getLiveContainers().size());
            for (int n = 0; n < 9; n++) {
                this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[n].getNodeID()), am1.getApplicationAttemptId(), 1);
            }

            // app2 in queue "c": AM on node 9, then ten 10240-sized requests.
            RMApp app2 = rm1.submitApp(10240, "app", "user", null, "c");
            MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]);
            FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
            am2.allocate("*", 10240, 10, new ArrayList<ContainerId>());
            fireNodeUpdatesInRange(cs, rmNodes, 1, 10);
            for (int n = 1; n < 9; n++) {
                Assert.assertNotNull("Should reserve on nm-" + n, cs.getNode(rmNodes[n].getNodeID()).getReservedContainer());
            }

            // Sleep for the configured preemption delay before running the policy.
            Thread.sleep(1000L);
            ProportionalCapacityPreemptionPolicy editPolicy = (ProportionalCapacityPreemptionPolicy) this.getSchedulingEditPolicy(rm1);
            editPolicy.editSchedule();
            this.checkNumberOfPreemptionCandidateFromApp(editPolicy, 6, am1.getApplicationAttemptId());
            editPolicy.editSchedule();
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 3);

            fireNodeUpdatesInRange(cs, rmNodes, 1, 10);
            this.waitNumberOfLiveContainersFromApp(schedulerApp2, 7);
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
        } finally {
            rm1.close();
        }
    }

    /**
     * Five 4096-sized nodes; capacities a=45, b=45, c=10 and priorities a=1,
     * b=2. An app in queue "c" holds one container per node. Apps in "a" and
     * "b" each reserve two 3584-sized containers (on nodes 0-1 and 2-3). With
     * the per-round limit set to 0.05, each policy round is expected to pick
     * exactly one container, on nodes 2, 3, 0 and 1 in that order.
     */
    @Test(timeout=600000L)
    public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() throws Exception {
        this.conf.setFloat("yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round", 0.05f);
        this.conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
        this.conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000L);
        this.conf.setQueueOrderingPolicy("root", "priority-utilization");
        this.conf.setQueuePriority("root.a", 1);
        this.conf.setQueuePriority("root.b", 2);
        this.conf.setCapacity("root.a", 45.0f);
        this.conf.setCapacity("root.b", 45.0f);
        this.conf.setCapacity("root.c", 10.0f);
        MockRM rm1 = new MockRM(this.conf);
        try {
            rm1.getRMContext().setNodeLabelManager(this.mgr);
            rm1.start();
            MockNM[] mockNMs = new MockNM[5];
            for (int n = 0; n < 5; n++) {
                mockNMs[n] = rm1.registerNode("h" + n + ":1234", 4096);
            }
            CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
            RMNode[] rmNodes = new RMNode[5];
            for (int n = 0; n < 5; n++) {
                rmNodes[n] = rmNodeOf(rm1, mockNMs[n]);
            }

            // app1 in queue "c": AM on node 4, four containers allocated via nodes 0-3.
            RMApp app1 = rm1.submitApp(1024, "app", "user", null, "c");
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
            am1.allocate("*", 1024, 4, new ArrayList<ContainerId>());
            fireNodeUpdatesInRange(cs, rmNodes, 0, 4);
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
            for (int n = 0; n < 5; n++) {
                this.waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[n].getNodeID()), am1.getApplicationAttemptId(), 1);
            }

            // app2 in queue "a": AM on node 0, reservations expected on nodes 0 and 1.
            RMApp app2 = rm1.submitApp(512, "app", "user", null, "a");
            MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
            FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
            am2.allocate("*", 3584, 2, new ArrayList<ContainerId>());
            fireNodeUpdatesInRange(cs, rmNodes, 0, 2);
            for (int n = 0; n < 2; n++) {
                Assert.assertNotNull("Should reserve on nm-" + n, cs.getNode(rmNodes[n].getNodeID()).getReservedContainer());
                Assert.assertEquals("a", cs.getNode(rmNodes[n].getNodeID()).getReservedContainer().getQueueName());
            }

            // app3 in queue "b": AM on node 2, reservations expected on nodes 2 and 3.
            RMApp app3 = rm1.submitApp(512, "app", "user", null, "b");
            MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
            FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
            am3.allocate("*", 3584, 2, new ArrayList<ContainerId>());
            fireNodeUpdatesInRange(cs, rmNodes, 2, 4);
            for (int n = 2; n < 4; n++) {
                Assert.assertNotNull("Should reserve on nm-" + n, cs.getNode(rmNodes[n].getNodeID()).getReservedContainer());
                Assert.assertEquals("b", cs.getNode(rmNodes[n].getNodeID()).getReservedContainer().getQueueName());
            }

            // Sleep for the configured preemption delay before running the policy.
            Thread.sleep(1000L);
            ProportionalCapacityPreemptionPolicy editPolicy = (ProportionalCapacityPreemptionPolicy) this.getSchedulingEditPolicy(rm1);

            // Round 1: the single candidate is expected on node 2.
            editPolicy.editSchedule();
            Set<RMContainer> selectedToPreempt = editPolicy.getToPreemptContainers().keySet();
            Assert.assertEquals(1, selectedToPreempt.size());
            Assert.assertEquals(mockNMs[2].getNodeId(), selectedToPreempt.iterator().next().getAllocatedNode());
            editPolicy.editSchedule();
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
            fireNodeUpdatesInRange(cs, rmNodes, 0, 4);
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
            this.waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
            this.waitNumberOfLiveContainersFromApp(schedulerApp3, 2);

            // Round 2: the single candidate is expected on node 3.
            editPolicy.editSchedule();
            selectedToPreempt = editPolicy.getToPreemptContainers().keySet();
            Assert.assertEquals(1, selectedToPreempt.size());
            Assert.assertEquals(mockNMs[3].getNodeId(), selectedToPreempt.iterator().next().getAllocatedNode());
            editPolicy.editSchedule();
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
            fireNodeUpdatesInRange(cs, rmNodes, 0, 4);
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
            this.waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
            this.waitNumberOfLiveContainersFromApp(schedulerApp3, 3);

            // Round 3: the single candidate is expected on node 0.
            editPolicy.editSchedule();
            selectedToPreempt = editPolicy.getToPreemptContainers().keySet();
            Assert.assertEquals(1, selectedToPreempt.size());
            Assert.assertEquals(mockNMs[0].getNodeId(), selectedToPreempt.iterator().next().getAllocatedNode());
            editPolicy.editSchedule();
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
            fireNodeUpdatesInRange(cs, rmNodes, 0, 4);
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
            this.waitNumberOfLiveContainersFromApp(schedulerApp2, 2);
            this.waitNumberOfLiveContainersFromApp(schedulerApp3, 3);

            // Round 4: the single candidate is expected on node 1.
            editPolicy.editSchedule();
            selectedToPreempt = editPolicy.getToPreemptContainers().keySet();
            Assert.assertEquals(1, selectedToPreempt.size());
            Assert.assertEquals(mockNMs[1].getNodeId(), selectedToPreempt.iterator().next().getAllocatedNode());
            editPolicy.editSchedule();
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
            fireNodeUpdatesInRange(cs, rmNodes, 0, 4);
            this.waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
            this.waitNumberOfLiveContainersFromApp(schedulerApp2, 3);
            this.waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
        } finally {
            rm1.close();
        }
    }

    /**
     * Two queues "a" and "b" at 50 each (user-limit factor 100) on five
     * 30720-sized nodes, with the additional reserved-container balancing
     * option enabled. The app in "a" takes four 21504-sized containers; the
     * app in "b" gets one and reserves another. After two policy runs and
     * further node updates the app in "b" is expected to hold three live
     * containers.
     */
    @Test(timeout=60000L)
    public void testPreemptionForFragmentatedCluster() throws Exception {
        this.conf.setBoolean("yarn.resourcemanager.monitor.capacity.preemption.additional_res_balance_based_on_reserved_containers", true);
        // Work on a copy so the two-queue layout does not touch the inherited configuration.
        CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(this.conf);
        csConf.setLong("yarn.scheduler.maximum-allocation-mb", 21504L);
        csConf.setQueues("root", new String[]{"a", "b"});
        csConf.setCapacity("root.a", 50.0f);
        csConf.setUserLimitFactor("root.a", 100.0f);
        csConf.setCapacity("root.b", 50.0f);
        csConf.setUserLimitFactor("root.b", 100.0f);
        MockRM rm1 = new MockRM(csConf);
        try {
            rm1.getRMContext().setNodeLabelManager(this.mgr);
            rm1.start();
            ArrayList<MockNM> nms = new ArrayList<MockNM>();
            for (int n = 0; n < 5; n++) {
                nms.add(rm1.registerNode("h" + n + ":1234", 30720));
            }
            CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();

            // app1 in queue "a": AM on node 0 plus four 21504-sized containers.
            RMApp app1 = rm1.submitApp(3072, "app", "user", null, "a");
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
            am1.allocate("*", 21504, 4, new ArrayList<ContainerId>());
            fireNodeUpdatesRoundRobin(cs, nms, 10);
            FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
            Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());

            // app2 in queue "b": AM on node 2 plus the same request.
            RMApp app2 = rm1.submitApp(3072, "app", "user", null, "b");
            MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
            am2.allocate("*", 21504, 4, new ArrayList<ContainerId>());
            fireNodeUpdatesRoundRobin(cs, nms, 10);
            FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
            Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
            this.waitNumberOfReservedContainersFromApp(schedulerApp2, 1);

            SchedulingEditPolicy editPolicy = this.getSchedulingEditPolicy(rm1);
            editPolicy.editSchedule();
            editPolicy.editSchedule();

            // NOTE(review): the loop waits for 4 live containers while the assertion
            // below expects 3, so a passing run always uses all 10 ticks. Left as-is
            // because the intended settle time cannot be confirmed from this file.
            for (int tick = 0; schedulerApp2.getLiveContainers().size() != 4 && tick < 10; tick++) {
                fireNodeUpdatesRoundRobin(cs, nms, 10);
                Thread.sleep(100L);
            }
            Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
        } finally {
            rm1.close();
        }
    }
}

