Calling pyspark function asynchronously with concurrent.futures

I am trying to call Python functions that use PySpark RDD methods and are time-consuming, which blocks my application. I need to write this in an async fashion so that my app doesn't get blocked. Here is a miniature version of what I actually want to do.

from concurrent.futures import Future
from pyspark import SparkContext

sc = SparkContext()

def add(a, b):
    f = Future()
    c = a + b
    d = a*b
    t = (c,d)
    rdd = sc.parallelize([t])
    f.set_result(rdd)
    # return rdd

if __name__ == '__main__':

    f1 = add(90,8)
    f2 = add(8, 89)

    while (not f1.done()) and (not f2.done()):
        pass

    print(f1.result())
    print(f2.result())

I know the above code won't work straight away. How can I modify it so that it works?
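
If it helps, this is the kind of pattern I have in mind, as a rough sketch using concurrent.futures.ThreadPoolExecutor (I'm not sure it is the right approach; the worker count and the collect() call are just illustrative):

from concurrent.futures import ThreadPoolExecutor
from pyspark import SparkContext

sc = SparkContext()

def add(a, b):
    rdd = sc.parallelize([(a + b, a * b)])
    return rdd.collect()  # the action that actually runs the Spark job

if __name__ == '__main__':
    # submit() returns a concurrent.futures.Future immediately
    with ThreadPoolExecutor(max_workers=2) as pool:
        f1 = pool.submit(add, 90, 8)
        f2 = pool.submit(add, 8, 89)
        print(f1.result())  # blocks only here, when the value is needed
        print(f2.result())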

Frohne answered 17/1, 2018 at 13:18

Use the threading module. I just finished a similar project and it worked like a charm.

import threading

new_thread = threading.Thread(
    target = <function here>,
    args = (<function args>),
    name = <thread name>,
)
new_thread.start()
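
For instance, filled in with a hypothetical long-running PySpark function (run_job and the thread name are illustrative, not part of any particular API):

import threading

def run_job(a, b):
    # any long-running PySpark work goes here, e.g. building an RDD
    # and then calling an action such as collect() or saveAsTextFile()
    print("job finished:", a + b, a * b)

new_thread = threading.Thread(
    target=run_job,
    args=(90, 8),        # arguments passed through to run_job
    name="spark-job-1",
)
new_thread.start()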

That covers the core functionality. Below is a more elaborate example that queues up a job, makes it wait its turn behind existing jobs (via the thread.join() method), and returns a response reporting how many jobs (threads) are already in line for processing.

import uuid

# Collect any matching jobs (threads) that are still running.
current_jobs = []
for t in threading.enumerate():
    if t.name in (<thread name>, <thread name>):
        if t.is_alive():
            current_jobs.append(t)

new_thread = threading.Thread(
    target = <function here>,
    args = (<function args here>, current_jobs),
    name = <thread name here>,
)
new_thread.start()

# Create message.
job_id = uuid.uuid4().hex
job_message = "Job Id: " + job_id

# Check current job count.
job_count = len(current_jobs)
if job_count > 0:
    # Extend the message if any jobs were found.
    job_message += "\nThere are " + str(job_count) + " jobs ahead of you, so please be patient."
return app.make_response(job_message), 200  # assumes this runs inside a Flask view function

Then pass current_jobs as an argument to your function, with this code executing at the beginning of it:

for j in current_jobs:
    j.join()

It is important to note that your function should contain the entire logic: creating your SparkContext, the lazy transformations on your RDDs/DataFrames, and whatever actual work you want your cluster to do (.collect(), writing to a database, etc.), because this thread is asynchronous and has its own scope that would be complicated to transfer back to the main thread.
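
A rough sketch of what such a worker function could look like, assuming a local Spark setup (spark_job and the argument names are illustrative):

import threading
from pyspark import SparkContext

def spark_job(a, b, current_jobs):
    # Wait for any jobs already in line to finish before starting.
    for j in current_jobs:
        j.join()

    # All Spark work lives inside the thread's own scope.
    sc = SparkContext.getOrCreate()
    rdd = sc.parallelize([(a + b, a * b)])  # lazy, nothing runs yet
    result = rdd.collect()                  # the action triggers the actual job
    print("job done:", result)

Starting it then looks the same as above, e.g. threading.Thread(target=spark_job, args=(90, 8, current_jobs), name="spark-job-1").start().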

Jemma answered 17/1, 2018 at 22:7

I think you should just return f from your function:

def add(a, b):
    f = Future()
    c = a + b
    d = a*b
    t = (c,d)
    rdd = sc.parallelize([t])  # lazy: no Spark job runs here
    f.set_result(rdd)          # the Future is already completed when returned
    return f

But don't forget your RDD is lazy. With no action, it should not consume that much time.
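
To illustrate the laziness point, a small sketch assuming the sc and add defined above; building the RDDs returns immediately, and the real work only happens when an action such as collect() is called:

f1 = add(90, 8)
f2 = add(8, 89)

# Both futures are already done: building the RDDs was cheap and lazy.
print(f1.done(), f2.done())      # True True

# The expensive part only happens when an action is called.
print(f1.result().collect())     # [(98, 720)]
print(f2.result().collect())     # [(97, 712)]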

Escharotic answered 17/1, 2018 at 16:47
