Package com.datasalt.pangool.io

Source Code of com.datasalt.pangool.io.Schema$Field$FieldDeserializer

/**
* Copyright [2012] [Datasalt Systems S.L.]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datasalt.pangool.io;

import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.avro.AvroRuntimeException;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;

import com.datasalt.pangool.PangoolRuntimeException;

/**
* A list of {@link Field} elements that a {@link ITuple} instance contains.
*
*/
@SuppressWarnings("serial")
public class Schema implements Serializable {

  // Shared Jackson factory used by this class to create JSON parsers and generators.
  static final JsonFactory FACTORY = new JsonFactory();
  // Object mapper built on top of FACTORY; used to read a JSON tree when parsing a schema.
  static final ObjectMapper MAPPER = new ObjectMapper(FACTORY);

  static {
    // Accept comments inside the JSON text handed to the parsers.
    FACTORY.enable(JsonParser.Feature.ALLOW_COMMENTS);
    // Register MAPPER as the codec of the factory.
    FACTORY.setCodec(MAPPER);
  }

  /**
   * A field is an abstract data type that can be one of these:
   * <ul>
   * <li>A 32-bit signed <i>int</i>;
   * <li>A 64-bit signed <i>long</i>;
   * <li>A 32-bit IEEE single-<i>float</i>;
   * <li>A 64-bit IEEE <i>double</i>-float;
   * <li>A unicode <i>string</i>;
   * <li>A <i>boolean</i>;
   * <li>An <i>enum</i>, containing one of a small set of symbols;
   * <li>A <i>byte buffer</i>;
   * <li>An arbitrary <i>object</i>, serializable by Hadoop's serialization.
   * </ul>
   *
   * A field can be constructed using one of its static <tt>createXXX</tt>
   * methods. The name and type of a field never change after construction;
   * only custom properties can be added afterwards through <tt>addProp</tt>.
   */
  public static class Field implements Serializable{
    public static interface FieldSerializer<T> extends Serializer<T>{
      public void setProps(Map<String,String> properties);
    }
   
    public static interface FieldDeserializer<T> extends Deserializer<T>{
      public void setProps(Map<String,String> properties);
    }
    public static enum Type {
      INT, LONG, FLOAT, DOUBLE, STRING, BOOLEAN, ENUM, BYTES,OBJECT;
    }
   
    // Property names that cannot be set as custom properties of a field
    // (Props.add rejects them); assigned in the static initializer.
    public static final Set<String> RESERVED_KEYWORDS;
    // NOTE(review): the three keys below presumably carry the Java class names
    // (value class, serializer, deserializer) of an OBJECT field in external
    // metadata; the code that reads them is not in this file - confirm.
    public static final String METADATA_OBJECT_CLASS="pangool.object.java-class";
    public static final String METADATA_OBJECT_SERIALIZER="pangool.object.java-serializer-class";
    public static final String METADATA_OBJECT_DESERIALIZER="pangool.object.java-deserializer-class";
    // Marks a BYTES Avro type as a Pangool OBJECT.
    public static final String METADATA_BYTES_AS_OBJECT="pangool.object.mark";
   
    static {
      Set <String> reserved = new HashSet<String>();
      Collections.addAll(reserved,
          METADATA_OBJECT_CLASS,METADATA_OBJECT_SERIALIZER,
          METADATA_OBJECT_DESERIALIZER,METADATA_BYTES_AS_OBJECT);
      RESERVED_KEYWORDS = Collections.unmodifiableSet(reserved);
    }
   
    static final class Props extends LinkedHashMap<String,String> {
      private Set<String> reserved;
      public Props(Set<String> reserved) {
        super(1);
        this.reserved = reserved;
      }
      public void add(String name, String value) {
        if (reserved.contains(name))
          throw new AvroRuntimeException("Can't set reserved property: " + name);
       
        if (value == null)
          throw new AvroRuntimeException("Can't set a property to null: " + name);
     
        String old = get(name);
        if (old == null)
          put(name, value);
        else if (!old.equals(value))
          throw new AvroRuntimeException("Can't overwrite property: " + name);
      }

      public void write(JsonGenerator gen) throws IOException {
        for (Map.Entry<String,String> e : entrySet())
          gen.writeStringField(e.getKey(), e.getValue());
      }
    }
   

    // Name and type of the field; both are checked for null in the constructor.
    private final String name;
    private final Type type;
   
    // Custom properties of the field; names in RESERVED_KEYWORDS are rejected.
    private final Props props= new Props(RESERVED_KEYWORDS);
   
    // Class-related attributes, all assigned in the constructor.
    // objectClass is the enum class for ENUM fields and the value class for
    // OBJECT fields created from a class; it is null otherwise.
    private Class<?> objectClass; // assigned in the constructor
    // Custom serializer / deserializer classes; null unless the field was
    // created through createObject(name, ser, deser).
    private Class<? extends FieldSerializer> serClass;
    private Class<? extends FieldDeserializer> deserClass;
   
    public void addProp(String key,String value){
      props.add(key, value);
    }
   
    public Map<String,String> getProps(){
      return Collections.unmodifiableMap(props);
    }
   
    public String getProp(String name){
      return props.get(name);
    }
   
   
    public static Field create(String name, Type type) {
      if(type == Type.ENUM) {
        throw new IllegalArgumentException(
            "Not allowed 'ENUM' type. Use 'Field.createEnum' method");
      } else if(type == Type.OBJECT) {
        throw new IllegalArgumentException(
            "Not allowed 'OBJECT' type. Use 'Field.createObject' method");
      }
      return new Field(name, type, null,null,null);
    }

    /**
     * Creates an <i>object</i> field.
     *
     * @param name
     *          Field's name
     * @param clazz
     *          Object's instance class
     * @return
     */
    public static Field createObject(String name, Class<?> clazz) {
      return new Field(name, Type.OBJECT, clazz,null,null);
    }
   
    public static Field createObject(String name,Class<? extends FieldSerializer> ser,Class<? extends FieldDeserializer> deser){
      return new Field(name,Type.OBJECT,null,ser,deser);
     
    }
   
    public static Field cloneField(Field field, String newName){
      Field result;
      switch(field.getType()){
      case OBJECT:
        if (field.getObjectClass() != null){
          result = Field.createObject(newName,field.getObjectClass());
        } else {
          result = Field.createObject(newName,field.getSerializerClass(),field.getDeserializerClass());
        }
         break;
      case ENUM:
        result =  Field.createEnum(newName,field.getObjectClass());
        break;
        default:
          result= Field.create(newName,field.getType());
      }
     
      for(Map.Entry<String,String> entry : field.getProps().entrySet()){
        if (!RESERVED_KEYWORDS.contains(entry.getKey())){
          result.addProp(entry.getKey(),entry.getValue());
        }
      }
      return result;
    }

    /**
     * Creates an enum field, based in a enum class
     *
     * @param name
     *          Field's name
     * @param clazz
     *          Enum class
     * @return
     */
    public static Field createEnum(String name, Class<?> clazz) {
      return new Field(name, Type.ENUM, clazz,null,null);
    }

    private Field(String name, Type type, Class<?> clazz,
        Class<? extends FieldSerializer> ser,Class<? extends FieldDeserializer> deser) {
      if(name == null) {
        throw new IllegalArgumentException("Field name can't be null");
      }
      if(type == null) {
        throw new IllegalArgumentException("Field type can't be null");
      }
     
      switch(type){
      case OBJECT:
        if (clazz == null && (ser == null || deser == null)){
          throw new IllegalArgumentException("Field needs specify non-null serializer and deserializer");
        }
        break;
      case ENUM:
        if(clazz == null) {
          throw new IllegalArgumentException("Enum field must specify enum class");
        }
        if(!clazz.isEnum()) {
          throw new IllegalArgumentException("Field with type " + type
              + " must specify an enum class.Use createEnum.");
        }
        break;
      }
      this.objectClass = clazz;
      this.name = name;
      this.type = type;
      this.serClass = ser;
      this.deserClass = deser;
    }

    public Type getType() {
      return type;
    }

    public String getName() {
      return name;
    }

    public Class<?> getObjectClass() {
      return objectClass;
    }
   
    public Class<? extends FieldSerializer> getSerializerClass(){
      return serClass;
    }
   
    public Class<? extends FieldDeserializer> getDeserializerClass(){
      return deserClass;
    }
   
    public boolean equals(Object a) {
      if(!(a instanceof Field)) {
        return false;
      }
      Field that = (Field) a;

      boolean t = name.equals(that.getName()) && type.equals(that.getType());
         
      if(type == Type.OBJECT || type == Type.ENUM) {
        t = t && objectClass.equals(that.getObjectClass());
      }
     
      if (serClass == null && that.serClass != null || serClass != null && that.serClass == null){
        return false;
      } else if (serClass != null && that.serClass != null){
        t = t && serClass.equals(that.serClass);
      }
     
      if (deserClass == null && that.deserClass != null || deserClass != null && that.deserClass == null){
        return false;
      } else if (deserClass != null && that.deserClass != null){
        t = t && deserClass.equals(that.deserClass);
      }
     
      if (props == null && that.props != null || props != null && that.props == null){
        return false;
      } else if (props != null && that.props != null){
        t = t && props.equals(that.props);
      }
      return t;
    }     

    @Override
    public int hashCode() {
      return name.hashCode(); //FIXME bad hash code
    }
   
    public String toString() {
      try {
        StringWriter writer = new StringWriter();
        JsonGenerator gen = FACTORY.createJsonGenerator(writer);
        toJson(gen);
        gen.flush();
        return writer.toString();
      } catch(IOException e) {
        throw new RuntimeException(e);
      }
    }

    static Field parse(JsonNode node) throws IOException {
      try {
        String name = node.get("name").getTextValue();
        String typeStr = node.get("type").getTextValue();
        Type type = Type.valueOf(typeStr);
       
        Map<String,String> properties =null;
        Field field;
        switch(type) {
        case OBJECT:
          JsonNode clazzNode = node.get("object_class");
          if (clazzNode == null){
            Class<? extends FieldSerializer> ser=null;
            if (node.get("serializer") != null){
              ser = (Class<? extends FieldSerializer>) Class.forName(node.get("serializer").getTextValue());
            }
            Class<? extends FieldDeserializer> deser=null;
            if (node.get("deserializer") != null){
              deser = (Class<? extends FieldDeserializer>) Class.forName(node.get("deserializer").getTextValue());
            }
            field =Field.createObject(name,ser,deser);
            } else {
              String clazz = clazzNode.getTextValue();
              field = Field.createObject(name,Class.forName(clazz));
            }
          break;
        case ENUM:
          String clazz = node.get("object_class").getTextValue();
          field =  Field.createEnum(name, Class.forName(clazz));
          break;
        default:
          field =  Field.create(name, type);
        }
        if (node.get("properties") != null){
          JsonNode propNode = node.get("properties");
          Iterator<String> fieldNames = propNode.getFieldNames();
          while (fieldNames.hasNext()){
            String key = fieldNames.next();
            field.addProp(key,propNode.get(key).getTextValue());
          }
        }
        return field;
      } catch(ClassNotFoundException e) {
        throw new IOException(e);
      }
    }
   
    void toJson(JsonGenerator gen) throws IOException {
      gen.writeStartObject();
      gen.writeStringField("name", getName());
      gen.writeStringField("type", getType().toString());
      if(getType() == Type.ENUM) {
        gen.writeStringField("object_class", getObjectClass().getName());
      } else if (getType() == Type.OBJECT){
        if (objectClass != null){
          gen.writeStringField("object_class", getObjectClass().getName());
        } else {
          gen.writeStringField("serializer",serClass.getName());
          gen.writeStringField("deserializer", deserClass.getName());
        }
      }
      if (props != null && !props.isEmpty()){
        gen.writeObjectFieldStart("properties");
        for(Map.Entry<String,String> entry : props.entrySet()){
          gen.writeStringField(entry.getKey(), entry.getValue());
        }
        gen.writeEndObject();
      }
      gen.writeEndObject();
    }
  }

  // Unmodifiable copy of the field list handed to the constructor.
  private final List<Field> fields;
  // Name of the schema; the constructor rejects null and empty values.
  private final String name;

  // Position of each field inside 'fields', keyed by field name; filled by the constructor.
  private Map<String, Integer> indexByFieldName = new HashMap<String, Integer>();

  public Schema(String name, List<Field> fields) {
    if(name == null || name.isEmpty()) {
      throw new IllegalArgumentException("Name for schema can't be null");
    }
    this.name = name;
    this.fields = Collections.unmodifiableList(new ArrayList<Field>(fields));

    int index = 0;
    for(Field field : this.fields) {
      this.indexByFieldName.put(field.getName(), index);
      index++;
    }
  }

  public List<Field> getFields() {
    return fields;
  }

  public String getName() {
    return name;
  }

  public Integer getFieldPos(String fieldName) {
    return indexByFieldName.get(fieldName);
  }

  public Field getField(String fieldName) {
    Integer index = getFieldPos(fieldName);
    return (index == null) ? null : fields.get(index);
  }

  public Field getField(int i) {
    return fields.get(i);
  }

  public boolean containsField(String fieldName) {
    return indexByFieldName.containsKey(fieldName);
  }
 
 
 
  @Override
  public String toString() {
    return toString(true);
  }

  public String toString(boolean pretty) {
    try {
      StringWriter writer = new StringWriter();
      JsonGenerator gen = FACTORY.createJsonGenerator(writer);
      if(pretty)
        gen.useDefaultPrettyPrinter();
      toJson(gen);
      gen.flush();
      return writer.toString();
    } catch(IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static Schema parse(String s) {
    try {
      return parse(FACTORY.createJsonParser(new StringReader(s)));
    } catch(IOException e) {
      throw new SchemaParseException(e);
    }
  }

  static Schema parse(JsonParser parser) throws IOException {
    return Schema.parse(MAPPER.readTree(parser));
  }

  public static Schema parse(JsonNode schema) throws IOException {
    String name = schema.get("name").getTextValue();
    List<Field> fields = new ArrayList<Field>();
    JsonNode fieldsNode = schema.get("fields");
    Iterator<JsonNode> fieldsNodes = fieldsNode.getElements();
    while(fieldsNodes.hasNext()) {
      JsonNode fieldNode = fieldsNodes.next();
      fields.add(Field.parse(fieldNode));
    }
    return new Schema(name, fields);
  }

  @Override
  public boolean equals(Object o) {
    if(o instanceof Schema) {
      return toString().equals(o.toString());
    }
    return false;
  }

  @Override
  public int hashCode() {
    return toString().hashCode();
  }
 
  public void toJson(JsonGenerator gen) throws IOException {
    gen.writeStartObject();
    gen.writeStringField("name", name);
    gen.writeFieldName("fields");
    fieldsToJson(gen);
    gen.writeEndObject();
  }

  private void fieldsToJson(JsonGenerator gen) throws IOException {
    gen.writeStartArray();
    for(Field f : fields) {
      f.toJson(gen);
    }
    gen.writeEndArray();
  }

  public static class SchemaParseException extends PangoolRuntimeException {
    public SchemaParseException(Throwable cause) {
      super(cause);
    }

    public SchemaParseException(String message) {
      super(message);
    }
  }
 
  /*
   * Methods to use field aliases
   *
   * TODO : this could be in a utils class
   */

  public static boolean containsFieldUsingAlias(Schema schema,String fieldName,Map<String,String> aliases){
    if (aliases == null){
      return schema.containsField(fieldName);
    } else {
    String ref = aliases.get(fieldName);
    return ref == null ? schema.containsField(fieldName) : schema.containsField(ref);
    }
  }
 
  public static Field getFieldUsingAliases(Schema schema,String field,Map<String,String> aliases){
    if (aliases == null){
      return schema.getField(field);
    } else {
      String ref = aliases.get(field);
      return ref == null ? schema.getField(field) : schema.getField(ref);
    }
  }
 
  public static int getFieldPosUsingAliases(Schema schema,String field,Map<String,String> aliases){
    if (aliases == null){
      return schema.getFieldPos(field);
    } else {
      String ref = aliases.get(field);
      return ref == null ? schema.getFieldPos(field) : schema.getFieldPos(ref);
    }
  }
 

}
TOP

Related Classes of com.datasalt.pangool.io.Schema$Field$FieldDeserializer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.