Package org.apache.drill.exec.physical.impl.window

Source Code of org.apache.drill.exec.physical.impl.window.StreamingWindowFrameRecordBatch

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.physical.impl.window;

import com.google.common.collect.Lists;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JVar;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;

import java.io.IOException;
import java.util.List;

public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<WindowPOP> {
  private StreamingWindowFramer framer;

  public StreamingWindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
    super(popConfig, context, incoming);
  }

  @Override
  protected boolean setupNewSchema() throws SchemaChangeException {
    container.clear();

    try {
      this.framer = createFramer();
    } catch (ClassTransformationException | IOException ex) {
      throw new SchemaChangeException("Failed to create framer: " + ex);
    }
    if (container.isSchemaChanged()) {
      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
      return true;
    }
    return false;
  }

  private void getIndex(ClassGenerator<StreamingWindowFramer> g) {
    switch (incoming.getSchema().getSelectionVectorMode()) {
      case FOUR_BYTE: {
        JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class));
        g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
        g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
        return;
      }
      case NONE: {
        g.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex"));
        return;
      }
      case TWO_BYTE: {
        JVar var = g.declareClassField("sv2_", g.getModel()._ref(SelectionVector2.class));
        g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2"));
        g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
        return;
      }

      default:
        throw new IllegalStateException();
    }
  }

  private StreamingWindowFramer createFramer() throws SchemaChangeException, IOException, ClassTransformationException {
    int configLength = popConfig.getAggregations().length;
    List<LogicalExpression> valueExprs = Lists.newArrayList();
    LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length];

    ErrorCollector collector = new ErrorCollectorImpl();

    for (int i = 0; i < configLength; i++) {
      NamedExpression ne = popConfig.getAggregations()[i];
      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
      if (expr == null) {
        continue;
      }

      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
      container.addOrGet(outputField);
      TypedFieldId id = container.getValueVectorId(outputField.getPath());
      assert id != null : "Got null TypedFieldId";
      valueExprs.add(new ValueVectorWriteExpression(id, expr, true));
    }

    int j = 0;
    LogicalExpression[] windowExprs = new LogicalExpression[incoming.getSchema().getFieldCount()];
    // TODO: Should transfer all existing columns instead of copy. Currently this is not easily doable because
    // we are not processing one entire batch in one iteration, so cannot simply transfer.
    for (VectorWrapper wrapper : incoming) {
      ValueVector vv = wrapper.isHyper() ? wrapper.getValueVectors()[0] : wrapper.getValueVector();
      container.addOrGet(vv.getField());
      TypedFieldId id = container.getValueVectorId(vv.getField().getPath());
      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(
          new ValueVectorReadExpression(new TypedFieldId(vv.getField().getType(), wrapper.isHyper(), j)),
          incoming,
          collector,
          context.getFunctionRegistry());
      windowExprs[j] = new ValueVectorWriteExpression(id, expr, true);
      j++;
    }

    for (int i = 0; i < keyExprs.length; i++) {
      NamedExpression ne = popConfig.getWithins()[i];

      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
      if (expr == null) {
        continue;
      }

      keyExprs[i] = expr;
    }

    if (collector.hasErrors()) {
      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
    }

    final ClassGenerator<StreamingWindowFramer> cg = CodeGenerator.getRoot(StreamingWindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
    setupIsSame(cg, keyExprs);
    setupIsSameFromBatch(cg, keyExprs);
    addRecordValues(cg, valueExprs.toArray(new LogicalExpression[valueExprs.size()]));
    outputWindowValues(cg, windowExprs);

    cg.getBlock("resetValues")._return(JExpr.TRUE);

    getIndex(cg);
    StreamingWindowFramer agg = context.getImplementationClass(cg);
    agg.setup(
        context,
        incoming,
        this,
        StreamingWindowFrameTemplate.UNBOUNDED, StreamingWindowFrameTemplate.CURRENT_ROW
    );
    return agg;
  }

  private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSameFromBatch", "isSameFromBatch", null, null); // the internal batch changes each time so we need to redo setup.
  private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSameFromBatch", null, null);
  private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
  private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);

  private void setupIsSameFromBatch(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) {
    cg.setMappingSet(ISA_B1);
    for (LogicalExpression expr : keyExprs) {
      // first, we rewrite the evaluation stack for each side of the comparison.
      cg.setMappingSet(ISA_B1);
      ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
      cg.setMappingSet(ISA_B2);
      ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);

      LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
      ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
      cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
    }
    cg.getEvalBlock()._return(JExpr.TRUE);
  }

  private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
  private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
  private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);

  private void setupIsSame(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) {
    cg.setMappingSet(IS_SAME_I1);
    for (LogicalExpression expr : keyExprs) {
      // first, we rewrite the evaluation stack for each side of the comparison.
      cg.setMappingSet(IS_SAME_I1);
      ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
      cg.setMappingSet(IS_SAME_I2);
      ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);

      LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
      ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
      cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
    }
    cg.getEvalBlock()._return(JExpr.TRUE);
  }

  private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null);
  private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
  private final MappingSet EVAL = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);

  private void addRecordValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) {
    cg.setMappingSet(EVAL);
    for (LogicalExpression ex : valueExprs) {
      ClassGenerator.HoldingContainer hc = cg.addExpr(ex);
      cg.getBlock(ClassGenerator.BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
    }
    cg.getBlock(ClassGenerator.BlockType.EVAL)._return(JExpr.TRUE);
  }

  private final GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupInterior", "outputWindowValues", null, null);
  private final MappingSet WINDOW_VALUES = new MappingSet("inIndex" /* read index */, "outIndex" /* write index */, "incoming", "outgoing", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES);

  private void outputWindowValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) {
    cg.setMappingSet(WINDOW_VALUES);
    for (int i = 0; i < valueExprs.length; i++) {
      ClassGenerator.HoldingContainer hc = cg.addExpr(valueExprs[i]);
      cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
    }
    cg.getEvalBlock()._return(JExpr.TRUE);
  }

  @Override
  protected IterOutcome doWork() {
    StreamingWindowFramer.AggOutcome out = framer.doWork();

    while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) {
      framer = null;
      try {
        setupNewSchema();
      } catch (SchemaChangeException e) {
        return IterOutcome.STOP;
      }
      out = framer.doWork();
    }

    if (out == StreamingWindowFramer.AggOutcome.RETURN_AND_COMPLETE) {
      done = true;
    }

    return framer.getOutcome();
  }

  @Override
  public int getRecordCount() {
    return framer.getOutputCount();
  }

  @Override
  public void cleanup() {
    if (framer != null) {
      framer.cleanup();
    }
    super.cleanup();
  }
}
TOP

Related Classes of org.apache.drill.exec.physical.impl.window.StreamingWindowFrameRecordBatch

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.