package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.submarine.client.cli.param.Localization;
import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
import org.apache.hadoop.yarn.submarine.common.Envs;
import org.apache.hadoop.yarn.submarine.common.api.TaskType;
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.class */
public class YarnServiceJobSubmitter implements JobSubmitter {
    /** Label used for the quicklink that points at the Tensorboard component. */
    public static final String TENSORBOARD_QUICKLINK_LABEL = "Tensorboard";
    private static final Logger LOG = LoggerFactory.getLogger(YarnServiceJobSubmitter.class);
    /** Client-side context giving access to the YARN/Submarine configuration and the remote directory manager. */
    ClientContext clientContext;
    /** Service specification built by the most recent call to {@code createServiceByParameters}. */
    Service serviceSpec;
    /** Remote paths already uploaded by this submitter, used to skip duplicate uploads. */
    private final Set<Path> uploadedFiles = new HashSet<>();
    /** Maps a component name to the local path of the launch script generated for it. */
    private final Map<String, String> componentToLocalLaunchScriptPath = new HashMap<>();

    /**
     * Creates a submitter bound to the given client context.
     *
     * @param clientContext context stored on this instance and consulted by the other methods
     *                      for configuration and the remote directory manager
     */
    public YarnServiceJobSubmitter(ClientContext clientContext) {
        this.clientContext = clientContext;
    }

    /**
     * Converts a YARN API resource into the resource record used by the service specification.
     *
     * <p>The virtual core count and memory size are copied, and every entry returned by
     * {@code getAllResourcesListCopy()} is recorded under its name with its value and unit.
     *
     * @param resource the YARN API resource to convert
     * @return a newly created service-spec resource
     */
    private Resource getServiceResourceFromYarnResource(org.apache.hadoop.yarn.api.records.Resource resource) {
        Resource serviceResource = new Resource();
        serviceResource.setCpus(Integer.valueOf(resource.getVirtualCores()));
        serviceResource.setMemory(String.valueOf(resource.getMemorySize()));

        Map<String, org.apache.hadoop.yarn.service.api.records.ResourceInformation> infoByName = new HashMap<>();
        for (ResourceInformation yarnInfo : resource.getAllResourcesListCopy()) {
            org.apache.hadoop.yarn.service.api.records.ResourceInformation serviceInfo =
                    new org.apache.hadoop.yarn.service.api.records.ResourceInformation();
            serviceInfo.setValue(Long.valueOf(yarnInfo.getValue()));
            serviceInfo.setUnit(yarnInfo.getUnits());
            infoByName.put(yarnInfo.getName(), serviceInfo);
        }
        serviceResource.setResourceInformations(infoByName);
        return serviceResource;
    }

    /**
     * Extracts the value part of a {@code KEY=VALUE} string.
     *
     * @param str the environment entry, may be {@code null}
     * @return everything after the first {@code '='}, or an empty string when {@code str} is
     *         {@code null} or contains no {@code '='}
     */
    private String getValueOfEnvionment(String str) {
        if (str == null) {
            return "";
        }
        int separator = str.indexOf("=");
        if (separator < 0) {
            return "";
        }
        return str.substring(separator + 1);
    }

    /**
     * Tells whether the given text mentions an {@code hdfs://} URI.
     *
     * @param str a path or command line, may be {@code null}
     * @return {@code true} only when {@code str} is non-null and contains {@code "hdfs://"}
     */
    private boolean needHdfs(String str) {
        if (str == null) {
            return false;
        }
        return str.contains("hdfs://");
    }

    /**
     * Writes the HDFS-related environment exports into a launch script and localizes the
     * Hadoop client configuration files for the component.
     *
     * <p>HDFS is considered "in use" when the input path or either launch command contains
     * {@code hdfs://}, or when a {@code DOCKER_HADOOP_HDFS_HOME=} envar is supplied. In that case
     * both {@code DOCKER_HADOOP_HDFS_HOME} and {@code DOCKER_JAVA_HOME} must be present among the
     * job's envars. Regardless of whether HDFS is in use, {@code core-site.xml} and
     * {@code hdfs-site.xml} are looked up on the class path and uploaded to the job staging area.
     *
     * @param runJobParameters job parameters supplying envars, input path and launch commands
     * @param printWriter      writer for the launch script being generated
     * @param component        component that receives the localized configuration files
     * @throws IOException if a required envar is missing, a configuration file cannot be found on
     *                     the class path, or uploading fails
     */
    private void addHdfsClassPathIfNeeded(RunJobParameters runJobParameters, PrintWriter printWriter, Component component) throws IOException {
        String hdfsHome = null;
        String javaHome = null;
        boolean hadoopEnvGiven = false;
        // Guard against a missing envar list, consistent with handleServiceEnvs.
        if (runJobParameters.getEnvars() != null) {
            for (String envar : runJobParameters.getEnvars()) {
                if (envar.startsWith("DOCKER_HADOOP_HDFS_HOME=")) {
                    hdfsHome = getValueOfEnvionment(envar);
                    hadoopEnvGiven = true;
                } else if (envar.startsWith("DOCKER_JAVA_HOME=")) {
                    javaHome = getValueOfEnvionment(envar);
                }
            }
        }

        boolean lackingEnvs = false;
        boolean hdfsInUse = needHdfs(runJobParameters.getInputPath())
                || needHdfs(runJobParameters.getPSLaunchCmd())
                || needHdfs(runJobParameters.getWorkerLaunchCmd())
                || hadoopEnvGiven;
        if (hdfsInUse) {
            if (hdfsHome != null) {
                // Blank out the generic Hadoop homes so only the container's HDFS home is used.
                printWriter.append("export HADOOP_HOME=\n");
                printWriter.append("export HADOOP_YARN_HOME=\n");
                printWriter.append("export HADOOP_HDFS_HOME=" + hdfsHome + "\n");
                printWriter.append("export HADOOP_COMMON_HOME=" + hdfsHome + "\n");
            } else {
                lackingEnvs = true;
            }
            // The configuration files are localized below under their plain file names.
            printWriter.append("export HADOOP_CONF_DIR=$WORK_DIR\n");
            if (javaHome != null) {
                printWriter.append("export JAVA_HOME=" + javaHome + "\n");
                printWriter.append("export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/lib/amd64/server\n");
            } else {
                lackingEnvs = true;
            }
            printWriter.append("export CLASSPATH=`$HADOOP_HDFS_HOME/bin/hadoop classpath --glob`\n");
        }
        if (lackingEnvs) {
            LOG.error("When hdfs is being used to read/write models/data. Following envs are required: "
                    + "1) DOCKER_HADOOP_HDFS_HOME=<HDFS_HOME inside docker container> "
                    + "2) DOCKER_JAVA_HOME=<JAVA_HOME inside docker container>. "
                    + "You can use --env to pass these envars.");
            throw new IOException("Failed to detect HDFS-related environments.");
        }

        Path stagingDir = this.clientContext.getRemoteDirectoryManager().getJobStagingArea(runJobParameters.getName(), true);
        File coreSite = findFileOnClassPath("core-site.xml");
        File hdfsSite = findFileOnClassPath("hdfs-site.xml");
        if (coreSite == null || hdfsSite == null) {
            LOG.error("hdfs is being used, however we couldn't locate core-site.xml/hdfs-site.xml from classpath, "
                    + "please double check your classpath setting and make sure they're included.");
            throw new IOException("Failed to locate core-site.xml / hdfs-site.xml from class path");
        }
        uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir, coreSite.getAbsolutePath(), "core-site.xml", component);
        uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir, hdfsSite.getAbsolutePath(), "hdfs-site.xml", component);

        if (SubmarineLogs.isVerbose()) {
            // Have the script echo the resolved environment to ease debugging.
            printWriter.append("echo \"CLASSPATH:$CLASSPATH\"\n");
            printWriter.append("echo \"HADOOP_CONF_DIR:$HADOOP_CONF_DIR\"\n");
            printWriter.append("echo \"HADOOP_TOKEN_FILE_LOCATION:$HADOOP_TOKEN_FILE_LOCATION\"\n");
            printWriter.append("echo \"JAVA_HOME:$JAVA_HOME\"\n");
            printWriter.append("echo \"LD_LIBRARY_PATH:$LD_LIBRARY_PATH\"\n");
            printWriter.append("echo \"HADOOP_HDFS_HOME:$HADOOP_HDFS_HOME\"\n");
        }
    }

    /**
     * Adds the task index and task type environment variables to a component.
     *
     * @param component the component whose environment map is updated
     * @param taskType  the task type whose enum name is stored under {@code Envs.TASK_TYPE_ENV}
     */
    private void addCommonEnvironments(Component component, TaskType taskType) {
        Map<String, String> componentEnv = component.getConfiguration().getEnv();
        componentEnv.put(Envs.TASK_INDEX_ENV, ServiceApiConstants.COMPONENT_ID);
        componentEnv.put(Envs.TASK_TYPE_ENV, taskType.name());
    }

    /**
     * Returns the value of the {@code user.name} system property.
     *
     * <p>Declared {@code protected} and marked {@code @VisibleForTesting}, so it can be overridden.
     *
     * @return the {@code user.name} system property
     */
    @VisibleForTesting
    protected String getUserName() {
        return System.getProperty("user.name");
    }

    /**
     * Reads the {@code hadoop.registry.dns.domain-name} setting from the YARN configuration.
     *
     * @return the configured value as returned by the configuration lookup
     */
    private String getDNSDomain() {
        return this.clientContext.getYarnConfig().get("hadoop.registry.dns.domain-name");
    }

    /**
     * Generates a local bash launch script for one component.
     *
     * <p>The script starts with a shebang line followed by whatever
     * {@code addHdfsClassPathIfNeeded} emits. A Tensorboard component then gets a
     * {@code tensorboard --logdir=...} command; other components get an optional
     * {@code TF_CONFIG} export (distributed jobs only) followed by the worker or PS launch command.
     *
     * @param runJobParameters job parameters supplying commands and sizing
     * @param taskType         type of the component the script is for
     * @param component        component handed on to {@code addHdfsClassPathIfNeeded}
     * @return absolute path of the generated temporary script
     * @throws IOException if the temporary file cannot be created or the HDFS setup fails
     */
    private String generateCommandLaunchScript(RunJobParameters runJobParameters, TaskType taskType, Component component) throws IOException {
        File scriptFile = File.createTempFile(taskType.name() + "-launch-script", ".sh");
        try (PrintWriter script = new PrintWriter(new OutputStreamWriter(new FileOutputStream(scriptFile), "UTF-8"))) {
            script.append("#!/bin/bash\n");
            addHdfsClassPathIfNeeded(runJobParameters, script, component);

            if (taskType == TaskType.TENSORBOARD) {
                String tensorboardCommand = "export LC_ALL=C && tensorboard --logdir=" + runJobParameters.getCheckpointPath();
                script.append(tensorboardCommand + "\n");
                LOG.info("Tensorboard command=" + tensorboardCommand);
                return scriptFile.getAbsolutePath();
            }

            if (runJobParameters.isDistributed()) {
                script.append("export TF_CONFIG=\""
                        + YarnServiceUtils.getTFConfigEnv(taskType.getComponentName(), runJobParameters.getNumWorkers(),
                                runJobParameters.getNumPS(), runJobParameters.getName(), getUserName(), getDNSDomain())
                        + "\"\n");
            }

            boolean isWorker = taskType == TaskType.WORKER || taskType == TaskType.PRIMARY_WORKER;
            if (isWorker) {
                script.append(runJobParameters.getWorkerLaunchCmd() + "\n");
                if (SubmarineLogs.isVerbose()) {
                    LOG.info("Worker command =[" + runJobParameters.getWorkerLaunchCmd() + "]");
                }
            } else if (taskType == TaskType.PS) {
                script.append(runJobParameters.getPSLaunchCmd() + "\n");
                if (SubmarineLogs.isVerbose()) {
                    LOG.info("PS command =[" + runJobParameters.getPSLaunchCmd() + "]");
                }
            }
            return scriptFile.getAbsolutePath();
        }
    }

    /**
     * Builds the launch script file name for a task type.
     *
     * @param taskType the task type whose enum name is embedded in the file name
     * @return {@code "run-" + taskType.name() + ".sh"}
     */
    private String getScriptFileName(TaskType taskType) {
        return "run-" + taskType.name() + ".sh";
    }

    /**
     * Searches the JVM class path for a file with the given name.
     *
     * <p>For a class path entry that is a regular file, the file is looked up in the entry's
     * parent directory; for any other entry, it is looked up directly beneath the entry.
     *
     * @param str name of the file to locate
     * @return the first existing match, or {@code null} when no class path entry yields one
     */
    private File findFileOnClassPath(String str) {
        String classPath = System.getProperty("java.class.path");
        String separator = System.getProperty("path.separator");
        StringTokenizer entries = new StringTokenizer(classPath, separator);
        while (entries.hasMoreTokens()) {
            File entry = new File(entries.nextToken());
            File absoluteEntry = entry.getAbsoluteFile();
            File candidate = absoluteEntry.isFile()
                    ? new File(absoluteEntry.getParent(), str)
                    : new File(entry, str);
            if (candidate.exists()) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Uploads a local file to the staging directory and registers it for localization on a component.
     *
     * @param path      remote staging directory
     * @param str       local path of the file to upload
     * @param str2      destination file name registered for the component
     * @param component component that receives the file
     * @throws IOException if the upload or the remote status lookup fails
     */
    private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path path, String str, String str2, Component component) throws IOException {
        Path uploaded = uploadToRemoteFile(path, str);
        locateRemoteFileToContainerWorkDir(str2, component, uploaded);
    }

    /**
     * Registers an already-uploaded remote file as a static config file of a component.
     *
     * @param str       destination file name for the config file entry
     * @param component component whose file list is extended
     * @param path      remote path of the uploaded file
     * @throws IOException if the remote file status cannot be obtained
     */
    private void locateRemoteFileToContainerWorkDir(String str, Component component, Path path) throws IOException {
        FileSystem fs = FileSystem.get(this.clientContext.getYarnConfig());
        FileStatus remoteStatus = fs.getFileStatus(path);
        LOG.info("Uploaded file path = " + remoteStatus.getPath());

        ConfigFile localized = new ConfigFile()
                .srcFile(remoteStatus.getPath().toUri().toString())
                .destFile(str)
                .type(ConfigFile.TypeEnum.STATIC);
        component.getConfiguration().getFiles().add(localized);
    }

    /**
     * Copies a local file into the remote staging directory, once per remote destination.
     *
     * <p>Destinations already recorded in {@code uploadedFiles} are not copied again.
     *
     * @param path remote staging directory
     * @param str  local path of the file to upload
     * @return remote path of the file (staging directory plus the local file's name)
     * @throws FileNotFoundException if the local file does not exist
     * @throws IOException           if the file system lookup or the copy fails
     */
    private Path uploadToRemoteFile(Path path, String str) throws IOException {
        FileSystem remoteFs = this.clientContext.getRemoteDirectoryManager().getDefaultFileSystem();
        File localFile = new File(str);
        if (!localFile.exists()) {
            throw new FileNotFoundException("Trying to upload file=" + localFile.getAbsolutePath() + " to remote, but couldn't find local file.");
        }

        Path destination = new Path(path, localFile.getName());
        if (this.uploadedFiles.contains(destination)) {
            return destination;
        }
        if (SubmarineLogs.isVerbose()) {
            LOG.info("Copying local file=" + str + " to remote=" + destination);
        }
        remoteFs.copyFromLocalFile(new Path(str), destination);
        this.uploadedFiles.add(destination);
        return destination;
    }

    /**
     * Applies a permission to a path on the file system obtained from the YARN configuration.
     *
     * @param path         path whose permission is changed
     * @param fsPermission permission to apply; a copy of it is passed to the file system
     * @throws IOException if the file system cannot be obtained or the change fails
     */
    private void setPermission(Path path, FsPermission fsPermission) throws IOException {
        FileSystem fs = FileSystem.get(this.clientContext.getYarnConfig());
        FsPermission permissionCopy = new FsPermission(fsPermission);
        fs.setPermission(path, permissionCopy);
    }

    /**
     * Generates, uploads and wires up the launch script of a component.
     *
     * <p>The component's launch command is set to {@code ./<script name>} and the local script
     * path is recorded in {@code componentToLocalLaunchScriptPath} under the component name.
     *
     * @param runJobParameters job parameters
     * @param taskType         type of the component
     * @param component        component to configure
     * @throws IOException if script generation or upload fails
     */
    private void handleLaunchCommand(RunJobParameters runJobParameters, TaskType taskType, Component component) throws IOException {
        Path stagingDir = this.clientContext.getRemoteDirectoryManager().getJobStagingArea(runJobParameters.getName(), true);
        String localScriptPath = generateCommandLaunchScript(runJobParameters, taskType, component);
        String remoteScriptName = getScriptFileName(taskType);

        uploadToRemoteFileAndLocalizeToContainerWorkDir(stagingDir, localScriptPath, remoteScriptName, component);
        component.setLaunchCommand("./" + remoteScriptName);
        this.componentToLocalLaunchScriptPath.put(taskType.getComponentName(), localScriptPath);
    }

    /**
     * Returns the name component of a path string, as computed by Hadoop's {@code Path.getName()}.
     *
     * @param str path or URI string
     * @return the result of {@code new Path(str).getName()}
     */
    private String getLastNameFromPath(String str) {
        return new Path(str).getName();
    }

    /**
     * Makes a file or directory available locally and optionally zips it.
     *
     * <p>A remote source is first copied to {@code <java.io.tmpdir>/<str2>}. When zipping is
     * requested, the archive is written to
     * {@code <java.io.tmpdir>/<str2>_<modification time>-<size>.zip} and a downloaded copy is
     * deleted afterwards.
     *
     * @param str  remote URI or local path of the source
     * @param str2 name used for the local download target and the zip file
     * @param z    whether to zip the (local) source
     * @return the zip file path when {@code z} is {@code true}; otherwise the local path of the source
     * @throws IOException if downloading or zipping fails
     */
    private String mayDownloadAndZipIt(String str, String str2, boolean z) throws IOException {
        RemoteDirectoryManager directoryManager = this.clientContext.getRemoteDirectoryManager();
        String downloadTarget = System.getProperty("java.io.tmpdir") + "/" + str2;
        boolean downloaded = directoryManager.isRemote(str);

        String localSource;
        String versionSuffix;
        if (downloaded) {
            versionSuffix = "_" + directoryManager.getRemoteFileStatus(new Path(str)).getModificationTime()
                    + "-" + directoryManager.getRemoteFileSize(str);
            if (!directoryManager.copyRemoteToLocal(str, downloadTarget)) {
                throw new IOException("Failed to download files from " + str);
            }
            LOG.info("Downloaded remote: {} to local: {}", str, downloadTarget);
            localSource = downloadTarget;
        } else {
            File localFile = new File(str);
            versionSuffix = "_" + localFile.lastModified() + "-" + localFile.length();
            localSource = str;
        }

        if (!z) {
            return localSource;
        }
        String zipPath = zipDir(localSource, downloadTarget + versionSuffix + ".zip");
        if (downloaded) {
            // The downloaded copy is only an intermediate; the zip is what gets returned.
            deleteFiles(localSource);
        }
        return zipPath;
    }

    /**
     * Compresses the contents of a directory into a zip file.
     *
     * <p>The zip stream is closed even when adding an entry fails, so the underlying file
     * handle is never leaked.
     *
     * @param str  path of the directory to compress
     * @param str2 path of the zip file to create
     * @return {@code str2}
     * @throws IOException if the zip file cannot be created or written
     */
    @VisibleForTesting
    public String zipDir(String str, String str2) throws IOException {
        File sourceDir = new File(str);
        try (ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream(str2))) {
            LOG.info("Compressing {}", str);
            addDirToZip(zipOutputStream, sourceDir, sourceDir);
        }
        LOG.info("Compressed {} to {}", str, str2);
        return str2;
    }

    /**
     * Deletes a local file or directory on a best-effort basis.
     *
     * <p>A failure is logged as a warning and not propagated; success is logged at info level.
     *
     * @param str path of the file or directory to delete
     */
    private void deleteFiles(String str) {
        if (FileUtil.fullyDelete(new File(str))) {
            LOG.info("Deleted {}", str);
        } else {
            // Only report one outcome: do not claim "Deleted" after a failed delete.
            LOG.warn("Fail to delete {}", str);
        }
    }

    /**
     * Recursively adds every regular file below a directory to a zip stream.
     *
     * <p>Entry names are the file paths relative to {@code file2}. Directories themselves are
     * not written as entries; they are only descended into.
     *
     * @param zipOutputStream zip stream receiving the entries
     * @param file            directory whose children are added
     * @param file2           root directory used to compute relative entry names
     * @throws IOException if reading a file or writing an entry fails
     */
    private void addDirToZip(ZipOutputStream zipOutputStream, File file, File file2) throws IOException {
        File[] children = file.listFiles();
        if (children == null) {
            return;
        }
        for (File child : children) {
            if (child.isDirectory()) {
                addDirToZip(zipOutputStream, child, file2);
                continue;
            }
            byte[] buffer = new byte[1024];
            FileInputStream input = null;
            try {
                input = new FileInputStream(child);
                String entryName = file2.toURI().relativize(child.toURI()).getPath();
                LOG.info(" Zip adding: " + entryName);
                zipOutputStream.putNextEntry(new ZipEntry(entryName));
                int bytesRead;
                while ((bytesRead = input.read(buffer)) > 0) {
                    zipOutputStream.write(buffer, 0, bytesRead);
                }
                zipOutputStream.flush();
            } finally {
                // Close the input first, then the entry, on both the normal and the failure path.
                if (input != null) {
                    input.close();
                }
                zipOutputStream.closeEntry();
            }
        }
    }

    /**
     * Builds a worker component and adds it to the service.
     *
     * <p>A primary worker always gets one container; a regular worker component gets
     * {@code numWorkers - 1} containers.
     *
     * @param service          service to extend
     * @param runJobParameters job parameters supplying image, resource and sizing
     * @param taskType         worker flavour of the component
     * @throws IOException if preparing the launch command fails
     */
    private void addWorkerComponent(Service service, RunJobParameters runJobParameters, TaskType taskType) throws IOException {
        Component worker = new Component();
        addCommonEnvironments(worker, taskType);
        worker.setName(taskType.getComponentName());

        long containerCount = taskType == TaskType.PRIMARY_WORKER ? 1L : runJobParameters.getNumWorkers() - 1;
        worker.setNumberOfContainers(containerCount);

        String workerImage = runJobParameters.getWorkerDockerImage();
        if (workerImage != null) {
            worker.setArtifact(getDockerArtifact(workerImage));
        }
        worker.setResource(getServiceResourceFromYarnResource(runJobParameters.getWorkerResource()));
        handleLaunchCommand(runJobParameters, taskType, worker);
        worker.setRestartPolicy(Component.RestartPolicyEnum.NEVER);
        service.addComponent(worker);
    }

    /**
     * Adds the primary worker component and, when more than one worker is requested,
     * the regular worker component.
     *
     * @param service          service to extend
     * @param runJobParameters job parameters supplying the worker count
     * @throws IOException if building a component fails
     */
    private void addWorkerComponents(Service service, RunJobParameters runJobParameters) throws IOException {
        addWorkerComponent(service, runJobParameters, TaskType.PRIMARY_WORKER);
        boolean hasAdditionalWorkers = runJobParameters.getNumWorkers() > 1;
        if (hasAdditionalWorkers) {
            addWorkerComponent(service, runJobParameters, TaskType.WORKER);
        }
    }

    /**
     * Sets or extends an environment variable of the service.
     *
     * <p>A missing key is simply set. For an existing key, a non-empty value is appended,
     * separated by {@code str3} unless the current value already ends with it.
     *
     * @param service service whose environment map is updated
     * @param str     environment variable name
     * @param str2    value to set or append
     * @param str3    delimiter placed between the current and the appended value
     */
    private void appendToEnv(Service service, String str, String str2, String str3) {
        Map<String, String> serviceEnv = service.getConfiguration().getEnv();
        if (!serviceEnv.containsKey(str)) {
            serviceEnv.put(str, str2);
        } else if (!str2.isEmpty()) {
            String current = serviceEnv.get(str);
            String joiner = current.endsWith(str3) ? "" : str3;
            serviceEnv.put(str, current + joiner + str2);
        }
    }

    /**
     * Populates the service environment from the job's envars and adds the default docker mounts.
     *
     * <p>Each envar is split at its first {@code '='}; an envar without one is treated as a key
     * with an empty value. {@code /etc/passwd} is always bind-mounted read-only, and
     * {@code /etc/krb5.conf} is added when {@code hadoop.security.authentication} is
     * {@code kerberos}.
     *
     * @param service          service whose environment is updated
     * @param runJobParameters job parameters supplying the envars
     */
    private void handleServiceEnvs(Service service, RunJobParameters runJobParameters) {
        if (runJobParameters.getEnvars() != null) {
            for (String envar : runJobParameters.getEnvars()) {
                int separator = envar.indexOf('=');
                String key = separator < 0 ? envar : envar.substring(0, separator);
                String value = separator < 0 ? "" : envar.substring(separator + 1);
                appendToEnv(service, key, value, ":");
            }
        }
        appendToEnv(service, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS", "/etc/passwd:/etc/passwd:ro", ",");

        String authentication = this.clientContext.getYarnConfig().get("hadoop.security.authentication");
        if ("kerberos".equals(authentication)) {
            appendToEnv(service, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS", "/etc/krb5.conf:/etc/krb5.conf:ro", ",");
        }
    }

    /**
     * Creates a docker-typed artifact with the given id.
     *
     * @param str value used as the artifact id (callers pass a docker image name)
     * @return a new artifact of type {@code DOCKER}
     */
    private Artifact getDockerArtifact(String str) {
        return new Artifact().type(Artifact.TypeEnum.DOCKER).id(str);
    }

    /**
     * Adds the user-requested quicklinks to the service specification.
     *
     * <p>Each quicklink must target an existing component instance, named
     * {@code <component name>-<index>} with the index below the component's container count.
     *
     * @param runJobParameters job parameters supplying the quicklinks
     * @throws IOException if a quicklink refers to a component instance that does not exist
     */
    private void handleQuicklinks(RunJobParameters runJobParameters) throws IOException {
        List<Quicklink> requestedLinks = runJobParameters.getQuicklinks();
        if (requestedLinks == null || requestedLinks.isEmpty()) {
            return;
        }
        for (Quicklink link : requestedLinks) {
            String instanceName = link.getComponentInstanceName();
            boolean instanceExists = false;
            for (Component candidate : this.serviceSpec.getComponents()) {
                for (int index = 0; index < candidate.getNumberOfContainers().longValue(); index++) {
                    if ((candidate.getName() + "-" + index).equals(instanceName)) {
                        instanceExists = true;
                        break;
                    }
                }
            }
            if (!instanceExists) {
                throw new IOException("Couldn't find a component instance = " + instanceName + " while adding quicklink");
            }
            String url = link.getProtocol()
                    + YarnServiceUtils.getDNSName(this.serviceSpec.getName(), instanceName, getUserName(), getDNSDomain(), link.getPort());
            YarnServiceUtils.addQuicklink(this.serviceSpec, link.getLabel(), url);
        }
    }

    /**
     * Builds the complete service specification for a job and stores it in {@code serviceSpec}.
     *
     * <p>Steps, in order: reset the launch script map, create the service (name, timestamp
     * version, docker artifact), apply Kerberos settings, environment variables and
     * localizations, add worker / PS / Tensorboard components as requested, and finally add the
     * user-defined quicklinks.
     *
     * @param runJobParameters job parameters describing the service to build
     * @return the service specification, which is also stored in {@code serviceSpec}
     * @throws IOException if any preparation step (upload, script generation, validation) fails
     */
    private Service createServiceByParameters(RunJobParameters runJobParameters) throws IOException {
        this.componentToLocalLaunchScriptPath.clear();

        this.serviceSpec = new Service();
        this.serviceSpec.setName(runJobParameters.getName());
        this.serviceSpec.setVersion(String.valueOf(System.currentTimeMillis()));
        this.serviceSpec.setArtifact(getDockerArtifact(runJobParameters.getDockerImageName()));

        handleKerberosPrincipal(runJobParameters);
        handleServiceEnvs(this.serviceSpec, runJobParameters);
        handleLocalizations(runJobParameters);

        if (runJobParameters.getNumWorkers() > 0) {
            addWorkerComponents(this.serviceSpec, runJobParameters);
        }

        if (runJobParameters.getNumPS() > 0) {
            Component parameterServer = new Component();
            parameterServer.setName(TaskType.PS.getComponentName());
            addCommonEnvironments(parameterServer, TaskType.PS);
            parameterServer.setNumberOfContainers(Long.valueOf(runJobParameters.getNumPS()));
            parameterServer.setRestartPolicy(Component.RestartPolicyEnum.NEVER);
            parameterServer.setResource(getServiceResourceFromYarnResource(runJobParameters.getPsResource()));
            String psImage = runJobParameters.getPsDockerImage();
            if (psImage != null) {
                parameterServer.setArtifact(getDockerArtifact(psImage));
            }
            handleLaunchCommand(runJobParameters, TaskType.PS, parameterServer);
            this.serviceSpec.addComponent(parameterServer);
        }

        if (runJobParameters.isTensorboardEnabled()) {
            Component tensorboard = new Component();
            tensorboard.setName(TaskType.TENSORBOARD.getComponentName());
            addCommonEnvironments(tensorboard, TaskType.TENSORBOARD);
            tensorboard.setNumberOfContainers(1L);
            tensorboard.setRestartPolicy(Component.RestartPolicyEnum.NEVER);
            tensorboard.setResource(getServiceResourceFromYarnResource(runJobParameters.getTensorboardResource()));
            String tensorboardImage = runJobParameters.getTensorboardDockerImage();
            if (tensorboardImage != null) {
                tensorboard.setArtifact(getDockerArtifact(tensorboardImage));
            }
            handleLaunchCommand(runJobParameters, TaskType.TENSORBOARD, tensorboard);

            // The link targets instance 0 of the Tensorboard component on port 6006.
            String tensorboardInstance = TaskType.TENSORBOARD.getComponentName() + "-0";
            String tensorboardLink = "http://"
                    + YarnServiceUtils.getDNSName(runJobParameters.getName(), tensorboardInstance, getUserName(), getDNSDomain(), 6006);
            LOG.info("Link to tensorboard:" + tensorboardLink);
            this.serviceSpec.addComponent(tensorboard);
            YarnServiceUtils.addQuicklink(this.serviceSpec, TENSORBOARD_QUICKLINK_LABEL, tensorboardLink);
        }

        handleQuicklinks(runJobParameters);
        return this.serviceSpec;
    }

    /**
     * Validates and registers every localization requested for the job.
     *
     * <p>Works in two passes: the first checks that each source exists and is within the allowed
     * size, so nothing is uploaded if any entry is invalid; the second prepares each source
     * (download / zip / upload as needed), registers it as a config file on the service, and adds
     * a docker bind mount when the local path starts with {@code "/"}.
     *
     * @param runJobParameters job parameters supplying the job name and the localizations
     * @throws FileNotFoundException if a localization source does not exist
     * @throws IOException           if a source is too large, or download/zip/upload fails
     */
    private void handleLocalizations(RunJobParameters runJobParameters) throws IOException {
        Path jobStagingArea = this.clientContext.getRemoteDirectoryManager().getJobStagingArea(runJobParameters.getName(), true);
        List<Localization> localizations = runJobParameters.getLocalizations();
        RemoteDirectoryManager remoteDirectoryManager = this.clientContext.getRemoteDirectoryManager();
        // Pass 1: validate existence and size of every source before touching anything.
        Iterator<Localization> it = localizations.iterator();
        while (it.hasNext()) {
            String remoteUri = it.next().getRemoteUri();
            Path path = new Path(remoteUri);
            if (remoteDirectoryManager.isRemote(remoteUri)) {
                if (!remoteDirectoryManager.existsRemoteFile(path)) {
                    throw new FileNotFoundException("File " + remoteUri + " doesn't exists.");
                }
            } else if (!new File(remoteUri).exists()) {
                throw new FileNotFoundException("File " + remoteUri + " doesn't exists.");
            }
            validFileSize(remoteUri);
        }
        // Pass 2: prepare each source and register it on the service spec.
        for (Localization localization : localizations) {
            String remoteUri2 = localization.getRemoteUri();
            String localPath = localization.getLocalPath();
            // str tracks the local path of the artifact to upload and is later reused to derive the destination name.
            String str = remoteUri2;
            ConfigFile.TypeEnum typeEnum = ConfigFile.TypeEnum.STATIC;
            // path2 is the source registered in the spec; it stays the original URI unless an upload happens.
            Path path2 = new Path(remoteUri2);
            // z: the artifact must be uploaded to the staging area.
            boolean z = true;
            // z2: a temporary downloaded copy must be deleted after the upload.
            boolean z2 = false;
            if (remoteDirectoryManager.isDir(remoteUri2)) {
                // Directories are shipped as a zip archive.
                typeEnum = ConfigFile.TypeEnum.ARCHIVE;
                str = mayDownloadAndZipIt(remoteUri2, getLastNameFromPath(str), true);
            } else if (remoteDirectoryManager.isRemote(remoteUri2)) {
                if (needHdfs(remoteUri2)) {
                    // A single file whose URI contains "hdfs://" is referenced in place, without upload.
                    z = false;
                } else {
                    // Other remote files are downloaded first and then uploaded to the staging area.
                    str = mayDownloadAndZipIt(remoteUri2, getLastNameFromPath(str), false);
                    z2 = true;
                }
            }
            if (z) {
                path2 = uploadToRemoteFile(jobStagingArea, str);
            }
            if (z2) {
                deleteFiles(str);
            }
            if (typeEnum == ConfigFile.TypeEnum.ARCHIVE && str.endsWith(".zip")) {
                // Remove the local zip and cut the name at its last '_' (char code 95), which drops
                // the "_<modification time>-<size>.zip" suffix appended by mayDownloadAndZipIt.
                deleteFiles(str);
                str = str.substring(0, str.lastIndexOf(95));
            }
            if (!localPath.equals(".") && !localPath.equals("./")) {
                // An explicit local path determines the destination name.
                str = getLastNameFromPath(localPath);
            }
            String lastNameFromPath = getLastNameFromPath(str);
            LOG.info("The file/dir to be localized is {}", path2.toString());
            LOG.info("Its localized file name will be {}", lastNameFromPath);
            this.serviceSpec.getConfiguration().getFiles().add(new ConfigFile().srcFile(path2.toUri().toString()).destFile(lastNameFromPath).type(typeEnum));
            if (localPath.startsWith("/")) {
                // A local path starting with "/" additionally gets a docker bind mount: <name>:<local path>:<permission>.
                String str2 = getLastNameFromPath(str) + ":" + localPath + ":" + localization.getMountPermission();
                LOG.info("Add bind-mount string {}", str2);
                appendToEnv(this.serviceSpec, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS", str2, ",");
            }
        }
    }

    /**
     * Verifies that a localization source does not exceed the configured maximum size.
     *
     * <p>The size of a remote source is obtained from the remote directory manager; the size of
     * a local source is its disk usage. The limit is configured in megabytes under
     * {@code SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB}.
     *
     * @param str remote URI or local path of the file or directory to check
     * @throws IOException if the size exceeds the limit or the size cannot be determined
     */
    private void validFileSize(String str) throws IOException {
        RemoteDirectoryManager directoryManager = this.clientContext.getRemoteDirectoryManager();
        boolean remote = directoryManager.isRemote(str);
        long actualSizeBytes = remote
                ? directoryManager.getRemoteFileSize(str)
                : FileUtil.getDU(new File(str));
        String locationType = remote ? "Remote" : "Local";

        long maxSizeMb = this.clientContext.getSubmarineConfig().getLong(
                SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB,
                SubmarineConfiguration.DEFAULT_MAX_ALLOWED_REMOTE_URI_SIZE_MB);
        // Convert once so the logged limit and the enforced limit cannot drift apart.
        long maxSizeBytes = maxSizeMb * 1024 * 1024;

        LOG.info("{} file/dir: {}, size(Byte):{}, Allowed max file/dir size: {}",
                locationType, str, actualSizeBytes, maxSizeBytes);
        if (actualSizeBytes > maxSizeBytes) {
            throw new IOException(str + " size(Byte): " + actualSizeBytes + " exceeds configured max size:" + maxSizeBytes);
        }
    }

    /**
     * Serializes a service specification to JSON and writes it to a temporary file.
     *
     * @param service the service specification to serialize
     * @return absolute path of the temporary {@code .json} file
     * @throws IOException if the temporary file cannot be created or serialization fails
     */
    private String generateServiceSpecFile(Service service) throws IOException {
        File specFile = File.createTempFile(service.getName(), ".json");
        String specJson = ServiceApiUtil.jsonSerDeser.toJson(service);
        try (PrintWriter specWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(specFile), "UTF-8"))) {
            specWriter.append(specJson);
        }
        return specFile.getAbsolutePath();
    }

    /**
     * Configures the Kerberos principal of the service when both a keytab and a principal are given.
     *
     * <p>With keytab distribution enabled, the keytab is uploaded to the job staging area and
     * restricted to octal mode 400. Otherwise the keytab location is used as is, prefixed with
     * {@code file://} unless it already starts with {@code file}.
     *
     * @param runJobParameters job parameters supplying keytab, principal and distribution flag
     * @throws IOException if uploading the keytab or setting its permission fails
     */
    private void handleKerberosPrincipal(RunJobParameters runJobParameters) throws IOException {
        String keytabLocation = runJobParameters.getKeytab();
        String principalName = runJobParameters.getPrincipal();
        if (!StringUtils.isNotBlank(keytabLocation) || !StringUtils.isNotBlank(principalName)) {
            return;
        }

        if (runJobParameters.isDistributeKeytab()) {
            Path stagingDir = this.clientContext.getRemoteDirectoryManager().getJobStagingArea(runJobParameters.getName(), true);
            Path remoteKeytab = uploadToRemoteFile(stagingDir, keytabLocation);
            // 0400 octal: readable by the owner only.
            setPermission(remoteKeytab, FsPermission.createImmutable((short) 0400));
            keytabLocation = remoteKeytab.toString();
        } else if (!keytabLocation.startsWith("file")) {
            keytabLocation = "file://" + keytabLocation;
        }
        this.serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab(keytabLocation).principalName(principalName));
    }

    /**
     * Builds the service specification for a job, launches it, and waits for its application id.
     *
     * <p>After a successful launch the service status is polled once per second, for at most 30
     * retries, until an application id is reported. The service client is stopped on every exit
     * path, including failures.
     *
     * @param runJobParameters parameters describing the job to run
     * @return the application id of the launched service
     * @throws IOException   if preparing the service fails or the wait is interrupted
     * @throws YarnException if the launch returns a non-zero exit code or no application id
     *                       becomes available
     */
    @Override // org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter
    public ApplicationId submitJob(RunJobParameters runJobParameters) throws IOException, YarnException {
        createServiceByParameters(runJobParameters);
        String serviceSpecFile = generateServiceSpecFile(this.serviceSpec);
        AppAdminClient serviceClient = YarnServiceUtils.createServiceClient(this.clientContext.getYarnConfig());
        try {
            int exitCode = serviceClient.actionLaunch(serviceSpecFile, this.serviceSpec.getName(), (Long) null, (String) null);
            if (exitCode != 0) {
                throw new YarnException("Fail to launch application with exit code:" + exitCode);
            }

            String statusString = serviceClient.getStatusString(this.serviceSpec.getName());
            Service status = (Service) ServiceApiUtil.jsonSerDeser.fromJson(statusString);
            for (int attempt = 0; status.getId() == null && attempt < 30; attempt++) {
                LOG.info("Waiting for application Id. AppStatusString=\n {}", statusString);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up can still observe it.
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
                statusString = serviceClient.getStatusString(this.serviceSpec.getName());
                status = (Service) ServiceApiUtil.jsonSerDeser.fromJson(statusString);
            }
            if (status.getId() == null) {
                throw new YarnException("Can't get application id for Service " + this.serviceSpec.getName());
            }
            return ApplicationId.fromString(status.getId());
        } finally {
            // Release the client on success and on every failure path.
            serviceClient.stop();
        }
    }

    /**
     * Returns the service specification currently held by this submitter.
     *
     * @return the value of {@code serviceSpec}; it is assigned in {@code createServiceByParameters}
     */
    @VisibleForTesting
    public Service getServiceSpec() {
        return this.serviceSpec;
    }

    /**
     * Returns the map from component name to the local path of its generated launch script.
     *
     * <p>The internal map itself is returned, not a copy.
     *
     * @return the component-name-to-script-path map
     */
    @VisibleForTesting
    public Map<String, String> getComponentToLocalLaunchScriptPath() {
        return this.componentToLocalLaunchScriptPath;
    }
}
