Go + Apache Beam GCP Dataflow: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1

I am using the Apache Beam Go SDK to build a simple Dataflow pipeline that reads the results of a BigQuery query and publishes them to Pub/Sub, with the following code:

package main

import (
    "context"
    "flag"
    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    "gitlab.com/bq-to-pubsub/infra/env"
    "gitlab.com/bq-to-pubsub/sources"
    "gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
    flag.Parse()
    ctx := context.Background()
    beam.Init()
    log.Info(ctx, "Creating new pipeline")
    pipeline, scope := beam.NewPipelineWithRoot()
    project := gcpopts.GetProject(ctx)

    ppData := pp.Query(scope, project)
    ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
    pubsubio.Write(scope, "project", "topic", ppMessages)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

While my pipeline is running on Google Cloud Dataflow, I get the following error:

Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+Source pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1.

I have read this thread but I am not sure how it was resolved.

Any idea?

Cornstarch answered 20/10, 2021 at 19:0 Comment(2)
How did you solve this eventually? I'm facing a similar issue. - Hyperspace
Unfortunately we had to write the workflow in Kotlin, so there is no proper solution. - Cornstarch

Is the job running in streaming mode or batch mode? I'd guess batch mode. The internal Dataflow runner used for batch jobs probably doesn't link in the Pub/Sub sink, which would explain the error.

Unfortunately, at this time the Go SDK doesn't provide a local fallback for writing to Pub/Sub that the batch runner can use instead.

That said, you can unblock yourself fairly easily by writing your own DoFn that publishes to Pub/Sub using the standard Go client package: https://pkg.go.dev/cloud.google.com/go/pubsub#hdr-Publishing

Roughly, what you write should look like the following.

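import (
    "context"
    "sync"

    "cloud.google.com/go/pubsub"
    "github.com/apache/beam/sdks/go/pkg/beam"
)

const batchSize = 100 // flush threshold; tune as needed

var (
    // Assuming everything is in one project, one client can be shared.
    clientOnce   sync.Once
    pubSubClient *pubsub.Client
    clientErr    error
)

// PubSubSinkFn publishes []byte elements (assumed here to match the output of
// pp.ToByteArray in the question). When running on Dataflow, register the type
// with beam.RegisterType in an init function so workers can deserialize it.
type PubSubSinkFn struct {
    Project, Topic string // whatever configuration you need

    client *pubsub.Client // the client is safe to use from multiple goroutines
    topic  *pubsub.Topic
    batch  [][]byte // per-bundle batch
}

func (fn *PubSubSinkFn) Setup(_ context.Context) error {
    // Create the client with the sync.Once so it can be shared by all bundles.
    // context.Background() is used because the client outlives any one bundle.
    clientOnce.Do(func() {
        pubSubClient, clientErr = pubsub.NewClient(context.Background(), fn.Project)
    })
    if clientErr != nil {
        return clientErr
    }
    fn.client = pubSubClient
    fn.topic = fn.client.Topic(fn.Topic)
    return nil
}

func (fn *PubSubSinkFn) ProcessElement(ctx context.Context, v []byte) error {
    fn.batch = append(fn.batch, v)
    if len(fn.batch) >= batchSize { // or whatever criterion you want
        return fn.publishBatch(ctx)
    }
    return nil
}

func (fn *PubSubSinkFn) FinishBundle(ctx context.Context) error {
    return fn.publishBatch(ctx) // flush whatever is left in the bundle
}

func (fn *PubSubSinkFn) publishBatch(ctx context.Context) error {
    // Publish is asynchronous; collect the results and wait for all of them.
    results := make([]*pubsub.PublishResult, 0, len(fn.batch))
    for _, data := range fn.batch {
        results = append(results, fn.topic.Publish(ctx, &pubsub.Message{Data: data}))
    }
    fn.batch = nil
    for _, r := range results {
        if _, err := r.Get(ctx); err != nil {
            return err
        }
    }
    return nil
}

// When constructing your pipeline:
beam.ParDo0(s, &PubSubSinkFn{Project: "foo", Topic: "bar"}, messages)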
Tee answered 21/10, 2021 at 16:59 Comment(2)
I'm getting a similar error now. Can you please explain what I should put in clientOnce.Do(...)? - Hyperspace
Create a new client there using pubsub.NewClient and assign it to pubSubClient; Setup then copies it into fn.client. - Vestpocket
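
To illustrate what goes inside `clientOnce.Do`, here is a minimal sketch of the sync.Once pattern using only the standard library. `fakeClient`, `newFakeClient`, and `getClient` are hypothetical stand-ins made up for this example; in the real DoFn the constructor call would be `pubsub.NewClient(ctx, project)` and the shared variable would hold a `*pubsub.Client`.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeClient is a hypothetical stand-in for *pubsub.Client.
type fakeClient struct{ project string }

var (
	clientOnce   sync.Once
	sharedClient *fakeClient
	clientErr    error
	constructed  int // counts constructor calls, for demonstration only
)

// newFakeClient stands in for pubsub.NewClient(ctx, project).
func newFakeClient(project string) (*fakeClient, error) {
	constructed++
	return &fakeClient{project: project}, nil
}

// getClient returns the process-wide client, creating it on first use.
// This is the work that Setup delegates to clientOnce.Do.
func getClient(project string) (*fakeClient, error) {
	clientOnce.Do(func() {
		sharedClient, clientErr = newFakeClient(project)
	})
	return sharedClient, clientErr
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := getClient("foo"); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	fmt.Println(constructed) // prints 1
}
```

Storing the error next to the client matters: sync.Once runs the function only once, so later callers need a way to see that the first attempt failed.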
