new DoFn<CommonLogEntry, Pair<String, CommonLogEntry>>() {
@Override
public void process(CommonLogEntry input, Emitter<Pair<String, CommonLogEntry>> emitter) {
emitter.emit(Pair.of(input.getRemoteAddress(), input));
}
}, tf.tableOf(tf.strings(), tf.records(CommonLogEntry.class)));
}
public static PTable<String, String> ipsAndUsers(PCollection<String> ipUsers) {
PTypeFamily tf = ipUsers.getTypeFamily();
return ipUsers.parallelDo(