Package com.englishtown.vertx

Source Code of com.englishtown.vertx.CassandraBinaryStore

/*
* The MIT License (MIT)
* Copyright © 2013 Englishtown <opensource@englishtown.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the “Software”), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.englishtown.vertx;

import com.codahale.metrics.Counter;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.englishtown.vertx.cassandra.binarystore.*;
import com.englishtown.vertx.cassandra.binarystore.impl.DefaultChunkInfo;
import com.englishtown.vertx.cassandra.binarystore.impl.DefaultFileInfo;
import com.google.common.util.concurrent.FutureCallback;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Future;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.platform.Verticle;

import javax.inject.Inject;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

import static com.codahale.metrics.MetricRegistry.name;

/**
* An EventBus module to save binary files in Cassandra
*/
public class CassandraBinaryStore extends Verticle implements Handler<Message<JsonObject>> {

    public static final String DEFAULT_ADDRESS = "et.cassandra.binarystore";

    private final BinaryStoreStarter starter;
    private final BinaryStoreManager binaryStoreManager;

    protected EventBus eb;
    protected Logger logger;

    private String address;

    private final MetricRegistry registry;
    private JmxReporter reporter;
    private Counter messageCounter;
    private Counter errorCounter;

    @Inject
    public CassandraBinaryStore(BinaryStoreStarter starter, BinaryStoreManager binaryStoreManager, MetricRegistry registry) {
        this.starter = starter;
        this.binaryStoreManager = binaryStoreManager;
        this.registry = registry;
    }

    @Override
    public void start(final Future<Void> startedResult) {

        logger = container.logger();
        messageCounter = registry.counter(name(Metrics.BASE_NAME, "eb.messages"));
        errorCounter = registry.counter(name(Metrics.BASE_NAME, "eb.errors"));

        starter.run(new Handler<AsyncResult<Void>>() {
            @Override
            public void handle(AsyncResult<Void> event) {
                if (event.succeeded()) {
                    init(startedResult);
                } else {
                    startedResult.setFailure(event.cause());
                }
            }
        });

    }

    private void init(final Future<Void> startedResult) {

        eb = vertx.eventBus();

        JsonObject config = container.config();
        address = config.getString("address", DEFAULT_ADDRESS);

        // Main Message<JsonObject> handler that inspects an "action" field
        eb.registerHandler(address, this);

        // Message<Buffer> handler to save file chunks
        eb.registerHandler(address + "/saveChunk", new Handler<Message<Buffer>>() {
            @Override
            public void handle(Message<Buffer> message) {
                messageCounter.inc();
                saveChunk(message);
            }
        });

        // Start jmx metric reporter if enabled
        Map<String, Object> values = new HashMap<>();
        values.put("address", address);
        reporter = com.englishtown.vertx.metrics.Utils.create(this, registry, values, config);

        startedResult.setResult(null);

    }

    @Override
    public void stop() {
        if (reporter != null) {
            reporter.stop();
        }
    }

    @Override
    public void handle(Message<JsonObject> message) {

        messageCounter.inc();
        JsonObject jsonObject = message.body();
        String action = getRequiredString("action", message, jsonObject);
        if (action == null) {
            return;
        }

        try {
            switch (action) {
                case "getFile":
                    getFile(message, jsonObject);
                    break;
                case "getChunk":
                    getChunk(message, jsonObject);
                    break;
                case "saveFile":
                    saveFile(message, jsonObject);
                    break;
                default:
                    sendError(message, "action " + action + " is not supported");
            }

        } catch (Throwable e) {
            sendError(message, "Unexpected error in " + action + ": " + e.getMessage(), e);
        }
    }

    public void saveFile(final Message<JsonObject> message, JsonObject jsonObject) {

        UUID id = getUUID("id", message, jsonObject);
        if (id == null) {
            return;
        }

        Long length = getRequiredLong("length", message, jsonObject, 1);
        if (length == null) {
            return;
        }

        Integer chunkSize = getRequiredInt("chunkSize", message, jsonObject, 1);
        if (chunkSize == null) {
            return;
        }

        long uploadDate = jsonObject.getLong("uploadDate", 0);
        if (uploadDate <= 0) {
            uploadDate = System.currentTimeMillis();
        }

        String filename = jsonObject.getString("filename");
        String contentType = jsonObject.getString("contentType");
        JsonObject metadata = jsonObject.getObject("metadata");

        DefaultFileInfo fileInfo = new DefaultFileInfo()
                .setId(id)
                .setLength(length)
                .setChunkSize(chunkSize)
                .setUploadDate(uploadDate)
                .setFileName(filename)
                .setContentType(contentType);

        if (metadata != null) {
            Map<String, String> map = new LinkedHashMap<>();
            for (Map.Entry<String, Object> entry : metadata.toMap().entrySet()) {
                map.put(entry.getKey(), (entry.getValue() == null ? null : String.valueOf(entry.getValue())));
            }
            fileInfo.setMetadata(map);
        }

        binaryStoreManager.storeFile(fileInfo, new FutureCallback<Void>() {
            @Override
            public void onSuccess(Void result) {
                sendOK(message);
            }

            @Override
            public void onFailure(Throwable t) {
                sendError(message, "Error saving file", t);
            }
        });

    }

    /**
     * Handler for saving file chunks.
     *
     * @param message The message body is a Buffer where the first four bytes are an int indicating how many bytes are
     *                the json fields, the remaining bytes are the file chunk to write to Cassandra
     */
    public void saveChunk(Message<Buffer> message) {
        JsonObject jsonObject;
        byte[] data;

        // Parse the byte[] message body
        try {
            Buffer body = message.body();
            if (body.length() == 0) {
                sendError(message, "message body is empty");
                return;
            }

            // First four bytes indicate the json string length
            int len = body.getInt(0);

            // Decode json
            int from = 4;
            byte[] jsonBytes = body.getBytes(from, from + len);
            jsonObject = new JsonObject(decode(jsonBytes));

            // Remaining bytes are the chunk to be written
            from += len;
            data = body.getBytes(from, body.length());

        } catch (RuntimeException e) {
            sendError(message, "error parsing buffer message.  see the documentation for the correct format", e);
            return;
        }

        // Now save the chunk
        saveChunk(message, jsonObject, data);

    }

    public void saveChunk(final Message<Buffer> message, JsonObject jsonObject, byte[] data) {

        if (data == null || data.length == 0) {
            sendError(message, "chunk data is missing");
            return;
        }

        UUID id = getUUID("files_id", message, jsonObject);
        if (id == null) {
            return;
        }

        Integer n = getRequiredInt("n", message, jsonObject, 0);
        if (n == null) {
            return;
        }

        ChunkInfo chunkInfo = new DefaultChunkInfo()
                .setId(id)
                .setNum(n)
                .setData(data);

        binaryStoreManager.storeChunk(chunkInfo, new FutureCallback<Void>() {
            @Override
            public void onSuccess(Void result) {
                sendOK(message);
            }

            @Override
            public void onFailure(Throwable t) {
                sendError(message, "Error saving chunk", t);
            }
        });

    }

    public void getFile(final Message<JsonObject> message, JsonObject jsonObject) {

        final UUID id = getUUID("id", message, jsonObject);
        if (id == null) {
            return;
        }

        binaryStoreManager.loadFile(id, new FutureCallback<FileInfo>() {
            @Override
            public void onSuccess(FileInfo result) {
                if (result == null) {
                    sendError(message, "File " + id.toString() + " does not exist", 404);
                    return;
                }

                JsonObject fileInfo = new JsonObject()
                        .putString("filename", result.getFileName())
                        .putString("contentType", result.getContentType())
                        .putNumber("length", result.getLength())
                        .putNumber("chunkSize", result.getChunkSize())
                        .putNumber("uploadDate", result.getUploadDate());

                if (result.getMetadata() != null) {
                    JsonObject metadata = new JsonObject();
                    for (Map.Entry<String, String> entry : result.getMetadata().entrySet()) {
                        metadata.putString(entry.getKey(), entry.getValue());
                    }
                    fileInfo.putObject("metadata", metadata);
                }

                // Send file info
                sendOK(message, fileInfo);
            }

            @Override
            public void onFailure(Throwable t) {
                sendError(message, "Error reading file", t);
            }
        });

    }

    public void getChunk(final Message<JsonObject> message, final JsonObject jsonObject) {

        UUID id = getUUID("files_id", message, jsonObject);

        Integer n = getRequiredInt("n", message, jsonObject, 0);
        if (n == null) {
            return;
        }

        binaryStoreManager.loadChunk(id, n, new FutureCallback<ChunkInfo>() {
            @Override
            public void onSuccess(ChunkInfo result) {
                if (result == null) {
                    message.reply(new byte[0]);
                    return;
                }

                boolean reply = jsonObject.getBoolean("reply", false);
                Handler<Message<JsonObject>> replyHandler = null;

                if (reply) {
                    replyHandler = new Handler<Message<JsonObject>>() {
                        @Override
                        public void handle(Message<JsonObject> reply) {
                            int n = jsonObject.getInteger("n") + 1;
                            jsonObject.putNumber("n", n);
                            getChunk(reply, jsonObject);
                        }
                    };
                }

                // TODO: Change to reply with a Buffer instead of a byte[]?
                message.reply(result.getData(), replyHandler);
            }

            @Override
            public void onFailure(Throwable t) {
                sendError(message, "Error reading chunk", t);
            }
        });

    }

    public <T> void sendError(Message<T> message, String error) {
        sendError(message, error, null, null);
    }

    public <T> void sendError(Message<T> message, String error, Throwable e) {
        sendError(message, error, null, e);
    }

    public <T> void sendError(Message<T> message, String error, Integer reason) {
        sendError(message, error, reason, null);
    }

    public <T> void sendError(Message<T> message, String error, Integer reason, Throwable e) {
        errorCounter.inc();
        logger.error(error, e);
        JsonObject result = new JsonObject().putString("status", "error").putString("message", error);
        if (reason != null) {
            result.putNumber("reason", reason);
        }

        message.reply(result);
    }

    public <T> void sendOK(Message<T> message) {
        sendOK(message, new JsonObject());
    }

    public <T> void sendOK(Message<T> message, JsonObject response) {
        response.putString("status", "ok");
        message.reply(response);
    }

    private String decode(byte[] bytes) {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Should never happen
            throw new RuntimeException(e);
        }
    }

    private <T> UUID getUUID(String fieldName, Message<T> message, JsonObject jsonObject) {
        String id = getRequiredString(fieldName, message, jsonObject);
        if (id == null) {
            return null;
        }
        try {
            return UUID.fromString(id);
        } catch (IllegalArgumentException e) {
            sendError(message, fieldName + " " + id + " is not a valid UUID", e);
            return null;
        }
    }

    private <T> String getRequiredString(String fieldName, Message<T> message, JsonObject jsonObject) {
        String value = jsonObject.getString(fieldName);
        if (value == null) {
            sendError(message, fieldName + " must be specified");
        }
        return value;
    }

    private <T> Integer getRequiredInt(String fieldName, Message<T> message, JsonObject jsonObject, int minValue) {
        Integer value = jsonObject.getInteger(fieldName);
        if (value == null) {
            sendError(message, fieldName + " must be specified");
            return null;
        }
        if (value < minValue) {
            sendError(message, fieldName + " must be greater than or equal to " + minValue);
            return null;
        }
        return value;
    }

    private <T> Long getRequiredLong(String fieldName, Message<T> message, JsonObject jsonObject, long minValue) {
        Long value = jsonObject.getLong(fieldName);
        if (value == null) {
            sendError(message, fieldName + " must be specified");
            return null;
        }
        if (value < minValue) {
            sendError(message, fieldName + " must be greater than or equal to " + minValue);
            return null;
        }
        return value;
    }
}
TOP

Related Classes of com.englishtown.vertx.CassandraBinaryStore

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.