I'm trying to write a Java UDF for Pig that ranks the tuples in a bag. Each tuple has a value column, which is the ranking criterion, and a rank column, which is initially set to 0. The tuples are sorted by the value column. All the tuples are placed in a bag, and that bag is placed inside a new tuple that is passed to the UDF.
The UDF does modify the rank column, but once the method exits the values have all become 0 again. I'm not sure how to get the values to "stick".
Any help would be greatly appreciated.
Here is my Java class:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.logicalLayer.FrontendException;
import java.util.Iterator;
import org.apache.pig.PigWarning;

/**
 *
 * @author Winter
 */
public class Ranker extends EvalFunc<String> {

    @Override
    public String exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }
        List<Object> list = tuple.getAll();
        DataBag db = (DataBag) list.get(0);
        Integer num = (Integer) list.get(1);
        Iterator<Tuple> itr = db.iterator();
        boolean containsNonNull = false;
        int i = 1;
        double previous = 0;
        while (itr.hasNext()) {
            Tuple t = itr.next();
            double d = (Double) t.get(num.intValue());
            int rankCol = t.size() - 1;
            Integer rankVal = (Integer) t.get(rankCol);
            if (i == 0) {
                System.out.println("i==0");
                previous = d;
                t.set(rankCol, i);
            } else {
                if (d == previous)
                    t.set(rankCol, i);
                else {
                    System.out.print("d!==previous|" + d + "|" + previous + "|" + rankVal);
                    t.set(rankCol, ++i);
                    rankVal = (Integer) t.get(rankCol);
                    System.out.println("|now rank val" + rankVal);
                    previous = d;
                }
            }
        }
        return "Y";
    }
}
Here is how I am calling everything in Pig:
REGISTER /myJar.jar;
A = LOAD '/Users/Winter/milk-tea-coffee.tsv' as (year:chararray, milk:double);
B = foreach A generate year, milk, 0 as rank;
C = order B by milk asc;
D = group C by rank order C by milk;
E = foreach D generate D.C.year,D.C.milk,D.C.rank, piglet3.evalFunctions.Ranker(D.C,1);
dump E;
I can tell it's working inside the UDF because of its print statements:
d!==previous|21.2|0.0|0|now rank val2
d!==previous|21.6|21.2|0|now rank val3
d!==previous|21.9|21.6|0|now rank val4
d!==previous|22.0|21.9|0|now rank val5
d!==previous|22.5|22.0|0|now rank val6
d!==previous|22.9|22.5|0|now rank val7
d!==previous|23.0|22.9|0|now rank val8
d!==previous|23.4|23.0|0|now rank val9
d!==previous|23.8|23.4|0|now rank val10
d!==previous|23.9|23.8|0|now rank val11
but when I dump out E or D or C the rank column only contains 0s.
The exec function must return the output you want from the UDF. You are currently modifying the Tuple that is passed to exec and then returning the String "Y", so all that Pig sees as output from your UDF is "Y". In this case, you should return the Tuple instead of "Y".
I think the following code is close to your intent, but I'm not quite clear on what you are trying to do:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.logicalLayer.FrontendException;
import java.util.Iterator;
import org.apache.pig.PigWarning;

/**
 *
 * @author Winter
 */
public class Ranker extends EvalFunc<Tuple> {

    @Override
    public Tuple exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }
        List<Object> list = tuple.getAll();
        DataBag db = (DataBag) list.get(0);
        Integer num = (Integer) list.get(1);
        Iterator<Tuple> itr = db.iterator();
        boolean containsNonNull = false;
        int i = 1;
        double previous = 0;
        while (itr.hasNext()) {
            Tuple t = itr.next();
            double d = (Double) t.get(num.intValue());
            int rankCol = t.size() - 1;
            Integer rankVal = (Integer) t.get(rankCol);
            if (i == 0) {
                System.out.println("i==0");
                previous = d;
                t.set(rankCol, i);
            } else {
                if (d == previous)
                    t.set(rankCol, i);
                else {
                    System.out.print("d!==previous|" + d + "|" + previous + "|" + rankVal);
                    t.set(rankCol, ++i);
                    rankVal = (Integer) t.get(rankCol);
                    System.out.println("|now rank val" + rankVal);
                    previous = d;
                }
            }
        }
        return tuple;
    }
}
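If it helps to check the ranking logic on its own, here is a minimal sketch in plain Java with no Pig dependency. It assumes the values are already sorted and non-null, and the names DenseRank and rank are hypothetical, not part of Pig or of your class. Note that it returns the ranks as a new list rather than relying on mutation, and that it starts at rank 1, whereas the loop above gives the first tuple rank 2 because i starts at 1 and previous at 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DenseRank {

    /**
     * Returns one rank per input value. The input must already be sorted.
     * Equal neighbouring values share a rank, and the first value gets rank 1.
     */
    public static List<Integer> rank(List<Double> sortedValues) {
        List<Integer> ranks = new ArrayList<>(sortedValues.size());
        int rank = 0;
        Double previous = null; // there is no previous value before the first element
        for (Double value : sortedValues) {
            // Move to the next rank only when the value changes.
            if (previous == null || !value.equals(previous)) {
                rank++;
            }
            ranks.add(rank);
            previous = value;
        }
        return ranks;
    }

    public static void main(String[] args) {
        // Sample values; 21.6 is repeated on purpose to show a tie.
        List<Double> milk = Arrays.asList(21.2, 21.6, 21.6, 21.9);
        System.out.println(rank(milk)); // prints [1, 2, 2, 3]
    }
}
```

Inside the UDF, the same loop would write each rank into the tuple's rank column, and exec would then return the modified data instead of a String.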