
How Do I Download a Large List of URLs in Parallel in PySpark?

I have an RDD containing 10000 urls to be fetched.

list = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com', 'http://google.com', 'http://twitter.com']
urls = sc.parallelize(

Solution 1:

If you're using concurrent.futures, you don't need asyncio at all (it will bring you no benefits since you are running in multiple threads anyway). You can use concurrent.futures.wait() to wait for multiple futures in parallel.

I can't test your data, but it should work with code like this:

import concurrent.futures, requests

def get_one(url):
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text

def get_all():
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(get_one, url)
                   for url in urls.toLocalIterator()]
    # the end of the "with" block will automatically wait
    # for all of the executor's tasks to complete
    for fut in futures:
        if fut.exception() is not None:
            print('{}: {}'.format(fut.exception(), 'ERR'))
        else:
            print('{}: {}'.format(fut.result(), 'OK'))
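
If you prefer the concurrent.futures.wait() approach mentioned above, a minimal sketch might look like the following. The small url_list and the 300-second timeout are assumptions for illustration, not part of the original answer; with an RDD you could build the list from urls.toLocalIterator() instead.

import concurrent.futures
import requests

def get_one(url):
    # same fetch helper as above
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text

# hypothetical small list; with an RDD, use list(urls.toLocalIterator())
url_list = ['http://google.com', 'http://twitter.com']

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_url = {executor.submit(get_one, u): u for u in url_list}
    # wait() blocks until every future has finished (or the timeout expires)
    done, not_done = concurrent.futures.wait(future_to_url, timeout=300)

for fut in done:
    url = future_to_url[fut]
    status = 'ERR' if fut.exception() is not None else 'OK'
    print('{}: {}'.format(url, status))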

To do the same thing with asyncio, you should use aiohttp instead.
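
For reference, a rough aiohttp/asyncio equivalent might look like the sketch below; the fetch_one/fetch_all names and the 20-connection limit are assumptions chosen to mirror max_workers above.

import asyncio
import aiohttp

async def fetch_one(session, url):
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.text()

async def fetch_all(urls):
    # cap concurrent connections, roughly like max_workers in the thread pool version
    connector = aiohttp.TCPConnector(limit=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_one(session, url) for url in urls]
        # return_exceptions=True keeps one failing url from cancelling the rest
        return await asyncio.gather(*tasks, return_exceptions=True)

# e.g. results = asyncio.run(fetch_all(list(urls.toLocalIterator())))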

Solution 2:

You can try the pyspark-asyncactions package.

The naming convention for the patched methods is methodNameAsync, for example:

RDD.count ⇒ RDD.countAsync
DataFrame.take ⇒ DataFrame.takeAsync
DataFrameWriter.save ⇒ DataFrameWriter.saveAsync

Usage

To patch existing classes, just import the package:

>>> import asyncactions
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()

All *Async methods return concurrent.futures.Future:

>>> rdd = spark.sparkContext.range(100)
>>> f = rdd.countAsync()
>>> f
<Future at ... state=running>
>>> type(f)
concurrent.futures._base.Future
>>> f.add_done_callback(lambda f: print(f.result()))
100
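
Because every *Async method returns a plain concurrent.futures.Future, you can also combine it with the standard library to wait on several asynchronous actions at once. A minimal sketch, assuming the spark session created above (the two RDDs are hypothetical examples):

import concurrent.futures

rdd1 = spark.sparkContext.range(100)
rdd2 = spark.sparkContext.range(1000)

# kick off both counts without blocking the driver
f1 = rdd1.countAsync()
f2 = rdd2.countAsync()

# block until both actions have finished, then read the results
done, _ = concurrent.futures.wait([f1, f2])
print([f.result() for f in done])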
