How to invoke a Cloud Function from Google Cloud Composer?

For a project requirement, I want to invoke a Cloud Function from inside a Cloud Composer pipeline, but I can't find much information on it. I tried using Airflow's SimpleHttpOperator, but I get this error:

[2021-09-10 10:35:46,649] {taskinstance.py:1503} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1333, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1363, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/gcs/dags/to_gcf.py", line 51, in execute
if not self.response_check(response):
File "/home/airflow/gcs/dags/to_gcf.py", line 83, in <lambda>
response_check=lambda response: False if len(response.json()) == 0 else True,
File "/opt/python3.8/lib/python3.8/site-packages/requests/models.py", line 900, in json
return complexjson.loads(self.text, **kwargs)
File "/opt/python3.8/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None

Thanks in advance!!

Darken answered 10/9, 2021 at 11:55 Comment(0)

I faced the same issue as you, but I managed to figure it out by studying the Airflow 2.0 provider packages for Google and using a PythonOperator instead.

from airflow.operators.python import PythonOperator  # Airflow 2 import path, needed for the task below
from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils
import google.auth.transport.requests
from google.auth.transport.requests import AuthorizedSession

def invoke_cloud_function():

  url = "<your_cloud_function_url>"  # the URL is also the target audience
  request = google.auth.transport.requests.Request()  # transport request used to obtain the credentials
  # If your Cloud Function URL has query parameters, remove them before passing it as the audience.
  id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(url, request=request)

  # The authorized session attaches the ID token to each request sent to the Cloud Function.
  resp = AuthorizedSession(id_token_credentials).request("GET", url=url)

  print(resp.status_code)  # should print 200
  print(resp.content)  # the body of the HTTP response

Thus, invoke the function as below:

    task = PythonOperator(task_id="invoke_cf", python_callable=invoke_cloud_function)

From my understanding, accessing an authenticated HTTP Cloud Function strictly requires a credential based on ID tokens. To obtain that type of credential, get_default_id_token_credentials() runs the Application Default Credentials (ADC) authorization flow, which obtains credentials from environment variables, known locations, or the Compute Engine metadata server. Composer should have the associated service account keyfile made available via environment variables (probably GOOGLE_APPLICATION_CREDENTIALS).

Once you have the right type of credentials, you can use the AuthorizedSession object to authenticate your requests to the Cloud Function.
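
The code comment about removing query parameters before using the URL as the audience can be handled with the standard library. This is a minimal sketch rather than part of the original answer; the helper name audience_from_url and the example URL are hypothetical.

```python
from urllib.parse import urlsplit, urlunsplit


def audience_from_url(url: str) -> str:
    """Return the URL without its query string and fragment.

    Hypothetical helper: the result is meant to be passed as the target
    audience, while the full URL is still used for the actual request.
    """
    parts = urlsplit(url)
    return urlunsplit((parts.scheme, parts.netloc, parts.path, "", ""))


# The example URL is a placeholder, not a real endpoint.
full_url = "https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName?key=value"
audience = audience_from_url(full_url)
```

The audience would then go to get_default_id_token_credentials(), and full_url to the session's request call.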

Mashe answered 23/9, 2021 at 18:17 Comment(2)
Hi @Seng Cheong, I tried both of these methods and am still getting a connection timeout error in Airflow. Could it be an internal GCP problem related to how the resources are deployed, or could I be missing some packages? - Darken
@SnehilSingh It is unlikely that your Airflow environment lacks the packages needed to connect. Your error could stem from various causes, e.g. a VPN or the Cloud Function configuration. - Mashe

I think you are looking for: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/functions/index.html#airflow.providers.google.cloud.operators.functions.CloudFunctionInvokeFunctionOperator

Note that to use it in Airflow 1.10 you need the backport provider packages installed (I believe they are installed by default), and the operator's version might differ slightly because backport packages have not been released for quite some time.

In Airflow 2

Oculo answered 10/9, 2021 at 12:41 Comment(2)
Hi @JarekPotiuk, I had tried the Airflow invoke-function operator but it didn't work; that's why I tried SimpleHttpOperator. - Darken
It raises a bad request error every time I run it in Airflow, but with SimpleHttpOperator I am able to invoke my Cloud Functions. - Darken

Given a working Cloud Function in HTTP mode, the simplest solution is to invoke it with SimpleHttpOperator.

If authentication is required by the Cloud Function, you need to generate an authentication token and insert it in the header:

import os
import json
import google.oauth2.id_token
import google.auth.transport.requests
from airflow.providers.http.operators.http import SimpleHttpOperator  # Airflow 2 import path

# Point Application Default Credentials at the service account key file.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './my-service-account.json'
request = google.auth.transport.requests.Request()
# The audience is the full URL of the Cloud Function.
audience = 'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName'
TOKEN = google.oauth2.id_token.fetch_id_token(request, audience)

MY_TASK_NAME = SimpleHttpOperator(
    task_id='MY_TASK_NAME',
    method='POST',
    http_conn_id='my_connection_id',
    endpoint='MyFunctionName',
    headers={'Authorization': f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps({"key": "value"}),  # possible request parameters
    # dag=dag
)

If authentication is NOT required to trigger the Cloud Function:

import json
from airflow.providers.http.operators.http import SimpleHttpOperator  # Airflow 2 import path

MY_TASK_NAME = SimpleHttpOperator(
    task_id='MY_TASK_NAME',
    method='POST',
    http_conn_id='my_connection_id',
    endpoint='MyFunctionName',
    headers={"Content-Type": "application/json"},
    data=json.dumps({"key": "value"}),  # possible request parameters
    # dag=dag
)

In both cases, remember to define my_connection_id in the Airflow Connections menu (Admin --> Connections). Since endpoint holds only the function name, the connection's host should be the base URL, here https://mylocation-myprojectname.cloudfunctions.net.
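
The traceback in the question is raised by the response_check lambda: response.json() fails with "Expecting value" when the function's response body is empty or is not JSON. A more tolerant check could look like the sketch below. The name has_json_payload is hypothetical, and the code assumes a response object that exposes .json() the way requests.Response does.

```python
import json


def has_json_payload(response) -> bool:
    """Return True only if the body parses as JSON and is non-empty."""
    try:
        body = response.json()
    except ValueError:
        # json.JSONDecodeError subclasses ValueError, so an empty or
        # plain-text body ends up here instead of crashing the task.
        return False
    # Numbers and booleans have no len(); only size-check containers and strings.
    if isinstance(body, (dict, list, str)):
        return len(body) > 0
    return body is not None
```

It can be passed as response_check=has_json_payload. With SimpleHttpOperator, a False result fails the task with a clear message instead of a JSONDecodeError.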

You can also trigger the Cloud Function using the requests Python module (as before, with or without authentication):

import os
import json
import requests
import google.oauth2.id_token
import google.auth.transport.requests

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './my-service-account.json'
request = google.auth.transport.requests.Request()
audience = 'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName'
TOKEN = google.oauth2.id_token.fetch_id_token(request, audience)

r = requests.post(
    'https://mylocation-myprojectname.cloudfunctions.net/MyFunctionName', 
    headers={'Authorization': f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps({"key": "value"})  # possible request parameters
)
print(r.status_code, r.reason)
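
In a DAG file, module-level code such as the fetch_id_token call above runs whenever the file is parsed, and ID tokens are short-lived (typically about an hour), so a token created at parse time may be stale when the task runs. One option is to fetch the token inside the function the task executes. The sketch below assumes that approach; call_function and its fetch_token and post parameters are hypothetical names, injected so the logic can be exercised without network access.

```python
import json
from typing import Any, Callable, Dict


def call_function(
    url: str,
    payload: Dict[str, Any],
    fetch_token: Callable[[str], str],
    post: Callable[..., Any],
) -> Any:
    """Fetch a fresh ID token for `url`, then POST `payload` to it as JSON."""
    token = fetch_token(url)  # the function URL doubles as the audience
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    return post(url, headers=headers, data=json.dumps(payload))


# Wiring with the real libraries (assumed installed), e.g. inside a
# PythonOperator callable:
#   import requests, google.oauth2.id_token, google.auth.transport.requests
#   fetch = lambda aud: google.oauth2.id_token.fetch_id_token(
#       google.auth.transport.requests.Request(), aud)
#   r = call_function(audience, {"key": "value"}, fetch, requests.post)
#   r.raise_for_status()
```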
Dose answered 30/11, 2021 at 19:2 Comment(0)
