Scalding, flatten fields after groupBy

I have seen this question: "Scalding: How to retain the other field, after a groupBy('field){.size}?"

It's a real pain and a mess compared to Apache Pig. What am I doing wrong? Can I do the same thing as GENERATE(FLATTEN()) in Pig?

I'm confused. Here is my Scalding code:

  def takeTop(topAmount: Int) :Pipe = self
    .groupBy(person1){ _.sortedReverseTake[Long](activityCount -> top, topAmount)}
    .flattenTo[(Long, Long, Long)](top -> (person1, person2, activityCount))

And my test:

  "Take top 3" should "return most active pairs" in {
    Given{
      List( (1, 13, 7),
            (1, 13, 8),
            (1, 12, 9),
            (1, 11, 10),
            (2, 20, 21),
            (2, 20, 22)) withSchema (person1, person2, activityCount)
    } When {
      pipe:RichPipe => pipe.takeTop(3)
    } Then {
      buffer: mutable.Buffer[(Long, Long, Long)] =>
      println(buffer.toList)
      buffer.toList.size should equal(5)
      println (buffer.toList)

      buffer.toList should contain (1, 11, 10)
      buffer.toList should contain (1, 12, 9)
      buffer.toList should contain (1, 13, 8)
      buffer.toList should not contain (1, 13, 7)

      buffer.toList should contain (2, 20, 21)
      buffer.toList should contain (2, 20, 22)
  }
  }

I get an exception at runtime:

14/09/23 15:25:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:478)] operator Each failed executing operation
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
    at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
    at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
    at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
    at com.twitter.scalding.MRMAggregator.complete(Operations.scala:321)
    at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151)
    at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:113)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple3
    at com.twitter.scalding.GeneratedTupleSetters$$anon$25.apply(GeneratedConversions.scala:669)
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:46)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    ... 23 more

What am I doing wrong?

Update:

I did it this way:

 def takeTop(topAmount: Int) :Pipe = self
    .groupBy(person1){ _.sortedReverseTake[(Long,Long, Long)]((activityCount, person1, person2) -> top, topAmount)}
    .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
    .project(person1, person2, activityCount)

The test passes, but I'm not sure that it's a good approach...

Edge answered 23/9, 2014 at 12:14
  def takeTop(topAmount: Int) :Pipe = self
    .groupBy(person1){ _.sortedReverseTake[(Long,Long, Long)]((activityCount, person1, person2) -> top, topAmount)}
    .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
    .project(person1, person2, activityCount)

This works; I didn't find a better approach.
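
To illustrate why the first version fails, here is a rough sketch using plain Scala collections instead of Scalding pipes. The object and method names (TopPerGroup, countsOnly, takeTop) are made up for this example and are not part of Scalding. The point it tries to show: if only activityCount is carried into the per-group list, each element is a Long, which matches the "java.lang.Long cannot be cast to scala.Tuple3" in the trace; carrying the whole row keeps every field available for flattening. Putting activityCount first in the tuple, as in the working version, presumably matters because Scala orders tuples lexicographically, so the count becomes the primary sort key.

```scala
// Plain Scala collections only; no Scalding dependency.
// All names here are illustrative, not Scalding API.
object TopPerGroup {
  // (person1, person2, activityCount)
  type Row = (Long, Long, Long)

  // Same data as the test in the question.
  val rows: List[Row] = List(
    (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L), (1L, 11L, 10L),
    (2L, 20L, 21L), (2L, 20L, 22L))

  // Keeping only activityCount per group loses person2:
  // every list element is a Long, so there is no 3-tuple to flatten.
  def countsOnly(n: Int): Map[Long, List[Long]] =
    rows.groupBy(_._1).map { case (p1, rs) =>
      p1 -> rs.map(_._3).sorted(Ordering[Long].reverse).take(n)
    }

  // Keeping the whole row, sorted by activityCount descending,
  // lets the flattened result retain all three fields.
  def takeTop(n: Int): List[Row] =
    rows.groupBy(_._1).values.toList
      .flatMap(_.sortBy(r => -r._3).take(n))
}
```

Calling TopPerGroup.takeTop(3) on this data keeps five rows and drops (1, 13, 7), which is the same expectation the test in the question encodes.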

Edge answered 27/7, 2015 at 7:49
