I am using the Apache Beam Go SDK to build a simple Dataflow pipeline that runs a BigQuery query and publishes the results to Pub/Sub, using the following code:
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"

	"gitlab.com/bq-to-pubsub/infra/env"
	"gitlab.com/bq-to-pubsub/sources"
	"gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
	flag.Parse()
	ctx := context.Background()
	beam.Init()

	log.Info(ctx, "Creating new pipeline")
	pipeline, scope := beam.NewPipelineWithRoot()
	project := gcpopts.GetProject(ctx)

	ppData := pp.Query(scope, project)
	ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
	pubsubio.Write(scope, "project", "topic", ppMessages)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
When the pipeline runs on Google Cloud Dataflow, it fails with the following error:
Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+Source pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1.
I have read this thread but I am not sure how it was resolved.
Any idea what is causing this error and how to fix it?