Dataflow Error: 'Clients have non-trivial state that is local and unpickleable'

I have a pipeline that I can now execute locally without any errors. I used to get this error when running it locally:

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

I believe I fixed this by downgrading to apache-beam==2.3.0; after that, it ran locally without problems.

Now I am using DataflowRunner and in the requirements.txt file I have the following dependencies

    apache-beam==2.3.0
    google-cloud-bigquery==1.1.0
    google-cloud-core==0.28.1
    google-cloud-datastore==1.6.0
    google-cloud-storage==1.10.0
    protobuf==3.5.2.post1
    pytz==2013.7

but I get this dreaded error again:

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

How come it's giving me the error with DataflowRunner but not DirectRunner? Shouldn't they be using the same dependencies/environment? Any help would be appreciated.
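
From what I've read (e.g. the GitHub issue linked below), the error shows up whenever a client object ends up in state that Beam has to pickle, for example when it is created in the DoFn constructor (or at module level together with --save_main_session). A minimal sketch of the failing pattern, with hypothetical names:

    import apache_beam as beam
    from google.cloud import datastore

    class BadDoFn(beam.DoFn):
        def __init__(self):
            # Created at pipeline-construction time, so the client becomes
            # part of the DoFn's pickled state -> PicklingError on submit.
            self._dsclient = datastore.Client()

        def process(self, element):
            # do stuff with self._dsclient
            yield element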

I had read that this is the way to solve it, but when I try it I still get the same error:

    import apache_beam as beam
    from google.cloud import datastore

    class MyDoFn(beam.DoFn):

        def start_bundle(self, process_context):
            self._dsclient = datastore.Client()

        def process(self, context, *args, **kwargs):
            # do stuff with self._dsclient
            pass

from https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

My previous reference post where I fixed this locally:

Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()

Thanks in advance!

Cyanide answered 30/5/2018 at 19:09 (3 comments)
Do you get a stack trace? – Dissatisfied
The DirectRunner is designed to validate your pipeline before deployment and ensure it's robust across the various Beam runners. Consequently, it should work on the DataflowRunner if it's running the same Beam version as your DirectRunner. Could you share a stack trace or logs of the job? – Coextend
I am facing a similar issue on Dataflow while trying to write data to a BigQuery table from the pipeline, though it runs fine with DirectRunner. Has anyone faced a similar issue while writing to BigQuery from Dataflow? – Beecham

Initializing unpickleable clients in the start_bundle method is a correct approach, and Beam IOs often follow it; see datastoreio.py as an example. Because start_bundle runs on the worker after the DoFn has been deserialized, the client itself never has to be pickled. Here is a pipeline that does a simple operation with a GCS Python client in a DoFn. I ran it on Apache Beam 2.16.0 without issues. If you can still reproduce your issue, please provide additional details.

gcs_client.py file:

    import argparse
    import logging
    import time

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from google.cloud import storage

    class MyDoFn(beam.DoFn):
      def start_bundle(self):
        # Create the client on the worker, after deserialization,
        # so it never has to be pickled.
        self.storage_client = storage.Client()

      def process(self, element):
        bucket = self.storage_client.get_bucket("existing-gcs-bucket")
        blob = bucket.blob(str(int(time.time())))
        blob.upload_from_string("payload")
        return element

    logging.getLogger().setLevel(logging.INFO)

    # Unrecognized flags (--project, --runner, ...) are passed through
    # to PipelineOptions.
    _, options = argparse.ArgumentParser().parse_known_args()

    pipeline_options = PipelineOptions(options)
    p = beam.Pipeline(options=pipeline_options)
    _ = p | beam.Create([None]) | beam.ParDo(MyDoFn())

    p.run().wait_until_finish()

requirements.txt file:

    google-cloud-storage==1.23.0

command line:

    python -m gcs_client \
        --project=insert_your_project \
        --runner=DataflowRunner \
        --temp_location gs://existing-gcs-bucket/temp/ \
        --requirements_file=requirements.txt \
        --save_main_session
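
As a side note, on newer Beam SDK versions DoFn.setup() is another place to create such clients; it runs once per DoFn instance on the worker, also after deserialization. A sketch (my assumption, not something from the original example, and not run on 2.16.0):

    import apache_beam as beam
    from google.cloud import storage

    class MyDoFn(beam.DoFn):
      def setup(self):
        # Runs once per DoFn instance on the worker; like start_bundle,
        # it executes after deserialization, so the client is not pickled.
        self.storage_client = storage.Client()
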
Cognac answered 26/11/2019 at 3:16

I've had a similar issue when making Dataflow write a bunch of rows to Bigtable. Setting --save_main_session to False seems to have solved it.
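
If it helps, here is a minimal sketch of turning it off explicitly in code (note that save_main_session already defaults to False unless the flag is passed):

    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    pipeline_options = PipelineOptions()
    # save_main_session defaults to False; set it explicitly in case a
    # wrapper or template passes --save_main_session for you.
    pipeline_options.view_as(SetupOptions).save_main_session = False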

Foreshorten answered 14/4/2020 at 16:51 (1 comment)
Setting --save_main_session to False didn't work for me. – Jainism
