Apache Pig: FLATTEN and parallel execution of reducers
Asked Answered
U

4

35

I have implemented an Apache Pig script. When I execute the script it results in many mappers for a specific step, but has only one reducer for that step. Because of this condition (many mappers, one reducer) the Hadoop cluster is almost idle while the single reducer executes. In order to better use the resources of the cluster I would like to also have many reducers running in parallel.

Even if I set the parallelism in the Pig script using the SET DEFAULT_PARALLEL command I still result in having only 1 reducer.

The code part issuing the problem is the following:

SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);

The 'inputData' and 'inputDataGrouped' aliases are computed in the mapper.

The 'pairs' and 'pairsFlat' in the reducer.

If I change the script by removing the line with the FLATTEN command (pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);) then the execution results in 5 reducers (and thus in a parallel execution).

It seems that the FLATTEN command is the problem and avoids that many reducers are created.

How could I reach the same result of FLATTEN but having the script being executed in parallel (with many reducers)?

Edit:

EXPLAIN plan when having two FOREACH (as above):

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
|   |
|   Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][0] - scope-23
    |   |
    |   Cast[int] - scope-27
    |   |
    |   |---Project[bytearray][1] - scope-26
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------


Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
    |   |
    |   Project[bag][0] - scope-39
    |
    |---pairs: New For Each(false)[bag] - scope-38
        |   |
        |   POUserFunc(GeneratePairsUDF)[bag] - scope-36
        |   |
        |   |---Project[bag][1] - scope-35
        |       |
        |       |---Project[bag][1] - scope-34
        |
        |---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false

EXPLAIN plan when having only one FOREACH with FLATTEN wrapping the UDF:

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
|   |
|   Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
    |   |
    |   Cast[chararray] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[int] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------


Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
    |   |
    |   POUserFunc(GeneratePairsUDF)[bag] - scope-33
    |   |
    |   |---Project[bag][1] - scope-32
    |       |
    |       |---Project[bag][1] - scope-31
    |
    |---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false
Uniflorous answered 7/11, 2013 at 12:0 Comment(6)
How long does the reducer take to finish when you have just one?Darondarooge
can the 2 FOREACH aliases be combined with the FLATTEN wrapping the UDF? Else look at combiner and use EXPLAIN to see how combiner used.Soldierly
@WinnieNicklaus: Thanks for the comment. The reducer would take several days to compute. The issue is that the server is almost idle all that time because this single reducer is requiring only view of its resources ...Uniflorous
@libjack: Thank you for your feedback. I tried also using only one FOREACH but unfortunately the result is the same, only one reducer. The output of the EXPLAIN is similar for both versions (see edited question).Uniflorous
If you look at your job stats, how many reduce input keys are there? Perhaps all your data resolves to a single reduce input key?Apish
I believe because of how flatten is implemented i.e. outputting a single combined collection you must always have all data pass through a single reducer. I can't suggest an alternative I am afraid but I guess that is why.Kitchenmaid
S
3

There is no surety if pig uses the configuration DEFAULT_PARALLEL value for every steps in the pig script. Try PARALLEL along with your specific join/group step which you feel taking time (In your case GROUP step).

 inputDataGrouped = GROUP inputData BY (group_name) PARALLEL 67;

If still it is not working then you might have to see your data for skewness issue.

Stafani answered 17/6, 2014 at 11:12 Comment(1)
For me this was the solution in a similar situation as described in this question. I'm still kind a baffled though that setting the PARALLEL level for the statement explicitly results in different behavior compared to setting default_parallel once in the beginning of the script. The Pig documentation seems to suggest that the default_parallel is used throughout the whole script (unless overwritten at statement level through use of the 'PARALLEL' keyword).Cottingham
A
1

I think there is a skewness in the data. Only a small number of mappers are producing exponentially large output. Look at the distribution of keys in your data. Like data contains few Groups with large number of records.

Analogize answered 17/6, 2014 at 7:28 Comment(0)
T
1

I tried "set default parallel" and "PARALLEL 100" but no luck. Pig still uses 1 reducer.

It turned out I have to generate a random number from 1 to 100 for each record and group these records by that random number.

We are wasting time on grouping, but it is much faster for me because now I can use more reducers.

Here is the code (SUBMITTER is my own UDF):

tmpRecord = FOREACH record GENERATE (int)(RANDOM()*100.0) as rnd, data;
groupTmpRecord = GROUP tmpRecord BY rnd;
result = FOREACH groupTmpRecord GENERATE FLATTEN(SUBMITTER(tmpRecord));
Torrid answered 19/6, 2014 at 23:17 Comment(0)
G
0

To answer your question we must first know how many reducers pig enforces to accomplish the - Global Rearrange process. Because as per my understanding the Generate / Projection should not require a single reducer. I cannot say the same thing about Flatten. However we know from common-sense that during flatten the aim is to de-nestify the tuples from bags and vice versa. And to do that all the tuples belonging to a bag should definitely be available in the same reducer. I might be wrong. But can anyone add something here to get this user an answer please ?

Giverin answered 8/10, 2016 at 1:15 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.