Google DataFlow Apache Beam

I am trying to use Apache Beam to create a Dataflow pipeline and I am not able to follow the documentation and cannot find any examples.

The pipeline is simple.

  1. Create a pipeline
  2. Read from a Pub/Sub topic
  3. Write to Spanner

Currently, I am stuck at step 2. I cannot find any example of how to read from Pub/Sub and consume the messages.

This is the code I have so far:

class ExtractFlowInfoFn extends DoFn<PubsubMessage, KV<String, String>> {
    public void processElement(ProcessContext c) {
        KV.of("key", "value");
    }
}

public static void main(String[] args) {

    Pipeline p = Pipeline.create(
    PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
     .apply("ConvertToKeyValuePair", ParDo.of(new ExtractFlowInfoFn()))
     .apply("WriteToLog", ));
};

I was able to come up with the code by following multiple examples. To be honest, I have no idea what I am doing here.

Please, either help me understand this or link me to the correct documentation.

Squat answered 10/2, 2018 at 1:13 Comment(0)

Example of pulling messages from Pub/Sub and writing to Cloud Spanner:

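import com.google.cloud.spanner.Mutation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;

// The enclosing class name is a placeholder; use your own.
public class PubSubToSpanner {

    // Converts each Pub/Sub message into a Cloud Spanner mutation.
    static class MessageToMutationDoFn extends DoFn<PubsubMessage, Mutation> {

        @ProcessElement
        public void processElement(ProcessContext c) {

            // TODO: create Mutation object from PubsubMessage (available as c.element())

            Mutation mutation = Mutation.newInsertBuilder("users_backup2")
                .set("column_1").to("value_1")
                .set("column_2").to("value_2")
                .set("column_3").to("value_3")
                .build();

            // Elements reach the next transform only if they are emitted here.
            c.output(mutation);
        }
    }

    public static void main(String[] args) {

        // Take runner, project, etc. from the command line, as in the question.
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // PubsubIO expects the full path: projects/<project>/subscriptions/<subscription>
        p.apply("ReadFromPubSub", PubsubIO.readMessages()
             .fromSubscription("projects/projectId/subscriptions/test"))
         .apply("MessageToMutation", ParDo.of(new MessageToMutationDoFn()))
         .apply("WriteToSpanner", SpannerIO.write()
             .withProjectId("projectId")
             .withInstanceId("spannerInstanceId")
             .withDatabaseId("spannerDatabaseId"));

        p.run();
    }
}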

Reference: Apache Beam SpannerIO, Apache Beam PubsubIO
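
The TODO in processElement depends on what the messages actually contain, which this thread does not say. As a rough sketch only: PubsubMessage.getPayload() returns the raw bytes, so one possible approach is to decode them into column/value pairs first. The PayloadParser class and the "column=value,column=value" UTF-8 format below are assumptions made up for illustration; they are not part of Beam or the Spanner client.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam or the Spanner client.
public class PayloadParser {

    // Decodes a UTF-8 payload such as "column_1=value_1,column_2=value_2"
    // into an ordered column -> value map. The wire format is an assumption.
    public static Map<String, String> parse(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        Map<String, String> columns = new LinkedHashMap<>();
        if (text.isEmpty()) {
            return columns;
        }
        for (String pair : text.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                // No '=' or an empty column name: reject instead of guessing.
                throw new IllegalArgumentException("Malformed field: " + pair);
            }
            columns.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return columns;
    }

    public static void main(String[] args) {
        byte[] payload =
            "column_1=value_1,column_2=value_2".getBytes(StandardCharsets.UTF_8);
        // prints {column_1=value_1, column_2=value_2}
        System.out.println(parse(payload));
    }
}
```

Inside processElement, the map from PayloadParser.parse(c.element().getPayload()) could then be looped over, calling .set(name).to(value) on the Mutation builder for each entry before build(). If your messages are JSON or Avro, swap the parsing step for the matching decoder.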

Debidebilitate answered 10/2, 2018 at 4:8 Comment(2)
This example partly works. I am not able to write to Spanner: it gives an error, and the error is not very useful. I will update this post when I have the correct answer. - Squat
The fix is in the answer: #46684571. Also, use Beam version 2.2.0. - Squat