Package krati.store.bus.protocol

Source Code of krati.store.bus.protocol.StoreBusResponder

/*
* Copyright (c) 2010-2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package krati.store.bus.protocol;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import krati.retention.Event;
import krati.retention.Position;
import krati.retention.clock.Clock;
import krati.store.avro.protocol.ProtocolConstants;
import krati.store.avro.protocol.Protocols;
import krati.store.avro.protocol.StoreResponder;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericResponder;
import org.apache.avro.util.Utf8;
import org.apache.log4j.Logger;

/**
* StoreBusResponder
*
* @author jwu
* @since 10/04, 2011
*/
public class StoreBusResponder extends GenericResponder {
    private final static Logger _logger = Logger.getLogger(StoreResponder.class);
    private final Properties _properties = new Properties();
    private final StoreBusProtocolHandler _handler;
   
    public StoreBusResponder(StoreBusProtocolHandler handler) {
        super(Protocols.getProtocol());
        this._handler = handler;
    }
   
    public final StoreBusProtocolHandler getHandler() {
        return _handler;
    }
   
    public final Properties getProperties() {
        return _properties;
    }
   
    public final String getProperty(String key) {
        return _properties.getProperty(key);
    }
   
    public final String getProperty(String key, String defaultValue) {
        return _properties.getProperty(key, defaultValue);
    }
   
    public final Object setProperty(String key, String value) {
        return _properties.setProperty(key, value);
    }
   
    @Override @SuppressWarnings("unchecked")
    public Object respond(Protocol.Message message, Object request) {
        GenericRecord record = (GenericRecord)request;
        String msgName = message.getName();

        if (msgName.equals(ProtocolConstants.MSG_SYNC)) {
            String positionStr = record.get("position").toString();
            Position position = _handler.getPosition(positionStr);
           
            // Check if clock value needs to be transferred
            Utf8 opt = (Utf8)record.get("opt");
            boolean clockNotNeeded = (opt == null) ? false : opt.toString().equals(StoreBusOptions.OPT_NO_CLOCK);
           
            Map<byte[], Event<byte[]>> map = new HashMap<byte[], Event<byte[]>>(1024);
            Position nextPosition = _handler.syncUp(position, map);
           
            Schema schemaKVC = getLocal().getType(ProtocolConstants.TYPE_KeyValueClock);
            Schema schema1 = getLocal().getType(ProtocolConstants.TYPE_SyncResultSet);
            Schema schema2 = schema1.getField("results").schema();
            GenericArray<GenericRecord> array = new GenericData.Array<GenericRecord>(map.size(), schema2);
           
            for(Map.Entry<byte[], Event<byte[]>> e : map.entrySet()) {
                GenericRecord kvc = new GenericData.Record(schemaKVC);
                kvc.put("key", ByteBuffer.wrap(e.getKey()));
               
                byte[] value = e.getValue().getValue();
                kvc.put("value", value == null ? null : ByteBuffer.wrap(value));
               
                Clock clock = clockNotNeeded ? null : e.getValue().getClock();
                kvc.put("clock", clock == null ? null : ByteBuffer.wrap(clock.toByteArray()));
               
                array.add(kvc);
            }
           
            GenericRecord response = new GenericData.Record(schema1);
            response.put("results", array);
            response.put("position", new Utf8(nextPosition.toString()));
           
            return response;
        }
       
        if (msgName.equals(ProtocolConstants.MSG_GET)) {
            try {
                ByteBuffer key = (ByteBuffer)record.get("key");
                byte[] bytes = _handler.get(key.array());
                return bytes == null ? null : ByteBuffer.wrap(bytes);
            } catch(Exception e) {
                throw new AvroRuntimeException(e);
            }
        }
       
        if (msgName.equals(ProtocolConstants.MSG_MGET)) {
            GenericArray<ByteBuffer> keys = (GenericArray<ByteBuffer>)record.get("keys");
           
            Schema schemaKV = getLocal().getType(ProtocolConstants.TYPE_KeyValue);
            Schema schema = getLocal().getMessages().get(ProtocolConstants.MSG_MGET).getRequest().getField("keys").schema();
            GenericArray<GenericRecord> array = new GenericData.Array<GenericRecord>(keys.size(), schema);
           
            try {
                // Include only non-null key and non-null value in response
                for(ByteBuffer key : keys) {
                    if(key != null) {
                        byte[] bytes = _handler.get(key.array());
                        if(bytes != null) {
                             GenericRecord kv = new GenericData.Record(schemaKV);
                             kv.put("key", key);
                             kv.put("value", ByteBuffer.wrap(bytes));
                             array.add(kv);
                        }
                    }
                }
               
                return array;
            } catch(Exception e) {
                throw new AvroRuntimeException(e);
            }
        }
       
        if (msgName.equals(ProtocolConstants.MSG_META)) {
            Utf8 metaOpt = (Utf8)record.get("opt");
            Utf8 metaKey = (Utf8)record.get("key");
            Utf8 metaValue = (Utf8)record.get("value");
           
            String opt = metaOpt.toString();
            String key = (metaKey == null) ? null : metaKey.toString();
            String value = (metaValue == null) ? null : metaValue.toString();
           
            try {
                String ret = _handler.meta(opt, key, value);
                return ret == null ? null : new Utf8(ret);
            } catch(IllegalArgumentException e) {
                if(opt.equals(ProtocolConstants.OPT_SET_PROPERTY)) {
                    Object obj = _properties.setProperty(key, value);
                    return (obj == null) ? null : new Utf8(obj.toString());
                } else if(opt.equals(ProtocolConstants.OPT_DEL_PROPERTY)) {
                    Object obj = _properties.remove(key);
                    return (obj == null) ? null : new Utf8(obj.toString());
                } else if(opt.equals(ProtocolConstants.OPT_GET_PROPERTY)) {
                    if(key == null) {
                        return null;
                    }
                    String obj = _properties.getProperty(key);
                    return (obj == null) ? null : new Utf8(obj);
                } else {
                    return ProtocolConstants.NOP_UTF8;
                }
            } catch(Exception e) {
                _logger.error("Failed on meta: " + opt + " key=" + key + " value=" + value, e);
                return ProtocolConstants.ERR_UTF8;
            }
        }
       
        throw new AvroRuntimeException("Unexpected message: " + msgName);
    }
}
TOP

Related Classes of krati.store.bus.protocol.StoreBusResponder

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.