CoGroupOperator coGroup = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 3, 6)
.keyField(LongValue.class, 0, 0)
.name("CoGroup").input1(source1).input2(source2).build();
Ordering groupOrder1 = new Ordering(5, StringValue.class, Order.DESCENDING);
Ordering groupOrder2 = new Ordering(1, StringValue.class, Order.DESCENDING);
groupOrder2.appendOrdering(4, DoubleValue.class, Order.ASCENDING);
coGroup.setGroupOrderForInputOne(groupOrder1);
coGroup.setGroupOrderForInputTwo(groupOrder2);
FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, coGroup, "Sink");
Plan plan = new Plan(sink, "Reduce Group Order Test");
plan.setDefaultParallelism(DEFAULT_PARALLELISM);
OptimizedPlan oPlan;
try {
oPlan = compileNoStats(plan);
} catch(CompilerException ce) {
ce.printStackTrace();
fail("The pact compiler is unable to compile this plan correctly.");
return; // silence the compiler
}
OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
SinkPlanNode sinkNode = resolver.getNode("Sink");
DualInputPlanNode coGroupNode = resolver.getNode("CoGroup");
// verify the strategies
Assert.assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
Assert.assertEquals(ShipStrategyType.PARTITION_HASH, coGroupNode.getInput1().getShipStrategy());
Assert.assertEquals(ShipStrategyType.PARTITION_HASH, coGroupNode.getInput2().getShipStrategy());
Channel c1 = coGroupNode.getInput1();
Channel c2 = coGroupNode.getInput2();
Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy());
Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy());
FieldList ship1 = new FieldList(new int[] {3, 0});
FieldList ship2 = new FieldList(new int[] {6, 0});
FieldList local1 = new FieldList(new int[] {3, 0, 5});
FieldList local2 = new FieldList(new int[] {6, 0, 1, 4});
Assert.assertEquals(ship1, c1.getShipStrategyKeys());
Assert.assertEquals(ship2, c2.getShipStrategyKeys());
Assert.assertEquals(local1, c1.getLocalStrategyKeys());
Assert.assertEquals(local2, c2.getLocalStrategyKeys());
Assert.assertTrue(c1.getLocalStrategySortOrder()[0] == coGroupNode.getSortOrders()[0]);
Assert.assertTrue(c1.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
Assert.assertTrue(c2.getLocalStrategySortOrder()[0] == coGroupNode.getSortOrders()[0]);
Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
// check that the local group orderings are correct
Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == groupOrder1.getFieldSortDirections()[0]);
Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == groupOrder2.getFieldSortDirections()[0]);
Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == groupOrder2.getFieldSortDirections()[1]);
}