Package org.apache.phoenix.join

Source Code of org.apache.phoenix.join.HashJoinInfo

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.join;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.SchemaUtil;

public class HashJoinInfo {
    private static final String HASH_JOIN = "HashJoin";

    private KeyValueSchema joinedSchema;
    private ImmutableBytesPtr[] joinIds;
    private List<Expression>[] joinExpressions;
    private JoinType[] joinTypes;
    private boolean[] earlyEvaluation;
    private KeyValueSchema[] schemas;
    private int[] fieldPositions;
    private Expression postJoinFilterExpression;
    private Integer limit;
    private boolean forceProjection;

    public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) {
      this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression, limit, forceProjection);
    }

    private static KeyValueSchema[] buildSchemas(PTable[] tables) {
      KeyValueSchema[] schemas = new KeyValueSchema[tables.length];
      for (int i = 0; i < tables.length; i++) {
        schemas[i] = buildSchema(tables[i]);
      }
      return schemas;
    }

    private static KeyValueSchema buildSchema(PTable table) {
      KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
      if (table != null) {
          for (PColumn column : table.getColumns()) {
              if (!SchemaUtil.isPKColumn(column)) {
                  builder.addField(column);
              }
          }
      }
        return builder.build();
    }

    private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression, Integer limit, boolean forceProjection) {
      this.joinedSchema = joinedSchema;
      this.joinIds = joinIds;
        this.joinExpressions = joinExpressions;
        this.joinTypes = joinTypes;
        this.earlyEvaluation = earlyEvaluation;
        this.schemas = schemas;
        this.fieldPositions = fieldPositions;
        this.postJoinFilterExpression = postJoinFilterExpression;
        this.limit = limit;
        this.forceProjection = forceProjection;
    }

    public KeyValueSchema getJoinedSchema() {
      return joinedSchema;
    }

    public ImmutableBytesPtr[] getJoinIds() {
        return joinIds;
    }

    public List<Expression>[] getJoinExpressions() {
        return joinExpressions;
    }

    public JoinType[] getJoinTypes() {
        return joinTypes;
    }

    public boolean[] earlyEvaluation() {
      return earlyEvaluation;
    }

    public KeyValueSchema[] getSchemas() {
      return schemas;
    }

    public int[] getFieldPositions() {
      return fieldPositions;
    }

    public Expression getPostJoinFilterExpression() {
        return postJoinFilterExpression;
    }

    public Integer getLimit() {
        return limit;
    }

    /*
     * If the LHS table is a sub-select, we always do projection, since
     * the ON expressions reference only projected columns.
     */
    public boolean forceProjection() {
        return forceProjection;
    }

    public static void serializeHashJoinIntoScan(Scan scan, HashJoinInfo joinInfo) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        try {
            DataOutputStream output = new DataOutputStream(stream);
            joinInfo.joinedSchema.write(output);
            int count = joinInfo.joinIds.length;
            WritableUtils.writeVInt(output, count);
            for (int i = 0; i < count; i++) {
                joinInfo.joinIds[i].write(output);
                WritableUtils.writeVInt(output, joinInfo.joinExpressions[i].size());
                for (Expression expr : joinInfo.joinExpressions[i]) {
                    WritableUtils.writeVInt(output, ExpressionType.valueOf(expr).ordinal());
                    expr.write(output);
                }
                WritableUtils.writeVInt(output, joinInfo.joinTypes[i].ordinal());
                output.writeBoolean(joinInfo.earlyEvaluation[i]);
                joinInfo.schemas[i].write(output);
                WritableUtils.writeVInt(output, joinInfo.fieldPositions[i]);
            }
            if (joinInfo.postJoinFilterExpression != null) {
                WritableUtils.writeVInt(output, ExpressionType.valueOf(joinInfo.postJoinFilterExpression).ordinal());
                joinInfo.postJoinFilterExpression.write(output);
            } else {
                WritableUtils.writeVInt(output, -1);
            }
            WritableUtils.writeVInt(output, joinInfo.limit == null ? -1 : joinInfo.limit);
            output.writeBoolean(joinInfo.forceProjection);
            scan.setAttribute(HASH_JOIN, stream.toByteArray());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                stream.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

    }

    @SuppressWarnings("unchecked")
    public static HashJoinInfo deserializeHashJoinFromScan(Scan scan) {
        byte[] join = scan.getAttribute(HASH_JOIN);
        if (join == null) {
            return null;
        }
        ByteArrayInputStream stream = new ByteArrayInputStream(join);
        try {
            DataInputStream input = new DataInputStream(stream);
            KeyValueSchema joinedSchema = new KeyValueSchema();
            joinedSchema.readFields(input);
            int count = WritableUtils.readVInt(input);
            ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
            List<Expression>[] joinExpressions = new List[count];
            JoinType[] joinTypes = new JoinType[count];
            boolean[] earlyEvaluation = new boolean[count];
            KeyValueSchema[] schemas = new KeyValueSchema[count];
            int[] fieldPositions = new int[count];
            for (int i = 0; i < count; i++) {
                joinIds[i] = new ImmutableBytesPtr();
                joinIds[i].readFields(input);
                int nExprs = WritableUtils.readVInt(input);
                joinExpressions[i] = new ArrayList<Expression>(nExprs);
                for (int j = 0; j < nExprs; j++) {
                    int expressionOrdinal = WritableUtils.readVInt(input);
                    Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
                    expression.readFields(input);
                    joinExpressions[i].add(expression);
                }
                int type = WritableUtils.readVInt(input);
                joinTypes[i] = JoinType.values()[type];
                earlyEvaluation[i] = input.readBoolean();
                schemas[i] = new KeyValueSchema();
                schemas[i].readFields(input);
                fieldPositions[i] = WritableUtils.readVInt(input);
            }
            Expression postJoinFilterExpression = null;
            int expressionOrdinal = WritableUtils.readVInt(input);
            if (expressionOrdinal != -1) {
                postJoinFilterExpression = ExpressionType.values()[expressionOrdinal].newInstance();
                postJoinFilterExpression.readFields(input);
            }
            int limit = -1;
            boolean forceProjection = false;
            // Read these and ignore if we don't find them as they were not
            // present in Apache Phoenix 3.0.0 release. This allows a newer
            // 3.1 server to work with an older 3.0 client without force
            // both to be upgraded in lock step.
            try {
                limit = WritableUtils.readVInt(input);
                forceProjection = input.readBoolean();
            } catch (EOFException ignore) {
            }
            return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression, limit >= 0 ? limit : null, forceProjection);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                stream.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
TOP

Related Classes of org.apache.phoenix.join.HashJoinInfo

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.