How to extract Google PubSub publish time in Apache Beam

My goal is to access the publish time of a Pub/Sub message, as recorded and set by Google Pub/Sub, in Apache Beam (Dataflow).

    PCollection<PubsubMessage> pubsubMsg
            = pipeline.apply("Read Messages From PubSub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(pocOptions.getInputSubscription()));

The resulting messages do not seem to contain the publish time as an attribute. I have also tried

 .withTimestampAttribute("publish_time")

No luck either. What am I missing? Is it possible to extract the Google Pub/Sub publish time in Dataflow?

Oh answered 27/3, 2019 at 4:58 Comment(0)

Java version:

PubsubIO will read the message from Pub/Sub and assign the message publish time to the element as the record timestamp. Therefore, you can access it using ProcessContext.timestamp(). As an example:

p
    .apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
    .apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            LOG.info("Message: " + c.element());
            LOG.info("Publish time: " + c.timestamp().toString());
            // Wall-clock time at which the element is processed (Joda-Time Instant)
            LOG.info("Processing time: " + Instant.now().toString());
        }
    }));

I published the message a few minutes before starting the pipeline (to get a noticeable difference between event time and processing time). The output with DirectRunner was:

Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Message: I published this message a little bit before
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Publish time: 2019-03-27T09:57:07.005Z
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Processing time: 2019-03-27T10:03:08.229Z
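
To quantify the gap between the two logged times, the difference can be computed directly from the ISO-8601 strings. The following is a minimal sketch in plain Python; the helper name `lag_seconds` is hypothetical and not part of Beam, and the inputs are the two values from the log above:

```python
from datetime import datetime


def lag_seconds(publish_time: str, processing_time: str) -> float:
    """Return processing_time - publish_time in seconds for ISO-8601 UTC strings."""
    def parse(value: str) -> datetime:
        # Older Python versions do not accept a trailing "Z", so map it to an explicit offset.
        return datetime.fromisoformat(value.replace("Z", "+00:00"))

    return (parse(processing_time) - parse(publish_time)).total_seconds()


lag = lag_seconds("2019-03-27T09:57:07.005Z", "2019-03-27T10:03:08.229Z")
print(f"{lag:.3f} s")
```

For the log above this comes to roughly six minutes.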



Python version:

In Python, the element timestamp can be accessed by declaring a parameter with the default value beam.DoFn.TimestampParam in the process method:

import datetime
import logging

import apache_beam as beam

class GetTimestampFn(beam.DoFn):
  """Logs the element timestamp"""
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # float(timestamp) gives the event time in seconds since the Unix epoch
    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    logging.info(">>> Element timestamp: %s", timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
    yield element

Note: date parsing thanks to this answer.

Output:

INFO:root:>>> Element timestamp: 2019-08-12 20:16:53
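
Note that `datetime.utcfromtimestamp` is deprecated as of Python 3.12. A timezone-aware variant of the conversion could look like the sketch below. `format_event_timestamp` is a hypothetical helper name, and it assumes the timestamp has already been converted to seconds since the epoch with `float(timestamp)`, as in the DoFn above:

```python
from datetime import datetime, timezone


def format_event_timestamp(seconds: float) -> str:
    """Format seconds since the Unix epoch as a UTC 'YYYY-MM-DD HH:MM:SS' string."""
    # Passing tz=timezone.utc yields an aware datetime instead of a naive one.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(format_event_timestamp(1565641013))  # 2019-08-12 20:16:53
```

Inside `process`, the call would be `format_event_timestamp(float(timestamp))`.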


Aerie answered 27/3, 2019 at 10:11 Comment(5)
Would also like to thank you specifically for the code sample. Made it super easy to implement. Kudos! – Oh
Nice, I'm happy I was able to help. – Aerie
I'm programming in Python and I can't find the equivalent of ProcessContext. – Frazier
Sure, @Rim. I have edited my original answer to add the Python equivalent too! – Aerie
Excellent answer with great detail and shared code! Thanks!! :) – Scotia
