Package org.apache.hadoop.hdfs.protocol.DataTransferProtocol

Examples of org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck


      PacketReceiveProfile pktProfile = null;
      while (running && datanode.shouldRun && !lastPacketInBlock) {
        Packet pkt = null;
          try {
            long expected = PipelineAck.UNKOWN_SEQNO;
            PipelineAck ack = new PipelineAck();
            long seqno = PipelineAck.UNKOWN_SEQNO;
            boolean localMirrorError = mirrorError;
            try {
              synchronized (this) {
                // wait for a packet to arrive
                while (running && datanode.shouldRun && ackQueue.size() == 0) {
                  if (LOG.isDebugEnabled()) {
                    LOG.debug("PacketResponder " + numTargets +
                              " seqno = " + seqno +
                              " for block " + block +
                              " waiting for local datanode to finish write.");
                  }
                  wait();
                }
                if (!running || !datanode.shouldRun) {
                  break;
                }
                pkt = ackQueue.removeFirst();
                expected = pkt.seqno;
                processingAck = true;
                notifyAll();
              }
              if (pkt != null) {
                pktProfile = pkt.eventAckDequeuedAndReturnProfile(pktProfile);
              }

              // receive an ack if DN is not the last one in the pipeline
              if (numTargets > 0 && !localMirrorError) {
                // read an ack from downstream datanode
                ack.readFields(mirrorIn, numTargets, pktProfile != null);
                if (LOG.isDebugEnabled()) {
                  LOG.debug("PacketResponder " + numTargets +
                      " for block " + block + " got " + ack);
                }
                seqno = ack.getSeqno();
                // verify seqno
                if (seqno != expected) {
                  throw new IOException("PacketResponder " + numTargets +
                      " for block " + block +
                      " expected seqno:" + expected +
                      " received:" + seqno);
                }
              }
             
              assert pkt != null;

              pkt.eventAckReceived();

              try {
                pkt.waitForPersistent();
              } catch (InterruptedException ine) {
                isInterrupted = true;
                LOG.info("PacketResponder " + block +  " " + numTargets +
                    " : Thread is interrupted when waiting for data persistent.");
                break;
              }
              pkt.eventResponderKnewDataPersistent();
             
              lastPacketInBlock = pkt.lastPacketInBlock;
              if (pkt.seqno >= 0) {
                replicaBeingWritten.setBytesAcked(pkt.offsetInBlock);
              }
            } catch (InterruptedException ine) {
              isInterrupted = true;
            } catch (IOException ioe) {
              if (Thread.interrupted()) {
                isInterrupted = true;
              } else {
                // continue to run even if can not read from mirror
                // notify client of the error
                // and wait for the client to shut down the pipeline
                mirrorError = true;
                LOG.info("PacketResponder " + block + " " + numTargets +
                    " Exception " + StringUtils.stringifyException(ioe));
              }
            } finally {
              synchronized (this) {
                processingAck = false;
                notifyAll();
              }             
            }

            if (Thread.interrupted() || isInterrupted) {
              /* The receiver thread cancelled this thread.
               * We could also check any other status updates from the
               * receiver thread (e.g. if it is ok to write to replyOut).
               * It is prudent to not send any more status back to the client
               * because this datanode has a problem. The upstream datanode
               * will detect that this datanode is bad, and rightly so.
               */
              LOG.info("PacketResponder " + block +  " " + numTargets +
                       " : Thread is interrupted.");
              break;
            }
            if (lastPacketInBlock) {
              // Wait receiver thread to finish flush() before sending out the
              // last ack.
              try {
                synchronized (this) {
                  while (running && datanode.shouldRun && !isBlockClosed) {
                    wait();
                  }
                }
              } catch (InterruptedException ine) {
                LOG.info("PacketResponder " + block + " " + numTargets
                    + " : Thread is interrupted while waiting for the block "
                    + "to be closed.");
                isInterrupted = true;
              }
            }

            // construct my ack message
            short[] replies = null;
            PacketBlockReceiverProfileData[] profiles = null;
            if (mirrorError) { // no ack is read
              replies = new short[2];
              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
              replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
            } else {
              boolean hasError = false;
              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
              replies = new short[1+ackLen];
              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
              for (int i=0; i<ackLen; i++) {
                replies[i+1] = ack.getReply(i);
                if (replies[i+1] != DataTransferProtocol.OP_STATUS_SUCCESS) {
                  hasError = true;
                }
              }
              if (!hasError && pktProfile != null) {
                profiles = new PacketBlockReceiverProfileData[ackLen + 1];
                for (int i=0; i < ackLen; i++) {
                  profiles[i + 1] = ack.getProfile(i);
                  if (profiles[i + 1] == null) {
                    throw new IOException("Excpect profile data but it's missing.");
                  }
                }
                profiles[0] = pktProfile.getPacketBlockReceiverProfileData();
              }
            }
           
           
            PipelineAck replyAck = new PipelineAck(expected, replies, profiles);
            // send my ack back to upstream datanode
            replyAck.write(replyOut);
            replyOut.flush();
            if (LOG.isDebugEnabled()) {
              LOG.debug("PacketResponder " + numTargets +
                        " for block " + block +
                        " responded an ack: " + replyAck);
View Full Code Here


          if (closed || !dfsClient.clientRunning) {
            break;
          }

          eventStartReceiveAck();
          PipelineAck pipelineAck = null;
          if (!doParallelWrites) {
            // verify seqno from datanode
            if (supportClientHeartbeat()) {
              pipelineAck = new PipelineAck();
              pipelineAck.readFields(blockReplyStream.get(0), targets.length,
                  profileData != null);
             
              seqno = pipelineAck.getSeqno();
             
              if (!pipelineAck.isSuccess()) {
                for (int i = 0; i < targets.length && dfsClient.clientRunning; i++) {
                  short reply = pipelineAck.getReply(i);
                  if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
                    recordError = i; // first bad datanode
                    throw new IOException("Bad response " + reply + " for block "
                        + block + " from datanode " + targets[i].getName());
                  }
View Full Code Here

TOP

Related Classes of org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck

Copyright © 2018 www.massapicom. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.