Build custom join logic in Cascading ensuring MAP_SIDE only

I have 3 Cascading pipes (one to be joined against the other two), described as follows:

  • LHSPipe - (larger size)

[image: LHSPipe]

  • RHSPipes - (smaller size that could possibly fit in memory)

[image: RHSPipes]

Pseudocode follows. This example involves two joins.

IF F1DecidingFactor = YES THEN
    Join LHSPipe with RHS Lookup#1 BY (LHSPipe.F1Input = RHS Lookup#1.Join#F1)
    SET LHSPipe.F1Output = Result#F1
ELSE
    SET LHSPipe.F1Output = N/A

The same logic applies for F2 computation.
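
To make the rule concrete, here is a minimal plain-Java sketch of the per-record logic, with an in-memory map standing in for RHS Lookup#1. The class name, method name and sample values are assumptions for illustration only. It is not Cascading code, just the behaviour a map-side implementation would need to reproduce.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Cascading: models the per-record rule from the pseudocode.
final class ConditionalLookup {
    static final String NOT_APPLICABLE = "N/A";

    // Returns the lookup result when the deciding factor is "YES" (case-insensitive),
    // otherwise "N/A". A key missing from the lookup yields null, like an unmatched left join.
    static String resolve(String decidingFactor, String input, Map<String, String> lookup) {
        if (!"YES".equalsIgnoreCase(decidingFactor)) {
            return NOT_APPLICABLE;
        }
        return lookup.get(input);
    }

    public static void main(String[] args) {
        // Assumed sample lookup data (join key -> result).
        Map<String, String> lookup1 = new HashMap<>();
        lookup1.put("0", "True");
        lookup1.put("1", "False");

        System.out.println(resolve("YES", "0", lookup1)); // prints True
        System.out.println(resolve("NO", "1", lookup1));  // prints N/A
    }
}
```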

The expected output,

[image: expected output]

This scenario pushed me toward a custom join operation, because the IF-ELSE decides whether to join at all.

Considering the above scenario, I would like to use a MAP-SIDE join (keeping the RHSPipe in memory on the map task node). I was thinking of the possible solutions below; each has its pros and cons. I need your suggestions on these.

Option#1:

CoGroup - We can build custom join logic using CoGroup with BufferJoiner followed by a custom join operation, but that wouldn't ensure a MAP-SIDE join.

Option#2:

HashJoin - It ensures a MAP-SIDE join, but as far as I can see, custom join logic cannot be built with it.

Please correct my understanding and share your suggestions for this requirement.

Thanks in advance.

Bascinet answered 22/3, 2016 at 6:31 Comment(4)
Can you provide your sample code and also what do you want to do in custom join? - Slideaction
Sample input data and expected output will also be helpful. - Slideaction
Have you considered partitioning your data in subsets? - Syncopate
Did you try the solutions provided in the below answer? - Slideaction

The best way to solve this problem (that I can think of) is to modify your smaller dataset. You can add a new field (F1DecidingFactor) to the smaller dataset. The value of F1Result would then be set as follows:

Pseudocode

if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"

Result Table

|F1#DecidingFactor|F1#Join|F1#Result|
|              Yes|      0|     True|
|              Yes|      1|    False|
|               No|      0|      N/A|
|               No|      1|      N/A|

You can do the above in Cascading as well.

After this, you can do your map-side join.

If modifying the smaller dataset is not possible, there are two other options.
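
As a rough illustration of how the result table above could be derived, the following plain-Java sketch expands each original lookup row into one "Yes" row carrying the actual value and one "No" row carrying "N/A". The class and method names are hypothetical, and plain lists stand in for the small dataset. An ordinary equi-join on (deciding factor, join key) against the expanded rows then needs no custom join logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Cascading: builds the expanded lookup rows.
final class LookupExpander {
    // Each input row is {joinKey, result}.
    // Each output row is {decidingFactor, joinKey, result}.
    static List<String[]> expand(List<String[]> lookupRows) {
        List<String[]> expanded = new ArrayList<>();
        // "Yes" rows keep the actual lookup value.
        for (String[] row : lookupRows) {
            expanded.add(new String[] {"Yes", row[0], row[1]});
        }
        // "No" rows carry the default value for every join key.
        for (String[] row : lookupRows) {
            expanded.add(new String[] {"No", row[0], "N/A"});
        }
        return expanded;
    }
}
```

Note that the expanded dataset is twice the size of the original, which matters if the small side only just fits in memory.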

Option 1

Add a new field to each small pipe that mirrors your deciding factor (i.e. F1DecidingFactor_RHS = Yes), then include it in the join criteria. Once the join is done, only the rows where this condition matches will have values; the rest will be null/blank. Sample code:

Main Class

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {
    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);

        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // New field to small pipe. Expected Fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // New field to small pipe. Expected Fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo, new Insert(f2DecidingFactorRhs, "Yes"), Fields.ALL);

        // Joining first small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin());

        // Joining second small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin());

        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);

        result = new Discard(result, f1DecidingFactorRhs);
        result = new Discard(result, f2DecidingFactorRhs);

        // result Pipe should have expected result
    }
}
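
The effect of the extra join field can be modelled without Cascading. In this minimal sketch, a HashMap keyed by (constant "Yes", join key) stands in for the in-memory small side, and a left-join probe uses (deciding factor, input) from the large side. All names here are hypothetical. Unlike the equalsIgnoreCase check in the update function, this key comparison is case-sensitive, so "YES" would not match "Yes".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the composite-key join; plain collections stand in for the pipes.
final class CompositeKeyJoin {
    // Index the small side by ("Yes", joinKey), as the inserted *_RHS field would.
    static Map<List<String>, String> indexSmallSide(Map<String, String> lookup) {
        Map<List<String>, String> index = new HashMap<>();
        for (Map.Entry<String, String> e : lookup.entrySet()) {
            index.put(Arrays.asList("Yes", e.getKey()), e.getValue());
        }
        return index;
    }

    // Left-join probe: a row whose deciding factor is not exactly "Yes" never matches,
    // so it yields null, just like the unmatched side of a left join.
    static String probe(Map<List<String>, String> index, String decidingFactor, String input) {
        return index.get(Arrays.asList(decidingFactor, input));
    }
}
```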

Option 2

If you want a default value instead of null/blank, I would suggest doing the HashJoin first with the built-in joiners, followed by a function that updates the tuples with the appropriate values. Something like:

Main Class

import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {
    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Joining first small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin());

        // Joining second small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin());

        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);

        // result Pipe should have expected result
    }
}

Update Function

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected Fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"
    public TestFunction() {
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();

        TupleEntry result = new TupleEntry(arguments);

        // Constant-first comparison avoids a NullPointerException when the field is null
        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F1DecidingFactor"))) {
            result.setString("F1Output", DEFAULT_VALUE);
        }

        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F2DecidingFactor"))) {
            result.setString("F2Output", DEFAULT_VALUE);
        }

        call.getOutputCollector().add(result);
    }

}
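
The join-then-patch idea can be checked in isolation. Here is a small plain-Java sketch with a Map standing in for one joined tuple. The class and method names are hypothetical and no Cascading classes are involved. It only illustrates the rule that the update step applies after an unconditional left join.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the post-join update step; it does not use Cascading classes.
final class PostJoinDefaults {
    // Returns a copy of the row in which outputField is overwritten with "N/A"
    // when factorField holds "No" (case-insensitive). Otherwise the joined value
    // is kept as-is, including null for a row that found no match in the join.
    static Map<String, String> applyDefault(Map<String, String> row,
                                            String factorField,
                                            String outputField) {
        Map<String, String> updated = new HashMap<>(row);
        if ("No".equalsIgnoreCase(row.get(factorField))) {
            updated.put(outputField, "N/A");
        }
        return updated;
    }
}
```

The trade-off with this approach is that the join does the lookup for every row, including the ones whose result is discarded afterwards.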

This should solve your problem. Let me know if this helps.

Slideaction answered 10/4, 2016 at 14:40 Comment(0)