Apache Pig Java UDF: changes to tuple values don't seem to stick

I'm trying to write a Java UDF that ranks the tuples in a bag. Each tuple has a value column, which is the ranking criterion, and a rank column, which is initially set to 0. The tuples are sorted by the value column and placed in a bag, and that bag is placed inside a new tuple that is passed to the UDF.

The UDF modifies the rank column, but once the method exits the values have all become 0 again. I'm not sure how to get the values to "stick".

Any help would be greatly appreciated.

Here is my Java class:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.logicalLayer.FrontendException;
import java.util.Iterator;
import org.apache.pig.PigWarning;

/**
 *
 * @author Winter
 */
public class Ranker  extends EvalFunc<String>{
    @Override
    public String exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }


        List<Object> list = tuple.getAll();
        DataBag db = (DataBag) list.get(0);
        Integer num = (Integer)list.get(1);

        Iterator<Tuple>itr = db.iterator();
        boolean containsNonNull = false;
        int i = 1;
        double previous=0;
        while (itr.hasNext()) {

            Tuple t= itr.next();
            double d = (Double)t.get(num.intValue());
            int rankCol = t.size()-1;
            Integer rankVal = (Integer)t.get(rankCol);
            if(i == 0){    
                System.out.println("i==0");
                previous = d;
                t.set(rankCol, i);
            } else {
                if(d == previous)
                    t.set(rankCol, i);
                else{
                    System.out.print("d!==previous|" + d + "|"+ previous+"|"+rankVal);
                    t.set(rankCol, ++i);
                    rankVal = (Integer)t.get(rankCol);
                     System.out.println("|now rank val" + rankVal);
                    previous = d;
                }
            }
        }


        return "Y";
    }
}

Here is how I am calling everything in Pig:

REGISTER /myJar.jar;
A = LOAD '/Users/Winter/milk-tea-coffee.tsv'  as (year:chararray, milk:double);
B = foreach A generate year, milk, 0 as rank;
C = order B by milk asc; 
D = group C by rank order C by milk;
E = foreach D generate D.C.year,D.C.milk,D.C.rank,  piglet3.evalFunctions.Ranker(D.C,1);
dump E;

I can tell it's working inside the UDF because of the output of its print statements:

d!==previous|21.2|0.0|0|now rank val2
d!==previous|21.6|21.2|0|now rank val3
d!==previous|21.9|21.6|0|now rank val4
d!==previous|22.0|21.9|0|now rank val5
d!==previous|22.5|22.0|0|now rank val6
d!==previous|22.9|22.5|0|now rank val7
d!==previous|23.0|22.9|0|now rank val8
d!==previous|23.4|23.0|0|now rank val9
d!==previous|23.8|23.4|0|now rank val10
d!==previous|23.9|23.8|0|now rank val11

But when I dump E, D, or C, the rank column contains only 0s.

2012-04-04 22:52
by Winter


The exec function must return the output you want from the UDF. You are currently modifying the Tuple that is passed to exec and then returning the String "Y", so all that Pig sees as output from your UDF is "Y". In this case, you should return the Tuple instead of "Y".

I think the following code is close to your intent, but I'm not quite clear on what you are trying to do:

import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

/**
 * Ranks the tuples of a sorted bag by a value column; equal values share a rank.
 *
 * @author Winter
 */
public class Ranker extends EvalFunc<Tuple> {
    @Override
    public Tuple exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }

        // Field 0 is the bag of sorted tuples; field 1 is the index of the value column.
        DataBag db = (DataBag) tuple.get(0);
        int valueCol = (Integer) tuple.get(1);

        Iterator<Tuple> itr = db.iterator();
        int rank = 0;
        double previous = 0;
        while (itr.hasNext()) {
            Tuple t = itr.next();
            double d = (Double) t.get(valueCol);
            int rankCol = t.size() - 1; // the rank is stored in the last column

            // Advance the rank on the first tuple and whenever the value changes,
            // so that the first tuple gets rank 1 and ties share a rank.
            if (rank == 0 || d != previous) {
                rank++;
                previous = d;
            }
            t.set(rankCol, rank);
        }

        // Return the modified input tuple so that Pig sees the updated ranks.
        return tuple;
    }
}
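
The ranking step on its own can also be tried outside Pig. The following is only a minimal sketch, assuming the values are already sorted in ascending order as in the Pig script. It uses a plain array instead of Pig's Tuple and DataBag, and DenseRanker and its rank method are hypothetical names that appear in neither the question nor Pig. The sample values are made up for illustration.

```java
import java.util.Arrays;

final class DenseRanker {
    /**
     * Assigns dense ranks to values that are already sorted ascending.
     * Equal neighbours share a rank; the rank grows by one on each change.
     */
    static int[] rank(double[] sortedValues) {
        int[] ranks = new int[sortedValues.length];
        int rank = 0;
        for (int i = 0; i < sortedValues.length; i++) {
            // The first element always opens rank 1; later elements open a
            // new rank only when they differ from the element before them.
            if (i == 0 || sortedValues[i] != sortedValues[i - 1]) {
                rank++;
            }
            ranks[i] = rank;
        }
        return ranks;
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not taken from the original data file.
        double[] values = {21.2, 21.6, 21.6, 21.9};
        System.out.println(Arrays.toString(rank(values))); // prints [1, 2, 2, 3]
    }
}
```

Like the UDF, the sketch returns the ranks to the caller rather than relying on side effects, and each element only needs to look at the one before it.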
2012-04-05 06:59
by msponer
I think part of my problem is that to rank a tuple you have to compare it to the tuple above it, so you have to work on whole bags - Winter 2012-04-05 12:14
For example, a tuple doesn't know whether it is in 2nd place unless it can see the 1st-place tuple above it. That's why I put all the tuples inside a bag and then put that bag inside a new tuple. But there may be an entirely better way to do the whole thing - Winter 2012-04-05 12:17
You were correct! Thanks so much - Winter 2012-04-05 13:25