/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.aggregation;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.metadata.Signature;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.AbstractFixedWidthType;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.type.ArrayType;
import com.facebook.presto.type.MapType;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.List;
import java.util.Set;
import static com.facebook.presto.block.BlockAssertions.createDoublesBlock;
import static com.facebook.presto.block.BlockAssertions.createStringsBlock;
import static com.facebook.presto.operator.aggregation.AggregationTestUtils.assertAggregation;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.instanceOf;
import static com.google.common.base.Predicates.not;
import static io.airlift.slice.SizeOf.SIZE_OF_DOUBLE;
import static org.testng.Assert.assertNotNull;
public class TestMaxByAggregation
{
private static final MetadataManager metadata = new MetadataManager();
@BeforeClass
public void setup()
{
((TypeRegistry) metadata.getTypeManager()).addType(CustomDoubleType.CUSTOM_DOUBLE);
}
@Test
public void testAllRegistered()
{
Set<Type> orderableTypes = FluentIterable.from(metadata.getTypeManager().getTypes()).filter(new Predicate<Type>()
{
@Override
public boolean apply(Type input)
{
return input.isOrderable();
}
}).toSet();
// TODO: Include these in the test once MAX_BY works with parametric types
List<Type> valueTypes = FluentIterable.from(metadata.getTypeManager().getTypes()).filter(and(not(instanceOf(ArrayType.class)), not(instanceOf(MapType.class)))).toList();
for (Type keyType : orderableTypes) {
for (Type valueType : valueTypes) {
assertNotNull(metadata.getExactFunction(new Signature("max_by", valueType.getName(), valueType.getName(), keyType.getName())));
}
}
}
@Test
public void testNull()
throws Exception
{
InternalAggregationFunction doubleDouble = metadata.getExactFunction(new Signature("max_by", StandardTypes.DOUBLE, StandardTypes.DOUBLE, StandardTypes.DOUBLE)).getAggregationFunction();
assertAggregation(
doubleDouble,
1.0,
null,
createPage(
new Double[] {1.0, null},
new Double[] {1.0, 2.0}));
}
@Test
public void testDoubleDouble()
throws Exception
{
InternalAggregationFunction doubleDouble = metadata.getExactFunction(new Signature("max_by", StandardTypes.DOUBLE, StandardTypes.DOUBLE, StandardTypes.DOUBLE)).getAggregationFunction();
assertAggregation(
doubleDouble,
1.0,
null,
createPage(
new Double[] {null},
new Double[] {null}),
createPage(
new Double[] {null},
new Double[] {null}));
assertAggregation(
doubleDouble,
1.0,
2.0,
createPage(
new Double[] {3.0, 2.0},
new Double[] {1.0, 1.5}),
createPage(
new Double[] {null},
new Double[] {null}));
}
@Test
public void testDoubleVarchar()
throws Exception
{
InternalAggregationFunction doubleVarchar = metadata.getExactFunction(new Signature("max_by", StandardTypes.VARCHAR, StandardTypes.VARCHAR, StandardTypes.DOUBLE)).getAggregationFunction();
assertAggregation(
doubleVarchar,
1.0,
"a",
createPage(
new String[] {"z", "a"},
new Double[] {1.0, 2.0}),
createPage(
new String[] {null},
new Double[] {null}));
assertAggregation(
doubleVarchar,
1.0,
"hi",
createPage(
new String[] {"zz", "hi"},
new Double[] {0.0, 1.0}),
createPage(
new String[] {null, "a"},
new Double[] {null, -1.0}));
}
private static Page createPage(Double[] values, Double[] keys)
{
return new Page(createDoublesBlock(values), createDoublesBlock(keys));
}
private static Page createPage(String[] values, Double[] keys)
{
return new Page(createStringsBlock(values), createDoublesBlock(keys));
}
private static class CustomDoubleType
extends AbstractFixedWidthType
{
public static final CustomDoubleType CUSTOM_DOUBLE = new CustomDoubleType();
public static final String NAME = "custom_double";
private CustomDoubleType()
{
super(NAME, double.class, SIZE_OF_DOUBLE);
}
@Override
public boolean isComparable()
{
return true;
}
@Override
public boolean isOrderable()
{
return true;
}
@Override
public Object getObjectValue(ConnectorSession session, Block block, int position)
{
if (block.isNull(position)) {
return null;
}
return block.getDouble(position, 0);
}
@Override
public boolean equalTo(Block leftBlock, int leftPosition, Block rightBlock, int rightPosition)
{
long leftValue = leftBlock.getLong(leftPosition, 0);
long rightValue = rightBlock.getLong(rightPosition, 0);
return leftValue == rightValue;
}
@Override
public int hash(Block block, int position)
{
long value = block.getLong(position, 0);
return (int) (value ^ (value >>> 32));
}
@Override
public int compareTo(Block leftBlock, int leftPosition, Block rightBlock, int rightPosition)
{
double leftValue = leftBlock.getDouble(leftPosition, 0);
double rightValue = rightBlock.getDouble(rightPosition, 0);
return Double.compare(leftValue, rightValue);
}
@Override
public void appendTo(Block block, int position, BlockBuilder blockBuilder)
{
if (block.isNull(position)) {
blockBuilder.appendNull();
}
else {
blockBuilder.writeDouble(block.getDouble(position, 0)).closeEntry();
}
}
@Override
public double getDouble(Block block, int position)
{
return block.getDouble(position, 0);
}
@Override
public void writeDouble(BlockBuilder blockBuilder, double value)
{
blockBuilder.writeDouble(value).closeEntry();
}
}
}