Apache Flink: ProcessWindowFunction KeyBy() multiple values

I'm trying to use a window function on a DataStream. My goal is the equivalent of the following query:

SELECT  *,
    count(id) OVER(PARTITION BY country) AS c_country,
    count(id) OVER(PARTITION BY city) AS c_city,
    count(id) OVER(PARTITION BY address) AS c_addrs
FROM fm
ORDER BY country

I already have the aggregation by the country field working, but I need to aggregate by two fields in the same time window. I don't know whether keyBy() can take two or more keys for this case.

val parsed = stream2.map(x=> {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2))
    })


    parsed
    .keyBy(x => x._2) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow   
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {  
          val lst = elements.toList
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
      }
      }).print().setParallelism(1)

This is great for the first aggregation, but I'm missing the second aggregation for the city field in the same time window.

input data:

10,"SPAIN","BARCELONA","C1"
20,"SPAIN","BARCELONA","C2"
30,"SPAIN","MADRID","C3"
30,"SPAIN","MADRID","C3"
80,"SPAIN","MADRID","C4"
90,"SPAIN","VALENCIA","C5"
40,"ITALY","ROMA","C6"
41,"ITALY","ROMA","C7"
42,"ITALY","VENECIA","C8"
50,"FRANCE","PARIS","C9"
60,"FRANCE","PARIS","C9"
70,"FRANCE","MARSELLA","C10"

expected output

(10,"SPAIN","BARCELONA",6,2,1)
(20,"SPAIN","BARCELONA",6,2,1)
(30,"SPAIN","MADRID",6,3,2)
(30,"SPAIN","MADRID",6,3,2)
(80,"SPAIN","MADRID",6,3,1)
(90,"SPAIN","VALENCIA",6,1,1)
(50,"FRANCE","PARIS",3,2,2)
(60,"FRANCE","PARIS",3,2,2)
(70,"FRANCE","MARSELLA",3,1,1)
(40,"ITALY","ROMA",3,2,1)
(41,"ITALY","ROMA",3,2,1)
(42,"ITALY","VENECIA",3,1,1)

---------------- UPDATE 2 ------------------

I now want to aggregate over three columns. The approach I'm using chains the output of one keyBy() into the next, but this becomes long, complex, and hard to read. I also had to give the later stages a time window of Time.seconds(1), because without a window the downstream keyBy() treats the upstream output as individual events.

What I'd like to know is whether I can do these aggregations in a single process function.

Here is the (long) code I have so far:

parsed
    .keyBy(_.country) // key by country
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        AlarmasIn, AlarmasOut, String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[AlarmasIn],
                             out: Collector[AlarmasOut]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address, lst.size,0,0)))
      }
      })
      .keyBy( _.city).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .process(new ProcessWindowFunction[
          AlarmasOut, AlarmasOut, String, TimeWindow
        ]() {
          override def process(key: String,
                               context: Context,
                               elements: Iterable[AlarmasOut],
                               out: Collector[AlarmasOut]): Unit = {
            val lst = elements.toList
            lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address,x.c_country,lst.size,x.c_addr)))
          }
        })
      .keyBy( _.address).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .process(new ProcessWindowFunction[
        AlarmasOut, AlarmasOut, String, TimeWindow
      ]() {
        override def process(key: String,
                             context: Context,
                             elements: Iterable[AlarmasOut],
                             out: Collector[AlarmasOut]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address,x.c_country,x.c_city,lst.size)))
        }
      })
      .print()

/// CASE CLASS
 case class AlarmasIn(
                      id: Int,
                      country: String,
                      city: String,
                      address: String
                    )

  case class AlarmasOut(
                       id: Int,
                       country: String,
                       city: String,
                       address: String,
                       c_country: Int,
                       c_city: Int,
                       c_addr: Int
                     )

Unbidden answered 20/5, 2020 at 20:11 Comment(0)

As city is a subcategory of country, you can aggregate the stream by city dimension first, then do another aggregation by country dimension.

val parsed = stream2.map(x=> {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2))
    })


    parsed
    .keyBy(x => x._3) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow   
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {  
          val lst = elements.toList
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
      }
      })
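      .keyBy(x => x._2)
      // process(ProcessWindowFunction) is only available on a windowed stream, so window again
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String, Int), (Int, String, String, Int, Int), String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String, Int)],
                             out: Collector[(Int, String, String, Int, Int)]): Unit = {
          val lst = elements.toList
          // the first stage re-emits every record, so the country count is the number of records here
          val cnt = lst.size
          // output: (id, country, city, country count, city count)
          lst.foreach(x => out.collect((x._1, x._2, x._3, cnt, x._4)))
        }
      }).print().setParallelism(1)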
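
The roll-up idea itself can be checked on plain Scala collections, outside Flink. The sketch below is only an illustration: `RollUp`, `cityCounts` and `countryCounts` are hypothetical names, and it assumes stage 1 produces exactly one count per city. If stage 1 instead re-emits every input record tagged with its city count, summing those tags would overcount, and counting the records per country is enough.

```scala
object RollUp {
  type Row = (Int, String, String) // (id, country, city), as in the question

  // Stage 1: one count per (country, city) pair.
  def cityCounts(rows: List[Row]): Map[(String, String), Int] =
    rows.groupBy(r => (r._2, r._3)).map { case (k, v) => k -> v.size }

  // Stage 2: roll the per-city counts up to one total per country.
  def countryCounts(perCity: Map[(String, String), Int]): Map[String, Int] =
    perCity.toList
      .groupBy { case ((country, _), _) => country }
      .map { case (country, xs) => country -> xs.map(_._2).sum }
}
```

The second stage never sees the raw records, only the smaller per-city summary, which is what makes the sub-dimension case cheaper than two independent aggregations.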

If one dimension is not a sub-dimension of the other, you can concatenate the two dimensions to generate a new key, then implement the aggregation logic yourself in the process function.

keyBy(x=>x._2+x._3)
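
One caveat with plain string concatenation is that different pairs can produce the same key ("AB" + "C" equals "A" + "BC"). A tuple key, or a separator that cannot occur in the data, avoids this. The sketch below shows the difference on plain collections; `CompositeKey` and its methods are hypothetical names, and in Flink's Scala API the analogous change would be something like keyBy(x => (x._2, x._3)).

```scala
object CompositeKey {
  type Row = (Int, String, String) // (id, country, city)

  // Key built by string concatenation: distinct pairs may collide.
  def countByConcat(rows: List[Row]): Map[String, Int] =
    rows.groupBy(r => r._2 + r._3).map { case (k, v) => k -> v.size }

  // Key built as a tuple: each (country, city) pair stays distinct.
  def countByTuple(rows: List[Row]): Map[(String, String), Int] =
    rows.groupBy(r => (r._2, r._3)).map { case (k, v) => k -> v.size }
}
```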

UPDATE

I don't think it is possible to calculate the result in one process function, because you are computing statistics over different keys. The only ways to do it in one step are to set the global parallelism to 1 (all input data then goes to a single downstream task even if you use keyBy) or to broadcast the input data to all downstream tasks.
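
If a single task is acceptable, the per-window logic reduces to three group-by counts over the same list. Here is a minimal sketch of that logic in plain Scala, assuming the case classes from the question; `WindowCounts.countAll` is a hypothetical helper, and the idea is that it could be called from a non-keyed window function (for example a ProcessAllWindowFunction after windowAll), which runs with parallelism 1.

```scala
case class AlarmasIn(id: Int, country: String, city: String, address: String)
case class AlarmasOut(id: Int, country: String, city: String, address: String,
                      c_country: Int, c_city: Int, c_addr: Int)

object WindowCounts {
  // Computes the three per-partition counts over the contents of one window.
  def countAll(elements: Iterable[AlarmasIn]): List[AlarmasOut] = {
    val lst = elements.toList
    val byCountry = lst.groupBy(_.country).map { case (k, v) => k -> v.size }
    val byCity    = lst.groupBy(_.city).map { case (k, v) => k -> v.size }
    val byAddr    = lst.groupBy(_.address).map { case (k, v) => k -> v.size }
    // Attach all three counts to every input record, preserving input order.
    lst.map(x => AlarmasOut(x.id, x.country, x.city, x.address,
      byCountry(x.country), byCity(x.city), byAddr(x.address)))
  }
}
```

The trade-off is the one described above: all records of a window must reach the same task, so this does not scale out the way a keyed stream does.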

Since your calculations share common processing logic, it would be better to abstract it.

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object CountJob {

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val transactions: DataStream[Record] = env
      .addSource(new SourceFunction[Record] {
        // flipped by cancel() so that run() can exit cleanly
        @volatile private var running = true

        override def run(sourceContext: SourceFunction.SourceContext[Record]): Unit = {
          while (running) {
            sourceContext.collect(Record(1, "a", "b", "c", 1, 1, 1))
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {
          running = false
        }
      })
      .name("generate source")

    transactions.keyBy(_.addr)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("ADDR"))
      .keyBy(_.city)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("CITY"))
      .keyBy(_.country)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("COUNTRY"))
      .print()


    env.execute("Count Job")
  }
}

// a common operator to process different aggregation
class CustomCountProc(aggrType: String) extends ProcessWindowFunction[Record, Record, String, TimeWindow] {

  override def process(key: String, context: Context, elements: Iterable[Record], out: Collector[Record]): Unit = {

    for (e <- elements) {
      if ("ADDR".equals(aggrType)) {
        out.collect(Record(-1, e.country, e.city, key, e.country_cnt, e.city_cnt, elements.size))
      }
      else if ("CITY".equals(aggrType)) {
        out.collect(Record(-1, e.country, key, e.addr, e.country_cnt, elements.size, e.addr_cnt))
      }
      else if ("COUNTRY".equals(aggrType)) {
        out.collect(Record(-1, key, e.city, e.addr, elements.size, e.city_cnt, e.addr_cnt))
      }
    }

  }
}

case class Record(
                   id: Int,
                   country: String,
                   city: String,
                   addr: String,
                   country_cnt: Int,
                   city_cnt: Int,
                   addr_cnt: Int
                 ) {
}

Btw, I am not sure whether the output actually meets your expectation. Since you didn't implement a stateful process function, I assume you want the aggregation results for each batch of data, where a batch is the data ingested within a one-second time window. The output does not accumulate over time; each batch starts from zero.

When using the timeWindow function, also be aware of the TimeCharacteristic, which defaults to processing time.

The output may also be delayed because of the three consecutive window functions. Suppose the first process function completes its aggregation within one second and forwards the results downstream. Because the second process function also has a one-second time window, those results wait in that window and are emitted only when it closes, so each stage adds up to one more window length of latency.
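
The added latency can be estimated with simple arithmetic. The sketch below is a simplified model, not Flink code: it assumes tumbling windows aligned to zero with no offset, that each stage emits exactly at its window end, and that the next stage sees the record at that same instant. `WindowDelay` and its methods are hypothetical names.

```scala
object WindowDelay {
  // Start of the tumbling window containing ts (non-negative ts, no offset).
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // End (exclusive) of the tumbling window containing ts.
  def windowEnd(ts: Long, size: Long): Long = windowStart(ts, size) + size

  // Time at which a record arriving at `arrival` leaves a chain of windows
  // with the given sizes, under the simplified model described above.
  def emitTime(arrival: Long, sizes: Seq[Long]): Long =
    sizes.foldLeft(arrival)((t, size) => windowEnd(t, size))
}
```

Under this model, a record arriving at 250 ms leaves three chained one-second windows at 3000 ms, and leaves the question's 60 s + 1 s + 1 s chain at 62000 ms.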

Let's see if others have better solution to your problem.

Kloster answered 22/5, 2020 at 7:44 Comment(1)
I've updated the question information. Could you help me simplify my code? Please. - Unbidden
