/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class TestDataTransferProtocol {
    /** Logger for this test class, looked up by the class's fully qualified name. */
    private static final Log LOG = LogFactory.getLog((String)"org.apache.hadoop.hdfs.TestDataTransferProtocol");
    /** Checksum passed to write requests unless a test substitutes its own: CRC32C, created with 512 as the size argument. */
    private static final DataChecksum DEFAULT_CHECKSUM = DataChecksum.newDataChecksum((DataChecksum.Type)DataChecksum.Type.CRC32C, (int)512);
    /** Datanode under test; assigned by the cluster-based tests right after the mini cluster becomes active. */
    DatanodeID datanode;
    /** Socket address created from {@code datanode.getXferAddr()}. */
    InetSocketAddress dnAddr;
    /** Backing buffer for the request bytes a test stages; tests call {@code reset()} on it between scenarios. */
    final ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
    /** Data-stream view over {@link #sendBuf}; packet headers and raw shorts/ints are written through it. */
    final DataOutputStream sendOut = new DataOutputStream(this.sendBuf);
    /** Protocol sender that serializes read/write block requests into {@link #sendOut}. */
    final Sender sender = new Sender(this.sendOut);
    /**
     * Backing buffer for the response bytes a test stages (op responses and pipeline acks).
     * Presumably compared with the datanode's reply inside {@code sendRecvData}; that body is not available here.
     */
    final ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
    /** Data-stream view over {@link #recvBuf}. */
    final DataOutputStream recvOut = new DataOutputStream(this.recvBuf);

    /**
     * Decompiler stub for the helper every scenario in this class calls once its request
     * bytes are in {@code sendBuf} and its staged response bytes are in {@code recvBuf}.
     *
     * <p>The real body was not recovered: CFR failed while structuring the method's nested
     * try/catch blocks (exception recorded in the comment below). As written, the method
     * unconditionally throws {@link IllegalStateException}, so every test path that reaches it
     * fails. The original implementation must be restored from the upstream source before
     * these tests can run.
     *
     * @param testDescription label the callers pass for the scenario being exercised; unused by this stub
     * @param eofExpected flag the callers pass; unused by this stub (presumably whether the
     *        datanode is expected to drop the connection - confirm against the original source)
     * @throws IOException declared by the signature; never thrown by this stub
     * @throws IllegalStateException always, with the message "Decompilation failed"
     */
    private void sendRecvData(String testDescription, boolean eofExpected) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [0[TRYBLOCK], 3[CATCHBLOCK]], but top level block is 2[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        // Placeholder emitted by the decompiler in place of the unrecoverable body.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Creates {@code path} on {@code fs} and fills it with {@code fileLen} zero bytes.
     *
     * @param fs file system to create the file on
     * @param path location of the new file
     * @param fileLen number of zero bytes to write
     * @throws IOException if the file cannot be created, written or closed
     */
    void createFile(FileSystem fs, Path path, int fileLen) throws IOException {
        byte[] zeros = new byte[fileLen];
        FSDataOutputStream out = fs.create(path);
        try {
            out.write(zeros);
        } finally {
            // Close on the failure path as well, so a failed write does not leak the stream.
            out.close();
        }
    }

    /**
     * Opens {@code path} on {@code fs} and reads exactly {@code fileLen} bytes from it,
     * discarding the data.
     *
     * @param fs file system to read from
     * @param path file to open
     * @param fileLen number of bytes that must be readable from the start of the file
     * @throws IOException if the file cannot be opened, fewer than {@code fileLen} bytes are
     *         available, or closing the stream fails
     */
    void readFile(FileSystem fs, Path path, int fileLen) throws IOException {
        byte[] sink = new byte[fileLen];
        FSDataInputStream in = fs.open(path);
        try {
            in.readFully(sink);
        } finally {
            // The stream was previously never closed; release it whether or not the read succeeded.
            in.close();
        }
    }

    /**
     * Appends an empty packet for {@code block} to the staged request, appends the matching
     * SUCCESS op response and SUCCESS pipeline ack to the staged response, then hands both
     * to {@code sendRecvData} with {@code eofExpected} set to {@code false}.
     *
     * @param block block whose {@code getNumBytes()} is placed in the packet header
     * @param description scenario label forwarded to {@code sendRecvData}
     * @throws IOException if staging the bytes or the exchange fails
     */
    private void writeZeroLengthPacket(ExtendedBlock block, String description) throws IOException {
        final long seqno = 100L;

        // Request side: header with zero data length, followed by a zero int.
        PacketHeader emptyPacket = new PacketHeader(8, block.getNumBytes(), seqno, true, 0, false);
        emptyPacket.write(this.sendOut);
        this.sendOut.writeInt(0);

        // Response side: op-level SUCCESS, then an ack carrying SUCCESS for the same seqno.
        this.sendResponse(DataTransferProtos.Status.SUCCESS, "", null, this.recvOut);
        int ackHeader = PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, DataTransferProtos.Status.SUCCESS);
        PipelineAck ack = new PipelineAck(seqno, new int[]{ackHeader});
        ack.write((OutputStream)this.recvOut);

        this.sendRecvData(description, false);
    }

    /**
     * Builds a {@code BlockOpResponseProto} and writes it, length-delimited, to {@code out}.
     *
     * @param status status to set on the response
     * @param firstBadLink value for the first-bad-link field; the field is left unset when {@code null}
     * @param message value for the message field; the field is left unset when {@code null}
     * @param out stream that receives the delimited message
     * @throws IOException if writing to {@code out} fails
     */
    private void sendResponse(DataTransferProtos.Status status, String firstBadLink, String message, DataOutputStream out) throws IOException {
        DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder();
        response.setStatus(status);

        // Only populate the optional fields the caller actually supplied.
        if (firstBadLink != null) {
            response.setFirstBadLink(firstBadLink);
        }
        if (message != null) {
            response.setMessage(message);
        }

        DataTransferProtos.BlockOpResponseProto built = response.build();
        built.writeDelimitedTo((OutputStream)out);
    }

    /**
     * Stages a write-block request for {@code block} at the given {@code stage}, stages the
     * response that goes with the scenario, and runs the exchange.
     *
     * <ul>
     *   <li>When {@code eofExcepted} is true: an ERROR op response is staged and the exchange
     *       runs with {@code eofExpected = true}.</li>
     *   <li>Otherwise, for {@code PIPELINE_CLOSE_RECOVERY}: only a SUCCESS op response is staged.</li>
     *   <li>Otherwise: an empty packet plus SUCCESS response and ack are staged via
     *       {@code writeZeroLengthPacket}.</li>
     * </ul>
     *
     * @param block block named in the request
     * @param stage pipeline construction stage placed in the request
     * @param newGS generation stamp placed in the request
     * @param description scenario label forwarded to {@code sendRecvData}
     * @param eofExcepted whether the scenario is the error/EOF variant; must not be {@code null}
     * @throws IOException if staging the bytes or the exchange fails
     */
    private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS, String description, Boolean eofExcepted) throws IOException {
        // Start every scenario from empty request/response buffers.
        this.sendBuf.reset();
        this.recvBuf.reset();
        this.writeBlock(block, stage, newGS, DEFAULT_CHECKSUM);

        if (eofExcepted) {
            this.sendResponse(DataTransferProtos.Status.ERROR, null, null, this.recvOut);
            this.sendRecvData(description, true);
            return;
        }

        if (stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
            this.writeZeroLengthPacket(block, description);
            return;
        }

        // Close recovery: no packet is sent, only the op response is staged.
        this.sendResponse(DataTransferProtos.Status.SUCCESS, "", null, this.recvOut);
        this.sendRecvData(description, false);
    }

    /**
     * Drives write-block requests at a single-datanode mini cluster for a series of
     * block states and pipeline stages, using {@code testWrite} for each scenario. The
     * scenario strings passed to {@code testWrite} name the case; its last argument says
     * whether the error/EOF variant is staged.
     *
     * <p>Decompiler note: CFR reported "Removed try catching itself - possible behaviour
     * change" for this method, so the try/finally structure may differ from the original.
     *
     * @throws IOException if cluster setup, file creation or any exchange fails
     */
    @Test
    public void testOpWrite() throws IOException {
        int numDataNodes = 1;
        // NOTE(review): unused local; the literal 128L added to the block id further down
        // is presumably this constant inlined by the compiler.
        long BLOCK_ID_FUDGE = 128L;
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).build();
        try {
            cluster.waitActive();
            String poolId = cluster.getNamesystem().getBlockPoolId();
            // Resolve the only datanode and the address requests will be sent to.
            this.datanode = InternalDataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
            this.dnAddr = NetUtils.createSocketAddr((String)this.datanode.getXferAddr());
            DistributedFileSystem fileSys = cluster.getFileSystem();

            // --- Scenarios against the first block of a freshly created file ---
            Path file = new Path("dataprotocol.dat");
            DFSTestUtil.createFile((FileSystem)fileSys, file, 1L, (short)numDataNodes, 0L);
            ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock((FileSystem)fileSys, file);
            this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Cannot create an existing block", true);
            this.testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING, 0L, "Unexpected stage", true);
            long newGS = firstBlock.getGenerationStamp() + 1L;
            this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newGS, "Cannot recover data streaming to a finalized replica", true);
            newGS = firstBlock.getGenerationStamp() + 1L;
            this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND, newGS, "Append to a finalized replica", false);
            // Keep the local block object in step with the generation stamp just sent.
            firstBlock.setGenerationStamp(newGS);

            // A second file is used for the append-recovery scenario.
            file = new Path("dataprotocol1.dat");
            DFSTestUtil.createFile((FileSystem)fileSys, file, 1L, (short)numDataNodes, 0L);
            firstBlock = DFSTestUtil.getFirstBlock((FileSystem)fileSys, file);
            newGS = firstBlock.getGenerationStamp() + 1L;
            this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS, "Recover appending to a finalized replica", false);

            // A third file is used for the close-recovery scenario.
            file = new Path("dataprotocol2.dat");
            DFSTestUtil.createFile((FileSystem)fileSys, file, 1L, (short)numDataNodes, 0L);
            firstBlock = DFSTestUtil.getFirstBlock((FileSystem)fileSys, file);
            newGS = firstBlock.getGenerationStamp() + 1L;
            this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, newGS, "Recover failed close to a finalized replica", false);
            firstBlock.setGenerationStamp(newGS);

            // --- Scenarios against block ids derived from firstBlock's id (offset 128, then +1 steps) ---
            long newBlockId = firstBlock.getBlockId() + 128L;
            ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(), newBlockId, 0L, firstBlock.getGenerationStamp());
            this.testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Create a new block", false);
            newGS = newBlock.getGenerationStamp() + 1L;
            // Each following scenario moves to the next block id before sending.
            newBlock.setBlockId(newBlock.getBlockId() + 1L);
            this.testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newGS, "Recover a new block", true);
            newGS = newBlock.getGenerationStamp() + 1L;
            this.testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND, newGS, "Cannot append to a new block", true);
            newBlock.setBlockId(newBlock.getBlockId() + 1L);
            newGS = newBlock.getGenerationStamp() + 1L;
            this.testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS, "Cannot append to a new block", true);

            // --- Scenarios against a block that still has an open append stream ---
            // (the scenario strings below call this an "RBW" replica)
            Path file1 = new Path("dataprotocol1.dat");
            DFSTestUtil.createFile((FileSystem)fileSys, file1, 1L, (short)numDataNodes, 0L);
            DFSOutputStream out = (DFSOutputStream)fileSys.append(file1).getWrappedStream();
            out.write(1);
            out.hflush();
            FSDataInputStream in = fileSys.open(file1);
            firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
            // One byte from createFile plus the one byte appended above.
            firstBlock.setNumBytes(2L);
            try {
                this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Cannot create a RBW block", true);
                newGS = firstBlock.getGenerationStamp() + 1L;
                this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND, newGS, "Cannot append to a RBW replica", true);
                this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS, "Recover append to a RBW replica", false);
                firstBlock.setGenerationStamp(newGS);

                // Same setup again on another file for the streaming-recovery scenario.
                file = new Path("dataprotocol2.dat");
                DFSTestUtil.createFile((FileSystem)fileSys, file, 1L, (short)numDataNodes, 0L);
                // NOTE(review): 'out' and 'in' are reassigned here without closing the streams
                // opened on dataprotocol1.dat; the finally block only closes the last pair.
                out = (DFSOutputStream)fileSys.append(file).getWrappedStream();
                out.write(1);
                out.hflush();
                in = fileSys.open(file);
                firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
                firstBlock.setNumBytes(2L);
                newGS = firstBlock.getGenerationStamp() + 1L;
                this.testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newGS, "Recover a RBW replica", false);
            }
            finally {
                IOUtils.closeStream((Closeable)in);
                IOUtils.closeStream((Closeable)out);
            }
        }
        finally {
            // Always tear the mini cluster down, even when a scenario fails.
            cluster.shutdown();
        }
    }

    /**
     * Sends a sequence of malformed and well-formed requests to a single-datanode mini
     * cluster: a wrong version, a wrong op code, write requests with a bad checksum, a
     * negative packet length and a zero-length block, and read requests with bad block id,
     * offsets and lengths. Each scenario stages request bytes in {@code sendBuf}, optionally
     * stages response bytes in {@code recvBuf}, and calls {@code sendRecvData}.
     *
     * <p>Decompiler note: CFR reported "Removed try catching itself - possible behaviour
     * change" for this method, so the try/finally structure may differ from the original.
     *
     * @throws IOException if cluster setup, file creation or any exchange fails
     */
    @Test
    public void testDataTransferProtocol() throws IOException {
        Random random = new Random();
        // 0x100000 == 1,048,576; used only as the bound for random negative values below.
        int oneMil = 0x100000;
        Path file = new Path("dataprotocol.dat");
        int numDataNodes = 1;
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setInt("dfs.replication", numDataNodes);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).build();
        try {
            cluster.waitActive();
            // Use the first live datanode reported by the file system as the target.
            this.datanode = cluster.getFileSystem().getDataNodeStats(HdfsConstants.DatanodeReportType.LIVE)[0];
            this.dnAddr = NetUtils.createSocketAddr((String)this.datanode.getXferAddr());
            DistributedFileSystem fileSys = cluster.getFileSystem();
            // File length is the configured block size capped at 4096 (4096 when unset).
            int fileLen = Math.min(conf.getInt("dfs.blocksize", 4096), 4096);
            this.createFile((FileSystem)fileSys, file, fileLen);
            ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock((FileSystem)fileSys, file);
            String poolId = firstBlock.getBlockPoolId();
            long newBlockId = firstBlock.getBlockId() + 1L;

            // --- Wrong version: 27 is sent here, 28 in the next scenario ---
            // (presumably 28 is the current data-transfer version - confirm)
            this.recvBuf.reset();
            this.sendBuf.reset();
            this.recvOut.writeShort(27);
            this.sendOut.writeShort(27);
            this.sendRecvData("Wrong Version", true);

            // --- Wrong op code: one below WRITE_BLOCK's code ---
            this.sendBuf.reset();
            this.sendOut.writeShort(28);
            this.sendOut.writeByte(Op.WRITE_BLOCK.code - 1);
            this.sendRecvData("Wrong Op Code", true);

            // --- Write with a checksum whose getBytesPerChecksum() is forced to -1 ---
            this.sendBuf.reset();
            DataChecksum badChecksum = (DataChecksum)Mockito.spy((Object)DEFAULT_CHECKSUM);
            ((DataChecksum)Mockito.doReturn((Object)-1).when((Object)badChecksum)).getBytesPerChecksum();
            this.writeBlock(poolId, newBlockId, badChecksum);
            this.recvBuf.reset();
            this.sendResponse(DataTransferProtos.Status.ERROR, null, null, this.recvOut);
            this.sendRecvData("wrong bytesPerChecksum while writing", true);

            // --- Write followed by a packet header carrying a random negative value in its fifth argument ---
            this.sendBuf.reset();
            this.recvBuf.reset();
            this.writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
            PacketHeader hdr = new PacketHeader(4, 0L, 100L, false, -1 - random.nextInt(oneMil), false);
            hdr.write(this.sendOut);
            this.sendResponse(DataTransferProtos.Status.SUCCESS, "", null, this.recvOut);
            new PipelineAck(100L, new int[]{PipelineAck.combineHeader((PipelineAck.ECN)PipelineAck.ECN.DISABLED, (DataTransferProtos.Status)DataTransferProtos.Status.ERROR)}).write((OutputStream)this.recvOut);
            this.sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, true);

            // --- Write of a zero-length block: empty packet, SUCCESS response and SUCCESS ack ---
            this.sendBuf.reset();
            this.recvBuf.reset();
            this.writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
            hdr = new PacketHeader(8, 0L, 100L, true, 0, false);
            hdr.write(this.sendOut);
            this.sendOut.writeInt(0);
            this.sendOut.flush();
            this.sendResponse(DataTransferProtos.Status.SUCCESS, "", null, this.recvOut);
            new PipelineAck(100L, new int[]{PipelineAck.combineHeader((PipelineAck.ECN)PipelineAck.ECN.DISABLED, (DataTransferProtos.Status)DataTransferProtos.Status.SUCCESS)}).write((OutputStream)this.recvOut);
            this.sendRecvData("Writing a zero len block blockid " + newBlockId, false);

            // --- Read scenarios against the file's first block ---
            String bpid = cluster.getNamesystem().getBlockPoolId();
            ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
            long blkid = blk.getBlockId();

            // Read with a block id one below the real one. recvBuf stays empty for this and
            // the next two scenarios (it is reset here and not written until further down).
            this.sendBuf.reset();
            this.recvBuf.reset();
            blk.setBlockId(blkid - 1L);
            this.sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", 0L, (long)fileLen, true, CachingStrategy.newDefaultStrategy());
            // NOTE(review): the description prints newBlockId, not the id actually requested (blkid - 1).
            this.sendRecvData("Wrong block ID " + newBlockId + " for read", false);

            // Read with start offset -1.
            this.sendBuf.reset();
            blk.setBlockId(blkid);
            this.sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", -1L, (long)fileLen, true, CachingStrategy.newDefaultStrategy());
            this.sendRecvData("Negative start-offset for read for block " + firstBlock.getBlockId(), false);

            // Read with start offset equal to the file length.
            this.sendBuf.reset();
            this.sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", (long)fileLen, (long)fileLen, true, CachingStrategy.newDefaultStrategy());
            this.sendRecvData("Wrong start-offset for reading block " + firstBlock.getBlockId(), false);

            // Read with a random negative length; the staged response is SUCCESS with
            // checksum info for DEFAULT_CHECKSUM at chunk offset 0.
            this.recvBuf.reset();
            DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setReadOpChecksumInfo(DataTransferProtos.ReadOpChecksumInfoProto.newBuilder().setChecksum(DataTransferProtoUtil.toProto((DataChecksum)DEFAULT_CHECKSUM)).setChunkOffset(0L)).build().writeDelimitedTo((OutputStream)this.recvOut);
            this.sendBuf.reset();
            this.sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", 0L, -1L - (long)random.nextInt(oneMil), true, CachingStrategy.newDefaultStrategy());
            this.sendRecvData("Negative length for reading block " + firstBlock.getBlockId(), false);

            // Read one byte past the file length; the staged response is ERROR with a message.
            // NOTE(review): the message hard-codes 4097/4096, which matches fileLen + 1 / fileLen
            // only when fileLen is 4096 (i.e. dfs.blocksize is not configured below 4096).
            this.recvBuf.reset();
            this.sendResponse(DataTransferProtos.Status.ERROR, null, "opReadBlock " + firstBlock + " received exception java.io.IOException:  Offset 0 and length 4097 don't match block " + firstBlock + " ( blockLen 4096 )", this.recvOut);
            this.sendBuf.reset();
            this.sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", 0L, (long)(fileLen + 1), true, CachingStrategy.newDefaultStrategy());
            this.sendRecvData("Wrong length for reading block " + firstBlock.getBlockId(), false);

            // A valid read request is staged but not sent via sendRecvData; the file is then
            // read back through the file system instead.
            this.sendBuf.reset();
            this.sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", 0L, (long)fileLen, true, CachingStrategy.newDefaultStrategy());
            this.readFile((FileSystem)fileSys, file, fileLen);
        }
        finally {
            // Always tear the mini cluster down, even when a scenario fails.
            cluster.shutdown();
        }
    }

    /**
     * Checks that a {@code PacketHeader} survives a write/read round trip through both a
     * {@code DataInputStream} and a {@code ByteBuffer}, and that {@code sanityCheck} accepts
     * 99 and rejects 100 for a header whose sequence number is 100.
     *
     * @throws IOException if serializing or deserializing the header fails
     */
    @Test
    public void testPacketHeader() throws IOException {
        PacketHeader original = new PacketHeader(4, 1024L, 100L, false, 4096, false);

        // Serialize once and reuse the same bytes for both read paths.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        original.write(new DataOutputStream(sink));
        byte[] wire = sink.toByteArray();

        // Stream-based read.
        PacketHeader fromStream = new PacketHeader();
        fromStream.readFields(new DataInputStream(new ByteArrayInputStream(wire)));
        Assert.assertEquals(original, fromStream);

        // Buffer-based read.
        PacketHeader fromBuffer = new PacketHeader();
        fromBuffer.readFields(ByteBuffer.wrap(wire));
        Assert.assertEquals(original, fromBuffer);

        Assert.assertTrue(original.sanityCheck(99L));
        Assert.assertFalse(original.sanityCheck(100L));
    }

    /**
     * Checks how {@code PipelineAck} reads two wire forms of an ack for seqno 0 with a single
     * CHECKSUM_OK reply: one without a flag entry and one with a flag entry built from
     * {@code ECN.SUPPORTED}. The first must yield a header flag equal to
     * {@code combineHeader(DISABLED, CHECKSUM_OK)}, the second
     * {@code combineHeader(SUPPORTED, CHECKSUM_OK)}.
     *
     * @throws IOException if serializing or parsing an ack fails
     */
    @Test
    public void TestPipeLineAckCompatibility() throws IOException {
        final DataTransferProtos.Status reply = DataTransferProtos.Status.CHECKSUM_OK;

        // Ack without a flag entry, and the same ack with a flag entry added.
        DataTransferProtos.PipelineAckProto replyOnlyProto = DataTransferProtos.PipelineAckProto.newBuilder()
                .setSeqno(0L)
                .addReply(reply)
                .build();
        DataTransferProtos.PipelineAckProto flaggedProto = DataTransferProtos.PipelineAckProto.newBuilder()
                .mergeFrom(replyOnlyProto)
                .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, reply))
                .build();

        // Form without flags: the header flag must come out as DISABLED + reply status.
        ByteArrayOutputStream replyOnlyWire = new ByteArrayOutputStream();
        replyOnlyProto.writeDelimitedTo((OutputStream)replyOnlyWire);
        PipelineAck replyOnlyAck = new PipelineAck();
        replyOnlyAck.readFields((InputStream)new ByteArrayInputStream(replyOnlyWire.toByteArray()));
        long expectedWithoutFlag = PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, reply);
        long actualWithoutFlag = replyOnlyAck.getHeaderFlag(0);
        Assert.assertEquals(expectedWithoutFlag, actualWithoutFlag);

        // Form with flags: the header flag must be the one that was written.
        ByteArrayOutputStream flaggedWire = new ByteArrayOutputStream();
        flaggedProto.writeDelimitedTo((OutputStream)flaggedWire);
        PipelineAck flaggedAck = new PipelineAck();
        flaggedAck.readFields((InputStream)new ByteArrayInputStream(flaggedWire.toByteArray()));
        long expectedWithFlag = PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, reply);
        long actualWithFlag = flaggedAck.getHeaderFlag(0);
        Assert.assertEquals(expectedWithFlag, actualWithFlag);
    }

    /**
     * Stages a {@code PIPELINE_SETUP_CREATE} write request, with generation stamp argument 0,
     * for a block identified only by pool id and block id.
     *
     * @param poolId block pool id of the target block
     * @param blockId id of the target block
     * @param checksum checksum to request
     * @throws IOException if the request cannot be serialized
     */
    void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
        ExtendedBlock target = new ExtendedBlock(poolId, blockId);
        this.writeBlock(target, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);
    }

    /**
     * Serializes a write-block request through {@code sender} (and therefore into the staged
     * request buffer). The request uses the default storage type, the dummy block token,
     * client name {@code "cl"}, one-element target arrays with no entries filled in, and the
     * block's own {@code getNumBytes()} for both byte-count arguments.
     *
     * @param block block named in the request
     * @param stage pipeline construction stage to request
     * @param newGS generation stamp to place in the request
     * @param checksum checksum to request
     * @throws IOException if the request cannot be serialized
     */
    void writeBlock(ExtendedBlock block, BlockConstructionStage stage, long newGS, DataChecksum checksum) throws IOException {
        DatanodeInfo[] targets = new DatanodeInfo[1];
        StorageType[] targetStorageTypes = new StorageType[1];
        long numBytes = block.getNumBytes();

        this.sender.writeBlock(
                block,
                StorageType.DEFAULT,
                BlockTokenSecretManager.DUMMY_TOKEN,
                "cl",
                targets,
                targetStorageTypes,
                null,
                stage,
                0,
                numBytes,
                numBytes,
                newGS,
                checksum,
                CachingStrategy.newDefaultStrategy(),
                false,
                false,
                null);
    }
}

