// Obtain file names for the three text fixtures through tmpDir. The customers
// and orders names are passed to pipeline.readTextFile below.
// NOTE(review): presumably copyResourceFileName copies the named resource into
// a temporary location and returns the resulting path -- confirm against the
// declared type of tmpDir.
String customersFile = tmpDir.copyResourceFileName("customers.txt");
String ordersFile = tmpDir.copyResourceFileName("orders.txt");
// addressesFile is not referenced by the table-building statements directly
// below; presumably it is consumed later in this method -- verify.
String addressesFile = tmpDir.copyResourceFileName("addresses.txt");
// Read the customers file as text and run StringToPairMapFn over it, producing
// a table with String keys and String values. "Split customers" is the name
// given to this parallelDo step.
// NOTE(review): presumably StringToPairMapFn splits each input line into a
// (key, value) pair -- confirm against its definition.
PTable<String, String> customersTable = pipeline.readTextFile(customersFile)
.parallelDo("Split customers", new StringToPairMapFn(), tableOf(strings(), strings()));
// Same construction for the orders file: the same map function class and the
// same String/String table type, under the step name "Split orders".
PTable<String, String> ordersTable = pipeline.readTextFile(ordersFile)
.parallelDo("Split orders", new StringToPairMapFn(), tableOf(strings(), strings()));
PTable<String, String> assignedOrders = new BloomFilterJoinStrategy<String, String, String>(5)
.join(customersTable, ordersTable, JoinType.INNER_JOIN)
.parallelDo(new MapFn<Pair<String, Pair<String, String>>, Pair<String, String>>() {