Triggering Databricks job from Airflow without starting new cluster

I am using Airflow to trigger jobs on Databricks. I have many DAGs running Databricks jobs and I wish to use only one cluster instead of many, since to my understanding this will reduce the costs these tasks generate.

Using DatabricksSubmitRunOperator, there are two ways to run a job on Databricks. Either using an existing cluster, calling it by id

'existing_cluster_id' : '1234-567890-word123',

or starting a new cluster

'new_cluster': {
    'spark_version': '2.1.0-db3-scala2.11',
    'num_workers': 2
  },
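
For context, either of these fragments goes into the operator's json argument. A minimal sketch of a DAG using it (the connection id default, notebook path, and dates are placeholders; on Airflow 1.x the import was airflow.contrib.operators.databricks_operator instead):

from datetime import datetime
from airflow import DAG
# On Airflow 1.x: from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG('databricks_example', start_date=datetime(2019, 1, 1), schedule_interval='@daily') as dag:
    run_notebook = DatabricksSubmitRunOperator(
        task_id='run_notebook',
        json={
            # either 'existing_cluster_id' or 'new_cluster' goes here:
            'existing_cluster_id': '1234-567890-word123',
            'notebook_task': {'notebook_path': '/Users/me/my_notebook'},  # placeholder path
        },
    )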

Now I would like to avoid starting a new cluster for each task. However, an existing cluster shuts down during downtime, so it will no longer be reachable through its id and I will get an error. As I see it, the only option is a new cluster.

1) Is there a way to have a cluster being callable by id even when it is down?

2) Do people simply keep the clusters alive?

3) Or am I completely wrong and starting clusters for each task won't generate more costs?

4) Is there something I missed completely?

Robichaux answered 6/2, 2019 at 20:1 Comment(3)
Step 1: click on the cluster and find the details below in the URL. Step 2: copy the cluster id from the URL, as in eastus.azuredatabricks.net?o=<WorkspaceID>#/setting/clusters/<ClusterId>/configuration, then use notebook_task_params = { 'existing_cluster_id': '<ClusterId>', 'notebook_task': { 'notebook_path': '/Users/[email protected]/notebookName', }, }Dryfoos
I am not bothered by this problem anymore, but I still wonder how you would do it. It seems like there are steps missing. Also, I was working in AWS, not Azure, if that makes a difference.Robichaux
In my case I just followed these steps and it's working fine in Azure. I haven't tried AWS; I'll check and update here.Dryfoos

Updated based on @YannickSSE's responses in the comments.
I don't use Databricks; can you start a new cluster with the same id as the cluster you expect may or may not be running, and have it be a no-op when it is already running? Maybe not, or you probably wouldn't be asking this. (Response: no, when starting a new cluster you cannot give it an id.)

Could you write a Python or bash operator which tests for the existence of the cluster? (Response: this would amount to a test job submission… not the best approach.) If it finds the cluster and succeeds, a downstream task would trigger your job with the existing cluster id; if it doesn't, another downstream task could use the trigger_rule all_failed to do the same task but with a new cluster. Both of those DatabricksSubmitRunOperator tasks could then have one downstream task with the trigger_rule one_success. (Response: or use a branching operator to determine which operator is executed.) A rough sketch of the branching variant follows.
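
Something like this, assuming Airflow 2 with the Databricks provider; the workspace URL, token, cluster id, and notebook paths are all placeholders, and the state check goes straight at the Clusters API (clusters/get) rather than submitting a test job. EmptyOperator needs Airflow 2.3+ (use DummyOperator on older versions):

from datetime import datetime
import requests
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

HOST = 'https://<workspace>.cloud.databricks.com'  # placeholder
CLUSTER_ID = '1234-567890-word123'                 # placeholder

def choose_path(**_):
    # GET /api/2.0/clusters/get returns the cluster state, e.g. RUNNING or TERMINATED.
    resp = requests.get(f'{HOST}/api/2.0/clusters/get',
                        headers={'Authorization': 'Bearer <token>'},  # placeholder token
                        params={'cluster_id': CLUSTER_ID})
    if resp.ok and resp.json().get('state') == 'RUNNING':
        return 'run_on_existing_cluster'
    return 'run_on_new_cluster'

with DAG('branch_on_cluster_state', start_date=datetime(2019, 1, 1), schedule_interval=None) as dag:
    check = BranchPythonOperator(task_id='check_cluster', python_callable=choose_path)
    on_existing = DatabricksSubmitRunOperator(
        task_id='run_on_existing_cluster',
        json={'existing_cluster_id': CLUSTER_ID,
              'notebook_task': {'notebook_path': '/Users/me/my_notebook'}})
    on_new = DatabricksSubmitRunOperator(
        task_id='run_on_new_cluster',
        json={'new_cluster': {'spark_version': '2.1.0-db3-scala2.11', 'num_workers': 2},
              'notebook_task': {'notebook_path': '/Users/me/my_notebook'}})
    # join the branches: run as soon as whichever branch executed has succeeded
    done = EmptyOperator(task_id='join', trigger_rule='one_success')
    check >> [on_existing, on_new] >> done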

It might not be ideal, because I imagine your cluster id would change from time to time, forcing you to keep up with it. … Is the cluster part of the databricks hook's connection for that operator, and something that can be updated? Maybe you want to specify it in the tasks that need it as {{ var.value.<identifying>_cluster_id }} and keep it updated as an Airflow Variable. (Response: the cluster id is not in the hook, so the variable or the DAG file would have to be updated whenever it changes.) A sketch of the Variable approach follows.
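
With an Airflow Variable that could look roughly like this; the variable name shared_cluster_id and the notebook path are made up, and json is a templated field on the operator, so the Jinja expression is rendered at run time:

from datetime import datetime
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG('shared_cluster_example', start_date=datetime(2019, 1, 1), schedule_interval='@daily') as dag:
    # 'shared_cluster_id' is a hypothetical Variable, set e.g. with:
    #   airflow variables set shared_cluster_id 1234-567890-word123
    run_job = DatabricksSubmitRunOperator(
        task_id='run_on_shared_cluster',
        json={
            'existing_cluster_id': '{{ var.value.shared_cluster_id }}',
            'notebook_task': {'notebook_path': '/Users/me/my_notebook'},  # placeholder
        },
    )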

Adila answered 7/2, 2019 at 6:32 Comment(2)
I only have the two options: running it on a running cluster given its id, or starting a new cluster (I cannot specify its id). I could do some branching and start a new cluster, but I would need to test what happens when the cluster is starting and another DAG tries to send a job to it. The id is sadly not part of the hook Databricks uses; instead I have to give it as an argument to the Databricks operator. However, I can create a variable and update that one. If no better answer comes I will do that. ThanksRobichaux
@YannickSSE thanks for clarifying which is the case for my speculation. I am thinking that you either need to use a variable OR make a service which an Airflow operator can check against (as simple as a REST web request, or a DB query like SELECT cluster_id FROM active_clusters WHERE intended_use = 'This_DAG_id';) and, if it comes back with a value, push it to xcom and use {{ ti.xcom_pull('taskname_for_getting_which_cluster') }} as the cluster_id templated string; see the sketch below. Not sure how you'd start a cluster without a job, then find out the ID, put it in xcom, and POST/insert it in the service.Adila
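
A sketch of that xcom idea; the registry endpoint is entirely made up and stands in for whatever service tracks your clusters, and the notebook path is a placeholder:

from datetime import datetime
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

def fetch_cluster_id(**context):
    # Ask a hypothetical registry which cluster this DAG should use; the
    # return value is pushed to xcom automatically by PythonOperator.
    resp = requests.get('https://registry.example.com/active_cluster',  # hypothetical service
                        params={'intended_use': context['dag'].dag_id})
    return resp.json()['cluster_id']

with DAG('xcom_cluster_example', start_date=datetime(2019, 1, 1), schedule_interval='@daily') as dag:
    get_cluster = PythonOperator(task_id='taskname_for_getting_which_cluster',
                                 python_callable=fetch_cluster_id)
    run_job = DatabricksSubmitRunOperator(
        task_id='run_job',
        json={
            'existing_cluster_id': "{{ ti.xcom_pull('taskname_for_getting_which_cluster') }}",
            'notebook_task': {'notebook_path': '/Users/me/my_notebook'},  # placeholder
        },
    )
    get_cluster >> run_job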

It seems Databricks has added an option recently to reuse a job cluster within a job, sharing it between tasks.

https://databricks.com/blog/2022/02/04/saving-time-and-costs-with-cluster-reuse-in-databricks-jobs.html

Until now, each task had its own cluster to accommodate for the different types of workloads. While this flexibility allows for fine-grained configuration, it can also introduce a time and cost overhead for cluster startup or underutilization during parallel tasks.

In order to maintain this flexibility, but further improve utilization, we are excited to announce cluster reuse. By sharing job clusters over multiple tasks customers can reduce the time a job takes, reduce costs by eliminating overhead and increase cluster utilization with parallel tasks.

This seems to be available in the new API as well. https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate

job_clusters Array of objects (JobCluster) <= 100 items

A list of job cluster specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in task settings.

In order to fit your use case you could start a new cluster with your job, share it between your tasks, and it will automatically shut down at the end.
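
As a rough sketch, a Jobs API 2.1 jobs/create payload with a shared job cluster could look like this; the job name, cluster key, runtime version, and notebook paths are all placeholders. Each task points at the shared cluster via job_cluster_key, so the cluster is created once when the run starts and terminated when it ends:

job_spec = {
    'name': 'example_job',  # placeholder
    'job_clusters': [{
        'job_cluster_key': 'shared_cluster',
        'new_cluster': {
            'spark_version': '10.4.x-scala2.12',  # placeholder runtime
            'num_workers': 2,
        },
    }],
    'tasks': [
        {
            'task_key': 'step_one',
            'job_cluster_key': 'shared_cluster',
            'notebook_task': {'notebook_path': '/Jobs/step_one'},  # placeholder
        },
        {
            'task_key': 'step_two',
            'depends_on': [{'task_key': 'step_one'}],
            'job_cluster_key': 'shared_cluster',
            'notebook_task': {'notebook_path': '/Jobs/step_two'},  # placeholder
        },
    ],
}
# POST this payload to /api/2.1/jobs/create on your workspace.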

I still don't fully understand how we might keep a job cluster hot all the time if we want to have jobs start with no latency. I also don't think it's possible to share these clusters between jobs.

For now this information should provide a decent lead.

Rincon answered 18/2, 2022 at 5:45 Comment(3)
You can use instance pools to get faster startup for jobsBrady
An instance pool, especially one with hot nodes, can reduce the startup times. In my case that brought the startup latency from 3 minutes down to 1 minute. Unfortunately I had an ideal of less than 20 seconds in mind, which I haven't been able to achieve. I think this is mostly down to the installation of libraries on the hot nodes.Rincon
Yes, if you need libraries, you can create a Docker image with them and preload it onto the nodes together with the Databricks runtimeBrady

In fact, when you want to execute a notebook via Airflow, you have to specify the characteristics of your cluster.

Databricks will treat your notebook as a new job and run it on the cluster you specified. When the execution is finished, the cluster that was created is deleted automatically.

To verify this: while a job is running from Airflow, go to its logs; they give you a link that forwards you to Databricks. There, click View cluster and you will see the execution on a newly created cluster named, for example, job-1310-run-980.

Hyponitrite answered 27/1, 2022 at 21:17 Comment(1)
Also, if you want to delete these job clusters you can do it like this: store a token in a .netrc file and use it with curl. Create a .netrc file with machine, login, and password properties: machine dbc-a1b2345c-d6e7.cloud.databricks.com login token password dapi1234567890ab1cde2f3ab456c7d89efa and then call curl --netrc -X POST https://dbc-a1b2345c-d6e7.cloud.databricks.com/api/2.0/clusters/delete --data '{ "cluster_id": "1234-567890-frays123" }'Hyponitrite

BLUF: You generally should create new clusters for scheduled workflows like those orchestrated from Airflow.

The DatabricksSubmitRunOperator uses the jobs/runs/submit endpoint:

https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit

Under tasks > existing_cluster_id (emphasis mine):

If existing_cluster_id, the ID of an existing cluster that is used for all runs of this task. When running tasks on an existing cluster, you may need to manually restart the cluster if it stops responding. We suggest running jobs on new clusters for greater reliability.

When you configure a new cluster in the Airflow operator, it creates a job cluster, which is terminated at the end of the job. Job clusters are much cheaper than all-purpose clusters, which keep running (and billing) until they are terminated.

https://www.databricks.com/product/aws-pricing

  1. Is there a way to have a cluster being callable by id even when it is down? No, the submit job run will fail if the all-purpose cluster is terminated, or if you provide a job cluster id.

  2. Do people simply keep the clusters alive? All-purpose clusters, which are created for analysis, collaboration, and exploratory work, terminate only when stopped manually (or via an optional auto-termination timeout).

  3. Or am I completely wrong and starting clusters for each task won't generate more costs? Job clusters cost less than 35% of what all-purpose clusters cost per DBU.

Justifier answered 17/1, 2023 at 3:41 Comment(0)
