How to trigger Cloud Dataflow pipeline job from Cloud Function in Java?
W

2

9

I have a requirement to trigger the Cloud Dataflow pipeline from Cloud Functions. But the Cloud function must be written in Java. So the Trigger for Cloud Function is Google Cloud Storage's Finalise/Create Event, i.e., when a file is uploaded in a GCS bucket, the Cloud Function must trigger the Cloud dataflow.

When I create a dataflow pipeline (batch) and I execute the pipeline, it creates a Dataflow pipeline template and creates a Dataflow job.

But when I create a cloud function in Java, and a file is uploaded, the status just says "ok", but it does not trigger the dataflow pipeline.

Cloud function

package com.example;

import com.example.Example.GCSEvent;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.CreateJobFromTemplateRequest;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.logging.Logger;

public class Example implements BackgroundFunction<GCSEvent> {
    private static final Logger logger = Logger.getLogger(Example.class.getName());

    @Override
    public void accept(GCSEvent event, Context context) throws IOException, GeneralSecurityException {
        logger.info("Event: " + context.eventId());
        logger.info("Event Type: " + context.eventType());


        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(credentials);


        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, requestInitializer)
                .setApplicationName("Google Dataflow function Demo")
                .build();

        String projectId = "my-project-id";


        RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
        runtimeEnvironment.setBypassTempDirValidation(false);
        runtimeEnvironment.setTempLocation("gs://my-dataflow-job-bucket/tmp");
        CreateJobFromTemplateRequest createJobFromTemplateRequest = new CreateJobFromTemplateRequest();
        createJobFromTemplateRequest.setEnvironment(runtimeEnvironment);
        createJobFromTemplateRequest.setLocation("us-central1");
        createJobFromTemplateRequest.setGcsPath("gs://my-dataflow-job-bucket-staging/templates/cloud-dataflow-template");
        createJobFromTemplateRequest.setJobName("Dataflow-Cloud-Job");
        createJobFromTemplateRequest.setParameters(new HashMap<String,String>());
        createJobFromTemplateRequest.getParameters().put("inputFile","gs://cloud-dataflow-bucket-input/*.txt");
        dataflowService.projects().templates().create(projectId,createJobFromTemplateRequest);

        throw new UnsupportedOperationException("Not supported yet.");
    }

    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }


}

pom.xml(cloud function)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cloudfunctions</groupId>
  <artifactId>http-function</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.target>11</maven.compiler.target>
    <maven.compiler.source>11</maven.compiler.source>
  </properties>

  <dependencies>
  <!-- https://mvnrepository.com/artifact/com.google.auth/google-auth-library-credentials -->
<dependency>
    <groupId>com.google.auth</groupId>
    <artifactId>google-auth-library-credentials</artifactId>
    <version>0.21.1</version>
</dependency>

  <dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-dataflow</artifactId>
    <version>v1b3-rev207-1.20.0</version>
</dependency>
    <dependency>
      <groupId>com.google.cloud.functions</groupId>
      <artifactId>functions-framework-api</artifactId>
      <version>1.0.1</version>
    </dependency>
         <dependency>
    <groupId>com.google.auth</groupId>
    <artifactId>google-auth-library-oauth2-http</artifactId>
    <version>0.21.1</version>
</dependency>
  </dependencies>

  <!-- Required for Java 11 functions in the inline editor -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <excludes>
            <exclude>.google/</exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

cloud function logs

enter image description here

I went through the below blogs (adding for reference) where they have triggered dataflow from cloud storage via cloud function. But the code has been written in either Node.js or python. But my cloud function must be written in java.

Triggering Dataflow pipeline via cloud functions in Node.js

https://dzone.com/articles/triggering-dataflow-pipelines-with-cloud-functions

Triggering dataflow pipeline via cloud functions using python

https://medium.com/google-cloud/how-to-kick-off-a-dataflow-pipeline-via-cloud-functions-696927975d4e

Any help on this is very much appreciated.

Writein answered 21/8, 2020 at 5:24 Comment(9)
Can you share the detail of the error in the logs?Derain
Did you try to update your dependencies? Especially this one: google-api-services-dataflow. The version that you use has been release 3 years ago (July 2017!!)Derain
@guillaumeblaquiere I added the error logs but not sure as it came as a warning.Writein
Isn't the error logged thrown by yourself? Why are you doing that? I suggest you update the google-api-services-dataflow, remove the thrown exception and check if it works.Dreibund
@Dreibund You are right, that is a thrown error (Also it is shown as Warning). I have removed that, updated but the classes don't seem to be available.Writein
@Dreibund I have removed the errored logs (from the post) as it looked misleading since it is explicitly thrown by me.Writein
If you have updated your code, update the question with the latest version. If you are encountering an error including the classes, provide the full stack trace so we can help you further.Dreibund
Also, update the google-api-services-dataflow library and provide the new results.Dreibund
The cloud function for runtime Java 11 was in BETA form when this question was posted, I think Google Cloud released Java 11 runtime for Cloud functions, and now I am able to trigger the dataflow. Thank you @DreibundWritein
U
5
RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
runtimeEnvironment.setBypassTempDirValidation(false);
runtimeEnvironment.setTempLocation("gs://karthiksfirstbucket/temp1");

LaunchTemplateParameters launchTemplateParameters = new LaunchTemplateParameters();
launchTemplateParameters.setEnvironment(runtimeEnvironment);
launchTemplateParameters.setJobName("newJob" + (new Date()).getTime());

Map<String, String> params = new HashMap<String, String>();
params.put("inputFile", "gs://karthiksfirstbucket/sample.txt");
params.put("output", "gs://karthiksfirstbucket/count1");
launchTemplateParameters.setParameters(params);
writer.write("4");
       
Dataflow.Projects.Templates.Launch launch = dataflowService.projects().templates().launch(projectId, launchTemplateParameters);            
launch.setGcsPath("gs://dataflow-templates-us-central1/latest/Word_Count");
launch.execute();

The above code launches a template and executes the dataflow pipeline

  1. using application default credentials(Which can be changed to user cred or service cred)
  2. region is default region(Which can be changed).
  3. creates a job for every HTTP trigger(Trigger can be changed).

The complete code can be found below:

https://github.com/karthikeyan1127/Java_CloudFunction_DataFlow/blob/master/Hello.java

Umeko answered 4/9, 2020 at 4:50 Comment(1)
I am tying to call the dataflow using the cloudfunction which you shared. But am getting the below error. Exception in thread "main" java.lang.NoClassDefFoundError: com/google/auth/Credentials at java.base/java.lang.Class.getDeclaredConstructors0(Native Method) at java.base/java.lang.Class.privateGetDeclaredConstructors(Class.java:3137) at java.base/java.lang.Class.getConstructor0(Class.java:3342) at java.base/java.lang.Class.getConstructor(Class.java:2151) at com.google.cloud.functions.invoker.BackgroundFunctionExecutor.forClass(BackgroundFunctionExecutor.java:122) atJosephina
N
0

This is my solution using the new data flow dependencies

public class Example implements BackgroundFunction<Example.GCSEvent> {
    private static final Logger logger = Logger.getLogger(Example.class.getName());

    @Override
    public void accept(GCSEvent event, Context context) throws Exception {
        String filename = event.name;

        logger.info("Processing file: " + filename);
        logger.info("Bucket name" + event.bucket);

        String projectId = "cedar-router-268801";
        String region = "us-central1";
        String tempLocation = "gs://cedar-router-beam-poc/temp";
        String templateLocation = "gs://cedar-router-beam-poc/template/poc-template.json";

        logger.info("path" + String.format("gs://%s/%s", event.bucket, filename));
        String scenario = filename.substring(0, 3); //it comes TWO OR ONE

        logger.info("scneario " + scenario);

        Map<String, String> params = Map.of("sourceFile", String.format("%s/%s", event.bucket, filename),
                "scenario", scenario,
                "years", "2013,2014",
                "targetFile", "gs://cedar-router-beam-poc-kms/result/testfile");
        
        extractedJob(projectId, region, tempLocation, templateLocation, params);
    }

    private static void extractedJob(String projectId,
                                     String region,
                                     String tempLocation,
                                     String templateLocation,
                                     Map<String, String> params) throws Exception {

        HttpTransport httpTransport = GoogleApacheHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = GsonFactory.getDefaultInstance();
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer httpRequestInitializer = new RetryHttpRequestInitializer(ImmutableList.of(404));
        ChainingHttpRequestInitializer chainingHttpRequestInitializer =
                new ChainingHttpRequestInitializer(new HttpCredentialsAdapter(credentials), httpRequestInitializer);

        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, chainingHttpRequestInitializer)
                .setApplicationName("Dataflow from Cloud function")
                .build();

        FlexTemplateRuntimeEnvironment runtimeEnvironment = new FlexTemplateRuntimeEnvironment();
        runtimeEnvironment.setTempLocation(tempLocation);

        LaunchFlexTemplateParameter launchFlexTemplateParameter = new LaunchFlexTemplateParameter();
        launchFlexTemplateParameter.setEnvironment(runtimeEnvironment);
        String jobName = params.get("sourceFile").substring(34, 49).replace("_","");
        logger.info("job name" + jobName);
        launchFlexTemplateParameter.setJobName("job" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));

        launchFlexTemplateParameter.setContainerSpecGcsPath(templateLocation);
        launchFlexTemplateParameter.setParameters(params);

        LaunchFlexTemplateRequest launchFlexTemplateRequest = new LaunchFlexTemplateRequest();
        launchFlexTemplateRequest.setLaunchParameter(launchFlexTemplateParameter);


        Launch launch = dataflowService.projects()
                .locations()
                .flexTemplates()
                .launch(projectId, region, launchFlexTemplateRequest);

        launch.execute();
        logger.info("running job");
    }


    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }
Just adapt it to your case 
Narbada answered 3/6, 2022 at 13:40 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.