Package com.thimbleware.jmemcached.protocol.text

Source Code of com.thimbleware.jmemcached.protocol.text.MemcachedCommandDecoder

package com.thimbleware.jmemcached.protocol.text;

import com.thimbleware.jmemcached.MCElement;
import com.thimbleware.jmemcached.protocol.Command;
import com.thimbleware.jmemcached.protocol.CommandMessage;
import com.thimbleware.jmemcached.protocol.SessionStatus;
import com.thimbleware.jmemcached.protocol.exceptions.InvalidProtocolStateException;
import com.thimbleware.jmemcached.protocol.exceptions.MalformedCommandException;
import com.thimbleware.jmemcached.protocol.exceptions.UnknownCommandException;
import org.jboss.netty.channel.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

/**
* The MemcachedCommandDecoder is responsible for taking lines from the MemcachedFrameDecoder and parsing them
* into CommandMessage instances for handling by the MemcachedCommandHandler
* <p/>
* Protocol status is held in the SessionStatus instance which is shared between each of the decoders in the pipeline.
*/
@ChannelPipelineCoverage("one")
public class MemcachedCommandDecoder extends SimpleChannelUpstreamHandler {

    final Logger logger = LoggerFactory.getLogger(MemcachedCommandDecoder.class);

    private SessionStatus status;

    private static final String NOREPLY = "noreply";

    public MemcachedCommandDecoder(SessionStatus status) {
        this.status = status;
    }                                    

    /**
     * Process an inbound string from the pipeline's downstream, and depending on the state (waiting for data or
     * processing commands), turn them into the correct type of command.
     *
     * @param channelHandlerContext
     * @param messageEvent
     * @throws Exception
     */
    @Override
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        ChannelBuffer in = (ChannelBuffer) messageEvent.getMessage();

        try {
            // Because of the frame handler, we are assured that we are receiving only complete lines or payloads.
            // Verify that we are in 'processing()' mode
            if (status.state == SessionStatus.State.PROCESSING) {
                // split into pieces
                String[] commandPieces = in.toString("US-ASCII").split(" ");

                processLine(commandPieces, messageEvent.getChannel(), channelHandlerContext);
            } else if (status.state == SessionStatus.State.PROCESSING_MULTILINE) {
                byte[] payload = new byte[in.capacity()];
                in.readBytes(payload);
                continueSet(messageEvent.getChannel(), status, payload, channelHandlerContext);
            } else {
                throw new InvalidProtocolStateException("invalid protocol state");
            }

        } finally {
            // Now indicate that we need more for this command by changing the session status's state.
            // This instructs the frame decoder to start collecting data for us.
            // Note, we don't do this if we're waiting for data.
            if (status.state != SessionStatus.State.WAITING_FOR_DATA) status.ready();
        }
    }

    /**
     * Process an individual complete protocol line and either passes the command for processing by the
     * session handler, or (in the case of SET-type commands) partially parses the command and sets the session into
     * a state to wait for additional data.
     *
     * @param parts                 the (originally space separated) parts of the command
     * @param channel
     * @param channelHandlerContext
     * @return the session status we want to set the session to
     */
    private void processLine(String[] parts, Channel channel, ChannelHandlerContext channelHandlerContext) throws UnknownCommandException, MalformedCommandException {
        final int numParts = parts.length;

        // Turn the command into an enum for matching on
        Command cmdType;
        try {
            cmdType = Command.valueOf(parts[0].toUpperCase());
        } catch (IllegalArgumentException e) {
            throw new UnknownCommandException("unknown command: " + parts[0].toLowerCase());
        }

        // Produce the initial command message, for filling in later
        CommandMessage cmd = CommandMessage.command(cmdType);

        // TODO there is a certain amount of fudgery here related to common things like 'noreply', etc. that could be refactored nicely

        // Dispatch on the type of command
        if (cmdType == Command.ADD ||
                cmdType == Command.SET ||
                cmdType == Command.REPLACE ||
                cmdType == Command.CAS ||
                cmdType == Command.APPEND ||
                cmdType == Command.PREPEND) {

            // if we don't have all the parts, it's malformed
            if (numParts < 5) {
                throw new MalformedCommandException("invalid command length");
            }

            // Fill in all the elements of the command
            int size = Integer.parseInt(parts[4]);
            int expire = Integer.parseInt(parts[3]);
            int flags = Integer.parseInt(parts[2]);
            cmd.element = new MCElement(parts[1], flags, expire != 0 && expire < MCElement.THIRTY_DAYS ? MCElement.Now() + expire : expire, size);

            // look for cas and "noreply" elements
            if (numParts > 5) {
                int noreply = cmdType == Command.CAS ? 6 : 5;
                if (cmdType == Command.CAS) {
                    cmd.cas_key = Long.valueOf(parts[5]);
                }

                if (numParts == noreply + 1 && parts[noreply].equalsIgnoreCase(NOREPLY))
                    cmd.noreply = true;
            }

            // Now indicate that we need more for this command by changing the session status's state.
            // This instructs the frame decoder to start collecting data for us.
            status.needMore(size, cmd);
        } else if (cmdType == Command.GET ||
                cmdType == Command.GETS ||
                cmdType == Command.STATS ||
                cmdType == Command.QUIT ||
                cmdType == Command.VERSION) {

            // Get all the keys
            cmd.keys.addAll(Arrays.asList(parts).subList(1, numParts));

            // Pass it on.
            Channels.fireMessageReceived(channelHandlerContext, cmd, channel.getRemoteAddress());
        } else if (cmdType == Command.INCR ||
                cmdType == Command.DECR) {

            // Malformed
            if (numParts < 2 || numParts > 3)
                throw new MalformedCommandException("invalid increment command");

            cmd.keys.add(parts[1]);
            cmd.incrAmount = Integer.valueOf(parts[2]);
           
            if (numParts == 3 && parts[2].equalsIgnoreCase(NOREPLY)) {
                cmd.noreply = true;
            }

            Channels.fireMessageReceived(channelHandlerContext, cmd, channel.getRemoteAddress());
        } else if (cmdType == Command.DELETE) {
            cmd.keys.add(parts[1]);

            if (numParts >= 2) {
                if (parts[numParts - 1].equalsIgnoreCase(NOREPLY)) {
                    cmd.noreply = true;
                    if (numParts == 4)
                        cmd.time = Integer.valueOf(parts[2]);
                } else if (numParts == 3)
                    cmd.time = Integer.valueOf(parts[2]);
            }
            Channels.fireMessageReceived(channelHandlerContext, cmd, channel.getRemoteAddress());
        } else if (cmdType == Command.FLUSH_ALL) {
            if (numParts >= 1) {
                if (parts[numParts - 1].equalsIgnoreCase(NOREPLY)) {
                    cmd.noreply = true;
                    if (numParts == 3)
                        cmd.time = Integer.valueOf(parts[1]);
                } else if (numParts == 2)
                    cmd.time = Integer.valueOf(parts[1]);
            }
            Channels.fireMessageReceived(channelHandlerContext, cmd, channel.getRemoteAddress());
        } else {
            throw new UnknownCommandException("unknown command: " + cmdType);
        }

    }

    /**
     * Handles the continuation of a SET/ADD/REPLACE command with the data it was waiting for.
     *
     * @param state                 the current session status (unused)
     * @param remainder             the bytes picked up
     * @param channelHandlerContext
     * @return the new status to set the session to
     */
    private void continueSet(Channel channel, SessionStatus state, byte[] remainder, ChannelHandlerContext channelHandlerContext) {
        state.cmd.element.data = remainder;
        Channels.fireMessageReceived(channelHandlerContext, state.cmd, channelHandlerContext.getChannel().getRemoteAddress());
    }
}
TOP

Related Classes of com.thimbleware.jmemcached.protocol.text.MemcachedCommandDecoder

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.