Splitting a tuple into multiple tuples in Pig

I would like to generate multiple tuples from a single tuple. What I mean is: I have a file with the following data in it.

>> cat data
ID | ColumnName1:Value1 | ColumnName2:Value2

So I load it with the following command:

grunt >> A = load '$data' using PigStorage('|');    
grunt >> dump A;    
(ID,ColumnName1:Value1,ColumnName2:Value2) 

Now I want to split this tuple into two tuples.

(ID, ColumnName1, Value1)
(ID, ColumnName2, Value2)

Can I use a UDF along with foreach and generate? Something like the following?

grunt >> foreach A generate SOMEUDF(A)

EDIT:

Input tuple: (id1,column1,column2). Output: two tuples, (id1,column1) and (id1,column2). So should the UDF return a List, or should I return a Bag?

public class SPLITTUPPLE extends EvalFunc <List<Tuple>>
{
    public List<Tuple> exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try{
            // Not sure whether I can create tuples on my own; it looks like I should use TupleFactory.
            // return list of tuples.
        }catch(Exception e){
            throw WrappedIOException.wrap("Caught exception processing input row ", e);
        }
    }
}

Is this approach correct?

Field answered 2/7, 2012 at 3:1 Comment(0)

You could write a UDF or use a Pig script with built-in functions.

For example:

-- the data should be loaded as chararray; PigStorage('|') returns bytearray fields, which will not work for this example
inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray);

-- split by | and create a row so we can dereference it later
splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ;

-- the first column is the id; all fields are put into a bag, which is flattened to make rows
id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value;
-- this also yields (id, id) records; they are filtered out below, since the id should not contain ':'
id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals;
final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val);
dump final;

Test INPUT:

1|c1:11:33|c2:12
234|c1:21|c2:22
33|c1:31|c2:32
345|c1:41|c2:42

OUTPUT

(1,c1,11:33)
(1,c2,12)
(234,c1,21)
(234,c2,22)
(33,c1,31)
(33,c2,32)
(345,c1,41)
(345,c2,42)
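
For anyone who wants to check the logic outside Pig, the steps of the script can be imitated in plain Java. This is only a rough sketch: the class and method names (SplitSketch, explode) are made up for illustration, and it uses java.lang.String instead of Pig's built-ins. It assumes, like the script, that the id contains no ':'.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Pig or of the answer's script.
class SplitSketch {

    // Mirrors the script on a single input line and returns the output rows as text.
    static List<String> explode(String line) {
        List<String> rows = new ArrayList<>();
        String[] fields = line.split("\\|");            // like STRSPLIT($0, '\\|')
        String id = fields[0];                          // like $0 as id
        for (String field : fields) {                   // like FLATTEN(TOBAG(*)), id included
            if (field.indexOf(':') == -1) {
                continue;                               // like filter ... by p != -1
            }
            String[] kv = field.split(":", 2);          // like STRSPLIT(value, ':', 2)
            rows.add("(" + id + "," + kv[0] + "," + kv[1] + ")");
        }
        return rows;
    }

    public static void main(String[] args) {
        // Prints (1,c1,11:33) and (1,c2,12), one per line.
        for (String row : explode("1|c1:11:33|c2:12")) {
            System.out.println(row);
        }
    }
}
```

The limit of 2 in the second split is what keeps 11:33 together as one value in the first test row.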

I hope it helps.

Cheers.

Saied answered 2/7, 2012 at 18:4 Comment(5)
Thanks a lot. Can I do the same thing by writing a UDF? I have updated the question. - Field
Yes, you can. See the next answer. - Saied
It's a great help. Thanks for your time. - Field
Hey, I tried running your use case and got stuck with the following error: grunt> id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals; 2016-03-18 16:41:24,677 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1045: <line 8, column 65> Could not infer the matching function for org.apache.pig.builtin.STRSPLIT as multiple or none of them fit. Please use an explicit cast. Kindly update the relation accordingly to avoid the exception. - Specious
The interfaces might have changed over the years; I have seen that happen before. Which version of Pig are you using? - Saied

Here is the UDF version. I prefer to return a BAG:

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

/**
 * Converts input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag 
 * {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...}
 *  
 *  Default rows separator is '|' and key value separator is ':'. 
 *  In this implementation white spaces around separator characters are not removed.
 *  ID can be made of any character (including sequence of white spaces). 
 * @author 
 *
 */
public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> {

    private static final TupleFactory tupleFactory = TupleFactory.getInstance();
    private static final BagFactory bagFactory = BagFactory.getInstance();

    //Row separator character. Default is '|'.
    private String rowsSeparator;
    //Column value separator character. Default is ':'.
    private String columnValueSeparator;

    public TupleToBagColumnValuePairs() {
        this.rowsSeparator = "\\|";
        this.columnValueSeparator = ":";
    }

    public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) {
        this.rowsSeparator = rowsSeparator;
        this.columnValueSeparator = keyValueSeparator;
    }

    /**
     * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray)
     * @param outputBag Output tuples (id, column, value) are added to this bag
     * @param id
     * @param column
     * @param value
     * @throws ExecException
     */
    protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException {
        Tuple outputTuple = tupleFactory.newTuple();
        outputTuple.append(id);
        outputTuple.append(column);
        outputTuple.append(value);
        outputBag.add(outputTuple);
    }

    /**
     * Takes each column{separator}value entry from splitInputLine, splits it into column and value, and adds them to the outputBag as (id, column, value)
     * @param outputBag Output tuples (id, column, value) should be added to this bag
     * @param id 
     * @param splitInputLine format column{separator}value, which start from index 1
     * @throws ExecException
     */
    protected void parseColumnValues(DataBag outputBag, String id,
            String[] splitInputLine) throws ExecException {
        for (int i = 1; i < splitInputLine.length; i++) {
            if (splitInputLine[i] != null) {
                int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator);
                if (columnValueSplitIndex != -1) {
                    String column = splitInputLine[i].substring(0, columnValueSplitIndex);
                    String value = null;
                    if (columnValueSplitIndex + 1 < splitInputLine[i].length()) {
                        value = splitInputLine[i].substring(columnValueSplitIndex + 1);
                    }
                    this.addTuple(outputBag, id, column, value);
                } else {
                    String column = splitInputLine[i];
                    this.addTuple(outputBag, id, column, null);
                }
            }
        }
    }

    /**
     * input - contains only one field of type chararray, which will be split by '|'
     * Null input and lines with an empty id are ignored (the UDF returns null).
     */
    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() != 1 || input.isNull(0)) {
            return null;
        }

        String inputLine = (String)input.get(0);
        String[] splitInputLine = inputLine.split(this.rowsSeparator, -1);

        // length >= 1 (not > 1), so the id-only branch below is reachable
        if (splitInputLine.length >= 1 && splitInputLine[0].length() > 0) {
            String id = splitInputLine[0];
            DataBag outputBag = bagFactory.newDefaultBag();            
            if (splitInputLine.length == 1) { // there is just an id in the line
                this.addTuple(outputBag, id, null, null);
            } else {
                this.parseColumnValues(outputBag, id, splitInputLine);
            }

            return outputBag;
        }
        return null;
    }

    @Override
    public Schema outputSchema(Schema input) {
        try {
            if (input.size() != 1) {
                throw new RuntimeException("Expected input to have only one field");
            }

            Schema.FieldSchema inputFieldSchema = input.getField(0);
            if (inputFieldSchema.type != DataType.CHARARRAY) {
                throw new RuntimeException("Expected a CHARARRAY as input");
            }

            Schema tupleSchema = new Schema();
            tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY));
            tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY));
            tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY));

            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG));
        } catch (FrontendException exx) {
            throw new RuntimeException(exx);
        }
    }

}

Here is how it is used in Pig:

register 'path to the jar';
define IdColumnValue myPackage.TupleToBagColumnValuePairs();

inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray);
result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2);
dump result;
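
To see how the UDF treats edge cases without setting up Pig, its parsing rules can be imitated with plain strings. This is a hedged sketch, not the UDF itself: ColumnValueSketch and parse are hypothetical names, String arrays stand in for Pig tuples, and an empty list stands in for the null the UDF returns on an empty id. Lines that contain only an id are not covered here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the UDF's parsing; uses no Pig classes.
class ColumnValueSketch {

    static List<String[]> parse(String line) {
        List<String[]> out = new ArrayList<>();
        String[] parts = line.split("\\|", -1);   // limit -1 keeps trailing empty fields
        String id = parts[0];
        if (id.isEmpty()) {
            return out;                           // the UDF returns null in this case
        }
        for (int i = 1; i < parts.length; i++) {
            int p = parts[i].indexOf(':');
            if (p == -1) {
                // no separator: the whole field is the column, the value is null
                out.add(new String[] {id, parts[i], null});
            } else {
                // an empty value after the separator becomes null, as in the UDF
                String value = p + 1 < parts[i].length() ? parts[i].substring(p + 1) : null;
                out.add(new String[] {id, parts[i].substring(0, p), value});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [7, a, 1], [7, b, null], [7, c, null] and [7, , null]
        for (String[] row : parse("7|a:1|b:|c|")) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Note the last row: because of the -1 limit, a trailing '|' produces a tuple with an empty column name, so it may be worth trimming such separators from the input first.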

For inspiration on writing UDFs that work with bags, see the DataFu source code by LinkedIn.

Saied answered 3/7, 2012 at 11:19 Comment(0)

You could use TransposeTupleToBag (a UDF from the DataFu library) on the output of STRSPLIT to get a bag, and then FLATTEN the bag to create a separate row per original column.

Breathless answered 8/8, 2014 at 21:3 Comment(0)
