/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMHATestBase;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestRMHAForAsyncScheduler
extends RMHATestBase {
    @Override
    @Before
    public void setup() throws Exception {
        super.setup();
        this.confForRM1.setClass("yarn.scheduler.capacity.resource-calculator", DominantResourceCalculator.class, ResourceCalculator.class);
        this.confForRM1.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        this.confForRM1.setBoolean("yarn.scheduler.capacity.schedule-asynchronously.enable", true);
        this.confForRM2.setClass("yarn.scheduler.capacity.resource-calculator", DominantResourceCalculator.class, ResourceCalculator.class);
        this.confForRM2.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        this.confForRM2.setBoolean("yarn.scheduler.capacity.schedule-asynchronously.enable", true);
    }

    @Test(timeout=60000L)
    public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
        this.startRMs();
        rm1.registerNode("h1:1234", 8192, 8);
        RMApp app1 = this.submitAppAndCheckLaunched(rm1);
        this.explicitFailover();
        this.checkAsyncSchedulerThreads(Thread.currentThread());
        rm2.registerNode("h1:1234", 8192, 8);
        rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.LAUNCHED);
        rm2.killApp(app1.getApplicationId());
        RMApp app2 = this.submitAppAndCheckLaunched(rm2);
        HAServiceProtocol.StateChangeRequestInfo requestInfo = new HAServiceProtocol.StateChangeRequestInfo(HAServiceProtocol.RequestSource.REQUEST_BY_USER);
        TestRMHAForAsyncScheduler.rm2.adminService.transitionToStandby(requestInfo);
        TestRMHAForAsyncScheduler.rm1.adminService.transitionToActive(requestInfo);
        Assert.assertTrue((rm2.getRMContext().getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY ? 1 : 0) != 0);
        Assert.assertTrue((rm1.getRMContext().getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE ? 1 : 0) != 0);
        this.checkAsyncSchedulerThreads(Thread.currentThread());
        rm1.registerNode("h1:1234", 8192, 8);
        rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.LAUNCHED);
        rm1.killApp(app2.getApplicationId());
        this.submitAppAndCheckLaunched(rm1);
        rm1.stop();
        rm2.stop();
    }

    private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
        RMApp app = rm.submitApp(200, "", UserGroupInformation.getCurrentUser().getShortUserName(), null, false, "default", this.configuration.getInt("yarn.resourcemanager.am.max-attempts", 2), null, null, false, false);
        rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
        RMAppAttempt attempt = app.getCurrentAppAttempt();
        rm.sendAMLaunched(attempt.getAppAttemptId());
        rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.LAUNCHED);
        return app;
    }

    private void checkAsyncSchedulerThreads(Thread currentThread) {
        ThreadGroup threadGroup = currentThread.getThreadGroup();
        while (threadGroup.getParent() != null) {
            threadGroup = threadGroup.getParent();
        }
        Thread[] threads = new Thread[threadGroup.activeCount()];
        threadGroup.enumerate(threads);
        int numAsyncScheduleThread = 0;
        int numResourceCommitterService = 0;
        Thread asyncScheduleThread = null;
        Thread resourceCommitterService = null;
        for (Thread thread : threads) {
            StackTraceElement[] stackTrace = thread.getStackTrace();
            if (stackTrace.length <= 0) continue;
            String stackBottom = stackTrace[stackTrace.length - 1].toString();
            if (stackBottom.contains("AsyncScheduleThread.run")) {
                ++numAsyncScheduleThread;
                asyncScheduleThread = thread;
                continue;
            }
            if (!stackBottom.contains("ResourceCommitterService.run")) continue;
            ++numResourceCommitterService;
            resourceCommitterService = thread;
        }
        Assert.assertEquals((long)1L, (long)numResourceCommitterService);
        Assert.assertEquals((long)1L, (long)numAsyncScheduleThread);
        Assert.assertNotNull(asyncScheduleThread);
        Assert.assertNotNull(resourceCommitterService);
    }
}

