/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for {@link CapacityScheduler} container allocation and proposal
 * commit handling.
 *
 * <p>{@link #setUp()} enables asynchronous scheduling in the shared
 * configuration; the commit-proposal tests explicitly disable it again on a
 * copy of that configuration so that scheduling is driven by node updates
 * issued from the test itself.
 */
public class TestCapacitySchedulerAsyncScheduling {
    /** Number of megabytes in one gigabyte; resource sizes below are in MB. */
    private static final int GB = 1024;

    /** Configuration key that switches asynchronous scheduling on or off. */
    private static final String ASYNC_SCHEDULING_ENABLED_KEY =
            "yarn.scheduler.capacity.schedule-asynchronously.enable";

    private YarnConfiguration conf;
    RMNodeLabelsManager mgr;

    /**
     * Builds a configuration that selects {@link CapacityScheduler} with
     * asynchronous scheduling enabled and initializes a no-op node labels
     * manager with it.
     */
    @Before
    public void setUp() throws Exception {
        conf = new YarnConfiguration();
        conf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        conf.setBoolean(ASYNC_SCHEDULING_ENABLED_KEY, true);
        mgr = new NullRMNodeLabelsManager();
        mgr.init(conf);
    }

    @Test(timeout = 300000)
    public void testSingleThreadAsyncContainerAllocation() throws Exception {
        testAsyncContainerAllocation(1);
    }

    @Test(timeout = 300000)
    public void testTwoThreadsAsyncContainerAllocation() throws Exception {
        testAsyncContainerAllocation(2);
    }

    @Test(timeout = 300000)
    public void testThreeThreadsAsyncContainerAllocation() throws Exception {
        testAsyncContainerAllocation(3);
    }

    /**
     * Registers ten 20 GB nodes, runs three applications (one per queue
     * "a", "b", "c") that ask for 20, 40 and 60 one-GB containers, and
     * verifies that the root queue's allocated memory reaches exactly the
     * requested total and then stays there for two more seconds.
     *
     * @param numThreads value for the
     *     {@code schedule-asynchronously.maximum-threads} setting
     * @throws Exception if the mock cluster fails or the test is interrupted
     */
    public void testAsyncContainerAllocation(int numThreads) throws Exception {
        conf.setInt("yarn.scheduler.capacity.schedule-asynchronously.maximum-threads", numThreads);
        conf.setInt("yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms", 100);

        final NullRMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
        labelsManager.init(conf);

        MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
            @Override
            public RMNodeLabelsManager createNodeLabelManager() {
                return labelsManager;
            }
        };
        // Always shut the RM down, even when an assertion fails, so that a
        // failing run does not leak a live ResourceManager into later tests.
        try {
            rm.getRMContext().setNodeLabelManager(labelsManager);
            rm.start();

            for (int i = 0; i < 10; i++) {
                rm.registerNode("h-" + i + ":1234", 20 * GB);
            }

            ArrayList<MockAM> ams = new ArrayList<MockAM>();
            // Start the tally with the 1 GB requested at submission for each of
            // the three applications (presumably the AM containers).
            int totalAsked = 3 * GB;
            for (int i = 0; i < 3; i++) {
                // Queue name is a single letter: "a", "b", "c" for i = 0, 1, 2.
                String queue = Character.toString((char) ('a' + i % 34));
                RMApp rmApp = rm.submitApp(GB, "app", "user", null, false, queue, 1, null, null, false);
                MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
                am.registerAppAttempt();
                ams.add(am);
            }

            for (int i = 0; i < 3; i++) {
                int numContainers = 20 * (i + 1);
                ams.get(i).allocate("*", GB, numContainers, new ArrayList<ContainerId>());
                totalAsked += numContainers * GB;
            }

            // Poll for up to 15 seconds until everything asked for is allocated.
            int waitTime = 15000;
            while (waitTime > 0
                    && rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB() != totalAsked) {
                Thread.sleep(50);
                waitTime -= 50;
            }
            Assert.assertEquals(totalAsked, rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB());

            // Keep watching for two more seconds: the allocated amount must
            // not drift away from the requested total.
            waitTime = 2000;
            while (waitTime > 0) {
                Assert.assertEquals(totalAsked, rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB());
                Thread.sleep(50);
                waitTime -= 50;
            }
        } finally {
            rm.close();
        }
    }

    /**
     * Verifies that a reservation proposal built for an application attempt
     * that has since been removed is not accepted by
     * {@code CapacityScheduler.tryCommit}: after the commit attempt the target
     * node must have no reserved container.
     */
    @Test(timeout = 30000)
    public void testCommitProposalForFailedAppAttempt() throws Exception {
        Configuration disableAsyncConf = new Configuration(conf);
        disableAsyncConf.setBoolean(ASYNC_SCHEDULING_ENABLED_KEY, false);
        MockRM rm = new MockRM(disableAsyncConf);
        // Stop the RM on every exit path so a failure does not leak it.
        try {
            rm.start();
            MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
            MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);

            CapacityScheduler scheduler = (CapacityScheduler) rm.getRMContext().getScheduler();
            // Unbounded wait; the @Test timeout is the upper limit.
            while (scheduler.getNodeTracker().nodeCount() < 2) {
                Thread.sleep(10);
            }
            Assert.assertEquals(2, scheduler.getNodeTracker().nodeCount());

            SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
            SchedulerNode sn2 = scheduler.getSchedulerNode(nm2.getNodeId());

            RMApp app = rm.submitApp(200, "app", "user", null, false, "default", 2, null, null, true, true);
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            FiCaSchedulerApp schedulerApp = scheduler.getApplicationAttempt(am.getApplicationAttemptId());

            allocateAndLaunchContainers(am, nm2, rm, 1, Resources.createResource(5 * GB), 0, 2);
            Assert.assertEquals(1, sn1.getNumContainers());
            Assert.assertEquals(1, sn2.getNumContainers());

            // Remove the first attempt from the scheduler.
            scheduler.handle(new AppAttemptRemovedSchedulerEvent(
                    am.getApplicationAttemptId(), RMAppAttemptState.KILLED, true));

            // Wait for the removed attempt's container to leave nm1 ...
            while (sn1.getCopiedListOfRunningContainers().size() == 1) {
                Thread.sleep(100);
            }
            // ... and then heartbeat nm1 until a container is running on it
            // again (presumably the AM of the next attempt).
            while (sn1.getCopiedListOfRunningContainers().size() == 0) {
                nm1.nodeHeartbeat(true);
                Thread.sleep(100);
            }

            // Hand-craft a reservation proposal on nm2 that still refers to
            // the removed attempt. Raw types are kept as in the original code.
            Resource reservedResource = Resources.createResource(5 * GB);
            Container container = Container.newInstance(
                    ContainerId.newContainerId(am.getApplicationAttemptId(), 3),
                    sn2.getNodeID(), sn2.getHttpAddress(), reservedResource,
                    Priority.newInstance(0), null);
            RMContainerImpl rmContainer = new RMContainerImpl(
                    container,
                    SchedulerRequestKey.create(
                            ResourceRequest.newInstance(Priority.newInstance(0), "*", reservedResource, 1)),
                    am.getApplicationAttemptId(), sn2.getNodeID(), "user", rm.getRMContext());
            SchedulerContainer reservedContainer = new SchedulerContainer(
                    schedulerApp, scheduler.getNode(sn2.getNodeID()), rmContainer, "", false);
            ContainerAllocationProposal reservedForAttempt1Proposal = new ContainerAllocationProposal(
                    reservedContainer, null, reservedContainer,
                    NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
                    SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, reservedResource);
            ArrayList<ContainerAllocationProposal> reservedProposals =
                    new ArrayList<ContainerAllocationProposal>();
            reservedProposals.add(reservedForAttempt1Proposal);
            ResourceCommitRequest request = new ResourceCommitRequest(null, reservedProposals, null);

            scheduler.tryCommit(scheduler.getClusterResource(), request);
            Assert.assertNull("Outdated proposal should not be accepted!", sn2.getReservedContainer());
        } finally {
            rm.stop();
        }
    }

    /**
     * Verifies that committing a reservation proposal that became outdated
     * between its creation and its commit does not throw.
     *
     * <p>The scheduler is wrapped in a Mockito spy whose {@code tryCommit} is
     * intercepted. On the first commit request that carries a reservation, the
     * interceptor changes the cluster state (kills the container on nm2, lets
     * nm2 be scheduled again, has a second application reserve on nm1) and only
     * then forwards the original request to the real scheduler.
     */
    @Test(timeout = 30000)
    public void testCommitOutdatedReservedProposal() throws Exception {
        Configuration disableAsyncConf = new Configuration(conf);
        disableAsyncConf.setBoolean(ASYNC_SCHEDULING_ENABLED_KEY, false);
        MockRM rm = new MockRM(disableAsyncConf);
        // Stop the RM on every exit path so a failure does not leak it.
        try {
            rm.start();
            MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
            MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);

            final CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();

            // Give both nodes up to one second to show up in the scheduler.
            int waitTime = 1000;
            while (waitTime > 0 && cs.getNodeTracker().nodeCount() < 2) {
                Thread.sleep(10);
                waitTime -= 10;
            }
            Assert.assertEquals(2, cs.getNodeTracker().nodeCount());

            final SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
            final SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());

            RMApp app = rm.submitApp(200, "app", "user", null, "default");
            MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
            RMApp app2 = rm.submitApp(200, "app", "user", null, "default");
            final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);

            // One 5 GB container on each node for the first application.
            allocateAndLaunchContainers(am, nm1, rm, 1, Resources.createResource(5 * GB), 0, 2);
            allocateAndLaunchContainers(am, nm2, rm, 1, Resources.createResource(5 * GB), 0, 3);
            Assert.assertEquals(3, sn1.getNumContainers());
            Assert.assertEquals(1, sn2.getNumContainers());

            // A further 5 GB request is expected to end up as a reservation
            // on nm1 once nm1 heartbeats.
            ResourceRequest rr2 = ResourceRequest.newInstance(
                    Priority.newInstance(0), "*", Resources.createResource(5 * GB), 1);
            am.allocate(Arrays.asList(rr2), null);
            nm1.nodeHeartbeat(true);
            waitTime = 1000;
            while (waitTime > 0 && sn1.getReservedContainer() == null) {
                Thread.sleep(10);
                waitTime -= 10;
            }
            Assert.assertNotNull(sn1.getReservedContainer());

            CapacityScheduler spyCs = Mockito.spy(cs);
            final AtomicBoolean isFirstReserve = new AtomicBoolean(true);
            final AtomicBoolean isChecked = new AtomicBoolean(false);

            Mockito.doAnswer(new Answer<Object>() {
                @Override
                public Object answer(InvocationOnMock invocation) throws Exception {
                    Resource cluster = (Resource) invocation.getArguments()[0];
                    ResourceCommitRequest request = (ResourceCommitRequest) invocation.getArguments()[1];

                    boolean interceptThisCommit = request.getContainersToReserve().size() > 0
                            && isFirstReserve.compareAndSet(true, false);
                    if (!interceptThisCommit) {
                        // Forward to the real scheduler, not the spy, to
                        // avoid re-entering this answer.
                        cs.tryCommit(cluster, request);
                        return null;
                    }

                    // Kill the only running container on nm2.
                    // Exit status -106: presumably
                    // ContainerExitStatus.KILLED_BY_RESOURCEMANAGER -- confirm.
                    RMContainer killableContainer = sn2.getCopiedListOfRunningContainers().get(0);
                    cs.completedContainer(
                            killableContainer,
                            ContainerStatus.newInstance(
                                    killableContainer.getContainerId(), ContainerState.COMPLETE, "", -106),
                            RMContainerEventType.KILL);
                    Assert.assertEquals(0, sn2.getCopiedListOfRunningContainers().size());

                    // Let nm2 be scheduled; a container must start running
                    // there and nm1's reservation must disappear.
                    cs.handle(new NodeUpdateSchedulerEvent(sn2.getRMNode()));
                    int remaining = 1000;
                    while (remaining > 0 && sn2.getCopiedListOfRunningContainers().size() == 0) {
                        Thread.sleep(10);
                        remaining -= 10;
                    }
                    Assert.assertEquals(1, sn2.getCopiedListOfRunningContainers().size());
                    Assert.assertNull(sn1.getReservedContainer());

                    // Have the second application reserve on nm1.
                    ResourceRequest rr3 = ResourceRequest.newInstance(
                            Priority.newInstance(0), "*", Resources.createResource(5 * GB), 1);
                    am2.allocate(Arrays.asList(rr3), null);
                    cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
                    remaining = 1000;
                    while (remaining > 0 && sn1.getReservedContainer() == null) {
                        Thread.sleep(10);
                        remaining -= 10;
                    }
                    Assert.assertNotNull(sn1.getReservedContainer());

                    // Finally commit the original request, which is now
                    // outdated. Any exception here is a test failure; keep
                    // the cause so it shows up in the test report.
                    try {
                        cs.tryCommit(cluster, request);
                    } catch (Exception e) {
                        throw new AssertionError("Committing an outdated reserved proposal threw", e);
                    }
                    isChecked.set(true);
                    return null;
                }
            }).when(spyCs).tryCommit(Mockito.any(Resource.class), Mockito.any(ResourceCommitRequest.class));

            spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));

            waitTime = 1000;
            while (waitTime > 0 && !isChecked.get()) {
                Thread.sleep(10);
                waitTime -= 10;
            }
            // Without this check the test would pass even if the intercepted
            // branch above never ran.
            Assert.assertTrue("Outdated reserved proposal scenario was never exercised", isChecked.get());
        } finally {
            rm.stop();
        }
    }

    /**
     * Requests {@code nContainer} containers of the given size for {@code am},
     * waits until the last expected container id is allocated, and then drives
     * each container to the running state by sending it a {@code LAUNCHED}
     * event.
     *
     * @param am application master that issues the request
     * @param nm node passed to {@code MockRM.waitForState} while waiting
     * @param rm resource manager under test
     * @param nContainer number of containers to request
     * @param resource size of each container
     * @param priority request priority
     * @param startContainerId numeric id expected for the first new container;
     *     subsequent containers are assumed to use consecutive ids
     */
    private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, int nContainer,
            Resource resource, int priority, int startContainerId) throws Exception {
        am.allocate(
                Arrays.asList(ResourceRequest.newInstance(
                        Priority.newInstance(priority), "*", resource, nContainer)),
                null);

        int endContainerId = startContainerId + nContainer;
        ContainerId lastContainerId =
                ContainerId.newContainerId(am.getApplicationAttemptId(), endContainerId - 1);
        Assert.assertTrue(rm.waitForState(nm, lastContainerId, RMContainerState.ALLOCATED));

        // Empty allocate call before the containers are launched, as in the
        // original flow (presumably pulls the newly allocated containers).
        am.allocate(null, null);

        CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
        for (int cId = startContainerId; cId < endContainerId; cId++) {
            ContainerId containerId = ContainerId.newContainerId(am.getApplicationAttemptId(), cId);
            RMContainer rmContainer = cs.getRMContainer(containerId);
            Assert.assertNotNull("Cannot find RMContainer", rmContainer);
            rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
            Assert.assertTrue("Container " + containerId + " did not reach RUNNING",
                    rm.waitForState(nm, containerId, RMContainerState.RUNNING));
        }
    }
}

