I see this: Scalding: How to retain the other field, after a groupBy('field){.size}?
it's a real pain and a mess comparing to Apache Pig... What do I do wrong? Can I do the same like GENERATE(FLATTEN()) pig?
I'm confused. Here is my scalding code:
def takeTop(topAmount: Int) :Pipe = self
.groupBy(person1){ _.sortedReverseTake[Long](activityCount -> top, topAmount)}
.flattenTo[(Long, Long, Long)](top -> (person1, person2, activityCount))
And my test:
"Take top 3" should "return most active pairs" in {
Given{
List( (1, 13, 7),
(1, 13, 8),
(1, 12, 9),
(1, 11, 10),
(2, 20, 21),
(2, 20, 22)) withSchema (person1, person2, activityCount)
} When {
pipe:RichPipe => pipe.takeTop(3)
} Then {
buffer: mutable.Buffer[(Long, Long, Long)] =>
println(buffer.toList)
buffer.toList.size should equal(5)
println (buffer.toList)
buffer.toList should contain (1, 11, 10)
buffer.toList should contain (1, 12, 9)
buffer.toList should contain (1, 13, 8)
buffer.toList should not contain (1, 13, 7)
buffer.toList should contain (2, 20, 21)
buffer.toList should contain (2, 20, 22)
}
}
And I do get an exception in runtime:
14/09/23 15:25:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:478)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
at com.twitter.scalding.MRMAggregator.complete(Operations.scala:321)
at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151)
at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:113)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple3
at com.twitter.scalding.GeneratedTupleSetters$$anon$25.apply(GeneratedConversions.scala:669)
at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:46)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 23 more
What do I do wrong?
UPD:
I did it this way:
def takeTop(topAmount: Int) :Pipe = self
.groupBy(person1){ _.sortedReverseTake[(Long,Long, Long)]((activityCount, person1, person2) -> top, topAmount)}
.flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
.project(person1, person2, activityCount)
Test passes, but I'm not sure that it's good approach...