Package org.apache.james.mailbox.hbase

Source Code of org.apache.james.mailbox.hbase.HBaseUtils

/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one   *
* or more contributor license agreements.  See the NOTICE file *
* distributed with this work for additional information        *
* regarding copyright ownership.  The ASF licenses this file   *
* to you under the Apache License, Version 2.0 (the            *
* "License"); you may not use this file except in compliance   *
* with the License.  You may obtain a copy of the License at   *
*                                                              *
*   http://www.apache.org/licenses/LICENSE-2.0                 *
*                                                              *
* Unless required by applicable law or agreed to in writing,   *
* software distributed under the License is distributed on an  *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
* KIND, either express or implied.  See the License for the    *
* specific language governing permissions and limitations      *
* under the License.                                           *
****************************************************************/
package org.apache.james.mailbox.hbase;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.NavigableMap;
import java.util.UUID;
import javax.mail.Flags;
import javax.mail.Flags.Flag;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.james.mailbox.hbase.io.ChunkInputStream;
import org.apache.james.mailbox.hbase.mail.model.HBaseMailbox;
import org.apache.james.mailbox.hbase.mail.HBaseMessage;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.Message;
import org.apache.james.mailbox.store.mail.model.Property;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.user.model.Subscription;

import static org.apache.james.mailbox.hbase.FlagConvertor.*;
import static org.apache.james.mailbox.hbase.PropertyConvertor.*;
import static org.apache.james.mailbox.hbase.HBaseNames.*;

/**
* HBase utility methods for mailbox and message manipulation.
* @author ieugen
*/
public class HBaseUtils {
    // TODO: switch to a bitwise implementation of flags.

    /**
     * Creates a Mailbox object from a HBase Result object.
     * @param result a result of a HBase Get operation
     * @return a Mailbox object
     */
    public static Mailbox<UUID> mailboxFromResult(Result result) {
        NavigableMap<byte[], byte[]> rawMailbox = result.getFamilyMap(MAILBOX_CF);
        //TODO: should we test for null values?
        MailboxPath path = new MailboxPath(Bytes.toString(rawMailbox.get(MAILBOX_NAMESPACE)),
                Bytes.toString(rawMailbox.get(MAILBOX_USER)),
                Bytes.toString(rawMailbox.get(MAILBOX_NAME)));

        HBaseMailbox mailbox = new HBaseMailbox(path, Bytes.toLong(rawMailbox.get(MAILBOX_UIDVALIDITY)));
        mailbox.setMailboxId(UUIDFromRowKey(result.getRow()));
        mailbox.setHighestModSeq(Bytes.toLong(rawMailbox.get(MAILBOX_HIGHEST_MODSEQ)));
        mailbox.setLastUid(Bytes.toLong(rawMailbox.get(MAILBOX_LASTUID)));
        mailbox.setMessageCount(Bytes.toLong(rawMailbox.get(MAILBOX_MESSAGE_COUNT)));
        return mailbox;
    }

    /**
     * This returns the row key needed for HBase. Having the method here ensure
     * we have a consistent way to generate the rowkey.
     *
     * Convenience method for generating a rowKey when you don't have a mailbox object.
     * @param uuid
     * @return rowkey byte array that can be used with HBase API
     */
    public static byte[] mailboxRowKey(UUID uuid) {
        byte[] rowKey = new byte[16];
        int offset = Bytes.putLong(rowKey, 0, uuid.getMostSignificantBits());
        Bytes.putLong(rowKey, offset, uuid.getLeastSignificantBits());
        return rowKey;
    }

    /**
     * Returns a UUID from the a byte array.
     * @param rowkey
     * @return UUID calculated from the byte array
     */
    public static UUID UUIDFromRowKey(byte[] rowkey) {
        return new UUID(Bytes.toLong(rowkey, 0), Bytes.toLong(rowkey, 8));
    }

    /**
     * Transforms the mailbox into a Put operation.
     * @return a Put object
     */
    public static Put toPut(HBaseMailbox mailbox) {
        Put put = new Put(mailboxRowKey(mailbox.getMailboxId()));
        // we don't store null values and we don't restore them. it's a column based store.
        if (mailbox.getName() != null) {
            put.add(MAILBOX_CF, MAILBOX_NAME, Bytes.toBytes(mailbox.getName()));
        }

        if (mailbox.getUser() != null) {
            put.add(MAILBOX_CF, MAILBOX_USER, Bytes.toBytes(mailbox.getUser()));
        }
        if (mailbox.getNamespace() != null) {
            put.add(MAILBOX_CF, MAILBOX_NAMESPACE, Bytes.toBytes(mailbox.getNamespace()));
        }
        put.add(MAILBOX_CF, MAILBOX_LASTUID, Bytes.toBytes(mailbox.getLastUid()));
        put.add(MAILBOX_CF, MAILBOX_UIDVALIDITY, Bytes.toBytes(mailbox.getUidValidity()));
        put.add(MAILBOX_CF, MAILBOX_HIGHEST_MODSEQ, Bytes.toBytes(mailbox.getHighestModSeq()));
        put.add(MAILBOX_CF, MAILBOX_MESSAGE_COUNT, Bytes.toBytes(mailbox.getMessageCount()));
        return put;
    }

    /**
     * Transforms only the metadata into a Put object. The rest of the message will
     * be transfered using multiple Puts if size requires it.
     * @param message
     * @return a put that contains all metadata information.
     */
    public static Put metadataToPut(Message<UUID> message) {
        Put put = new Put(messageRowKey(message));
        // we store the message uid and mailbox uid in the row key
        // store the meta data
        put.add(MESSAGES_META_CF, MESSAGE_MODSEQ, Bytes.toBytes(message.getModSeq()));
        put.add(MESSAGES_META_CF, MESSAGE_INTERNALDATE, Bytes.toBytes(message.getInternalDate().getTime()));
        put.add(MESSAGES_META_CF, MESSAGE_MEDIA_TYPE, Bytes.toBytes(message.getMediaType()));
        put.add(MESSAGES_META_CF, MESSAGE_SUB_TYPE, Bytes.toBytes(message.getSubType()));
        put.add(MESSAGES_META_CF, MESSAGE_CONTENT_OCTETS, Bytes.toBytes(message.getFullContentOctets()));
        put.add(MESSAGES_META_CF, MESSAGE_BODY_OCTETS, Bytes.toBytes(message.getBodyOctets()));
        if (message.getTextualLineCount() != null) {
            put.add(MESSAGES_META_CF, MESSAGE_TEXT_LINE_COUNT, Bytes.toBytes(message.getTextualLineCount()));
        }
        // store system flags in meta and user flags in uflags to avoid name clashes
        Flags flags = message.createFlags();
        // system flags
        if (flags.contains(Flag.ANSWERED)) {
            put.add(MESSAGES_META_CF, FLAGS_ANSWERED, MARKER_PRESENT);
        }
        if (flags.contains(Flag.DELETED)) {
            put.add(MESSAGES_META_CF, FLAGS_DELETED, MARKER_PRESENT);
        }
        if (flags.contains(Flag.DRAFT)) {
            put.add(MESSAGES_META_CF, FLAGS_DRAFT, MARKER_PRESENT);
        }
        if (flags.contains(Flag.FLAGGED)) {
            put.add(MESSAGES_META_CF, FLAGS_FLAGGED, MARKER_PRESENT);
        }
        if (flags.contains(Flag.RECENT)) {
            put.add(MESSAGES_META_CF, FLAGS_RECENT, MARKER_PRESENT);
        }
        if (flags.contains(Flag.SEEN)) {
            put.add(MESSAGES_META_CF, FLAGS_SEEN, MARKER_PRESENT);
        }
        if (flags.contains(Flag.USER)) {
            put.add(MESSAGES_META_CF, FLAGS_USER, MARKER_PRESENT);
        }

        // user flags
        for (String flag : flags.getUserFlags()) {
            put.add(MESSAGES_META_CF, userFlagToBytes(flag), MARKER_PRESENT);
        }
        int propNumber = 0;
        // add the properties
        for (Property prop : message.getProperties()) {
            put.add(MESSAGES_META_CF, getQualifier(propNumber++), getValue(prop));
        }

        return put;
    }

    /**
     * Create a row key for a message in a mailbox. The current row key is mailboxID followed by messageID.
     * Both values are fixed length so no separator is needed.
     * Downside: we will be storing the same message multiple times, one time for each recipient.
     * @param message message to get row key from
     * @return rowkey byte array that can be used with HBase API
     */
    public static byte[] messageRowKey(Message<UUID> message) {
        return messageRowKey(message.getMailboxId(), message.getUid());
    }

    /**
     * Utility method to build row keys from mailbox UUID and message uid.
     * The message uid's are stored in reverse order by substracting the uid value
     * from Long.MAX_VALUE.
     * @param mailboxUid mailbox UUID
     * @param uid message uid
     * @return rowkey byte array that can be used with HBase API
     */
    public static byte[] messageRowKey(UUID mailboxUid, long uid) {
        /**  message uid's are stored in reverse order so we will always have the most recent messages first*/
        byte[] ba = Bytes.add(Bytes.toBytes(mailboxUid.getMostSignificantBits()),
                Bytes.toBytes(mailboxUid.getLeastSignificantBits()),
                Bytes.toBytes(Long.MAX_VALUE - uid));
        //System.out.println(Bytes.toStringBinary(ba));
        return ba;
    }

    /**
     * Utility to build row keys from mailboxUID and a value. The value is added to
     * the key without any other operations.
     * @param mailboxUid mailbox UUID
     * @param value
     * @return rowkey byte array that can be used with HBase API
     */
    public static byte[] customMessageRowKey(UUID mailboxUid, long value) {
        return Bytes.add(Bytes.toBytes(mailboxUid.getMostSignificantBits()),
                Bytes.toBytes(mailboxUid.getLeastSignificantBits()),
                Bytes.toBytes(value));
    }

    /**
     * Creates a HBaseMessage from a Result object. This method retrieves all information
     * except for body and header related bytes. The message content will be loaded on demand
     * through a specialised InputStream called {@link ChunkInputStream}.
     * IMPORTANT: the method expects a single version of each cell. Use setMaxVersions(1).
     * @param conf configuration object for HBase cluster
     * @param result the result object containing message data
     * @return a HBaseMessage instance with message metadata.
     */
    public static Message<UUID> messageMetaFromResult(Configuration conf, Result result) {
        HBaseMessage message = null;
        Flags flags = new Flags();
        List<Property> propList = new ArrayList<Property>();
        KeyValue[] keys = result.raw();
        String mediaType = null, subType = null;
        Long modSeq = null, uid, bodyOctets = null, contentOctets = null, textualLineCount = null;
        Date internalDate = null;

        int i = 0;
        /** it is VERY IMPORTANT that the byte arrays are kept ascending */
        if (Bytes.equals(keys[i].getQualifier(), MESSAGE_BODY_OCTETS)) {
            bodyOctets = Bytes.toLong(keys[i].getValue());
            i++;
        }
        if (Bytes.equals(keys[i].getQualifier(), MESSAGE_CONTENT_OCTETS)) {
            contentOctets = Bytes.toLong(keys[i].getValue());
            i++;
        }
        if (Bytes.equals(keys[i].getQualifier(), MESSAGE_INTERNALDATE)) {
            internalDate = new Date(Bytes.toLong(keys[i].getValue()));
            i++;
        }
        // may be null so it will probably skip
        if (Bytes.equals(keys[i].getQualifier(), MESSAGE_TEXT_LINE_COUNT)) {
            textualLineCount = Bytes.toLong(keys[i].getValue());
            i++;
        }

        if (Bytes.equals(keys[i].getQualifier(), MESSAGE_MODSEQ)) {
            modSeq = Bytes.toLong(keys[i].getValue());
            i++;
        }
        if (Bytes.equals(keys[i].getQualifier(), MESSAGE_MEDIA_TYPE)) {
            mediaType = Bytes.toString(keys[i].getValue());
            i++;
        }
        if (Bytes.equals(keys[i].getQualifier(), MESSAGE_SUB_TYPE)) {
            subType = Bytes.toString(keys[i].getValue());
            i++;
        }
        // only TEXT_LINE_COUNT can be missing if message is binary
        if (i < 5) {
            throw new RuntimeException("HBase message column names not sorted.");
        }
        while (i < keys.length) {
            //get message properties
            if (Bytes.startsWith(keys[i].getQualifier(), PREFIX_PROP_B)) {
                propList.add(getProperty(keys[i].getValue()));
            } else if (Bytes.startsWith(keys[i].getQualifier(), PREFIX_SFLAGS_B)) {
                // get system flags, stored as qualifiers
                if (Bytes.equals(MARKER_PRESENT, keys[i].getValue())) {
                    flags.add(systemFlagFromBytes(keys[i].getQualifier()));
                }
            } else if (Bytes.startsWith(keys[i].getQualifier(), PREFIX_UFLAGS_B)) {
                // get user flags, stored as value qualifier
                flags.add(userFlagFromBytes(keys[i].getQualifier()));
            }
            i++;
        }
        UUID uuid = UUIDFromRowKey(result.getRow());
        uid = Long.MAX_VALUE - Bytes.toLong(result.getRow(), 16);
        PropertyBuilder props = new PropertyBuilder(propList);
        props.setMediaType(mediaType);
        props.setSubType(subType);
        message = new HBaseMessage(conf, uuid, internalDate, flags, contentOctets, (int) (contentOctets - bodyOctets), props);
        message.setUid(uid);
        message.setModSeq(modSeq);
        message.setTextualLineCount(textualLineCount);
        return message;
    }

    /**
     * Creates a Put object from this subscription object
     * @return Put object suitable for HBase persistence
     */
    public static Put toPut(Subscription subscription) {
        Put put = new Put(Bytes.toBytes(subscription.getUser()));
        put.add(SUBSCRIPTION_CF, Bytes.toBytes(subscription.getMailbox()), MARKER_PRESENT);
        return put;
    }

    /**
     * Utility method to transform message flags into a put opperation.
     * @param message
     * @param flags
     * @return a put object with
     */
    public static Put flagsToPut(Message<UUID> message, Flags flags) {
        Put put = new Put(messageRowKey(message));
        //system flags
        if (flags.contains(Flag.ANSWERED)) {
            put.add(MESSAGES_META_CF, FLAGS_ANSWERED, MARKER_PRESENT);
        } else {
            put.add(MESSAGES_META_CF, FLAGS_ANSWERED, MARKER_MISSING);
        }
        if (flags.contains(Flag.DELETED)) {
            put.add(MESSAGES_META_CF, FLAGS_DELETED, MARKER_PRESENT);
        } else {
            put.add(MESSAGES_META_CF, FLAGS_DELETED, MARKER_MISSING);
        }
        if (flags.contains(Flag.DRAFT)) {
            put.add(MESSAGES_META_CF, FLAGS_DRAFT, MARKER_PRESENT);
        } else {
            put.add(MESSAGES_META_CF, FLAGS_DRAFT, MARKER_MISSING);
        }
        if (flags.contains(Flag.FLAGGED)) {
            put.add(MESSAGES_META_CF, FLAGS_FLAGGED, MARKER_PRESENT);
        } else {
            put.add(MESSAGES_META_CF, FLAGS_FLAGGED, MARKER_MISSING);
        }
        if (flags.contains(Flag.RECENT)) {
            put.add(MESSAGES_META_CF, FLAGS_RECENT, MARKER_PRESENT);
        } else {
            put.add(MESSAGES_META_CF, FLAGS_RECENT, MARKER_MISSING);
        }
        if (flags.contains(Flag.SEEN)) {
            put.add(MESSAGES_META_CF, FLAGS_SEEN, MARKER_PRESENT);
        } else {
            put.add(MESSAGES_META_CF, FLAGS_SEEN, MARKER_MISSING);
        }
        if (flags.contains(Flag.USER)) {
            put.add(MESSAGES_META_CF, FLAGS_USER, MARKER_PRESENT);
        } else {
            put.add(MESSAGES_META_CF, FLAGS_USER, MARKER_MISSING);
        }
        /**TODO: user flags are not deleted this way: store them all in a single column 
         * and replace that column full.
         */
        // user flags
        for (String flag : flags.getUserFlags()) {
            put.add(MESSAGES_META_CF, userFlagToBytes(flag), MARKER_PRESENT);
        }
        return put;
    }

    public static Delete flagsToDelete(Message<UUID> message, Flags flags) {
        Delete delete = new Delete(messageRowKey(message));
        //we mark for delete flags that are not present (they will be Put'ed)
        if (flags.contains(Flag.ANSWERED)) {
            delete.deleteColumn(MESSAGES_META_CF, FLAGS_ANSWERED);
        }
        if (flags.contains(Flag.DELETED)) {
            delete.deleteColumn(MESSAGES_META_CF, FLAGS_DELETED);
        }
        if (flags.contains(Flag.DRAFT)) {
            delete.deleteColumn(MESSAGES_META_CF, FLAGS_DRAFT);
        }
        if (flags.contains(Flag.FLAGGED)) {
            delete.deleteColumn(MESSAGES_META_CF, FLAGS_FLAGGED);
        }
        if (flags.contains(Flag.RECENT)) {
            delete.deleteColumn(MESSAGES_META_CF, FLAGS_RECENT);
        }
        if (flags.contains(Flag.SEEN)) {
            delete.deleteColumn(MESSAGES_META_CF, FLAGS_SEEN);
        }
        if (flags.contains(Flag.USER)) {
            delete.deleteColumn(MESSAGES_META_CF, FLAGS_USER);
        }

        // we delete all user flags that where not in the new configuration
        for (String flag : flags.getUserFlags()) {
            delete.deleteColumn(MESSAGES_META_CF, userFlagToBytes(flag));
        }
        return delete;
    }

    /**
     * Returns a String composed of all flags in the  parameter.
     * @param flags
     * @return a string representation of all flags
     */
    public static String flagsToString(Flags flags) {
        StringBuilder b = new StringBuilder();

        if (flags.contains(Flag.ANSWERED)) {
            b.append("ANSWERED ");
        }
        if (flags.contains(Flag.DELETED)) {
            b.append("DELETED ");
        }
        if (flags.contains(Flag.DRAFT)) {
            b.append("DRAFT ");
        }
        if (flags.contains(Flag.FLAGGED)) {
            b.append("FLAGGED ");
        }
        if (flags.contains(Flag.RECENT)) {
            b.append("RECENT ");
        }
        if (flags.contains(Flag.SEEN)) {
            b.append("SEEN ");
        }
        if (flags.contains(Flag.USER)) {
            b.append("USER ");
        }
        for (String flag : flags.getUserFlags()) {
            b.append(flag);
            b.append(" ");
        }
        return b.toString();
    }
}
TOP

Related Classes of org.apache.james.mailbox.hbase.HBaseUtils

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.