Downloading files asynchronously and in parallel

EDIT

I've changed the title of the question to reflect the issue I had, and I've also posted an answer showing how to achieve this easily.


I am trying to make the second method return Task<TResult> instead of Task, as the first method does, but every fix I attempt triggers a cascade of further errors:

  • I added return before await body(partition.Current);
  • The compiler then asked for a return statement below it, so I added return null;
  • Now the select clause complains that it cannot infer the type argument from the query
  • I changed Task.Run to Task.Run<TResult>, but without success

How can I fix it?

The first method comes from http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx; the second method is the overload that I'm trying to create.

public static class Extensions
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    public static Task ForEachAsync<T, TResult>(this IEnumerable<T> source, int dop, Func<T, Task<TResult>> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}

Usage example:

With this method I'd like to download multiple files asynchronously and in parallel:

private async void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    Artist artist = await GetArtist();
    IEnumerable<string> enumerable = artist.Reviews.Select(s => s.ImageUrl);
    string[] downloadFile = await DownloadFiles(enumerable);
}

public static async Task<string[]> DownloadFiles(IEnumerable<string> enumerable)
{
    if (enumerable == null) throw new ArgumentNullException("enumerable");
    await enumerable.ForEachAsync(5, s => DownloadFile(s));
    // Incomplete, the above statement is void and can't be returned
}

public static async Task<string> DownloadFile(string address)
{
    /* Download a file from the specified address;
     * return the destination file name on success or null on failure. */

    if (address == null)
    {
        return null;
    }

    Uri result;
    if (!Uri.TryCreate(address, UriKind.Absolute, out result))
    {
        Debug.WriteLine(string.Format("Couldn't create URI from specified address: {0}", address));
        return null;
    }

    try
    {
        using (var client = new WebClient())
        {
            string fileName = Path.GetTempFileName();
            await client.DownloadFileTaskAsync(address, fileName);
            Debug.WriteLine(string.Format("Downloaded file saved to: {0} ({1})", fileName, address));
            return fileName;
        }
    }
    catch (WebException webException)
    {
        Debug.WriteLine(string.Format("Couldn't download file from specified address: {0}", webException.Message));
        return null;
    }
}
Archiplasm answered 4/10, 2013 at 19:28 Comment(9)
It's not at all clear what you'd expect the result to be. You're passing in a whole sequence of T values and executing the same function on each of them. What single result would you expect to get out of the Task<TResult> returned? - Running
I'd like to get a Task<string> in that case; I've added an example to my question. - Archiplasm
"With this method I'd like to download multiple files in parallel and asynchronously": isn't Parallel.ForEach enough? - Propertied
@Aybe, would you want it to be a Task<IEnumerable<string>> in your case, or what string would you return if you really do want Task<string>? - Reams
@Propertied I have seen it but couldn't understand it, particularly how to fetch the results from it :) - Archiplasm
@MattSmith I was expecting it to return Task<string>; the DownloadFiles method would use that ForEachAsync overload, which in turn would call DownloadFile for each item in the enumerable. - Archiplasm
@Aybe I think you still don't understand. Imagine you're downloading two pages, one containing foo and the other bar. If your ForEachAsync() were to return Task<string>, what string would you want it to contain? Given your code, it would make much more sense if it returned Task<string[]>. - Rameau
Oh yes, absolutely, I misunderstood what he meant. - Archiplasm
Related: ForEachAsync with Result - Morphophonemics
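
To make the suggestion from the comments concrete, here is a minimal sketch of an overload that returns Task<TResult[]> rather than Task<TResult>. It keeps the partitioner approach from the question and collects each awaited result in a ConcurrentBag. The class name ResultExtensions is made up for illustration, and the results come back in no particular order; treat this as one possible shape under those assumptions, not as the blog author's method.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical class name, chosen only to avoid clashing with the question's Extensions class.
public static class ResultExtensions
{
    // Runs body over every element using at most dop partitions at a time
    // and returns all results. The order of the results is NOT guaranteed.
    public static async Task<TResult[]> ForEachAsync<T, TResult>(
        this IEnumerable<T> source, int dop, Func<T, Task<TResult>> body)
    {
        // Thread-safe collection, because several partitions add results concurrently.
        var results = new ConcurrentBag<TResult>();

        await Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async () =>
            {
                using (partition)
                    while (partition.MoveNext())
                        results.Add(await body(partition.Current));
            }));

        return results.ToArray();
    }
}
```

With an overload like this, the body of DownloadFiles from the question could presumably be reduced to return await enumerable.ForEachAsync(5, s => DownloadFile(s));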

I solved it and am posting the solution here; it might help anyone having the same issue.

My initial need was a small helper that would download images quickly but also drop the connection if the server does not respond in time, all of this in parallel and asynchronously.

This helper returns a tuple that contains the remote path, the local path and the exception if one occurred, which is useful because it's always good to know why a download failed. I think I have covered every situation that can occur during a download, but you're welcome to comment if I missed one.

  • You specify a list of URLs to download
  • You can specify a local file name where each file will be saved; if you don't, one will be generated for you
  • Optionally, you can specify a duration after which a download is cancelled (handy for slow or unreachable servers)

You can call DownloadFileTaskAsync on its own, or combine it with the ForEachAsync helper to download in parallel and asynchronously.

Code, with an example of how to use it:

private async void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    IEnumerable<string> enumerable = new string[] { /* your URLs here */ };
    var results = new List<Tuple<string, string, Exception>>();
    await enumerable.ForEachAsync(s => DownloadFileTaskAsync(s, null, 1000), (url, t) => results.Add(t));
}

/// <summary>
///     Downloads a file from a specified Internet address.
/// </summary>
/// <param name="remotePath">Internet address of the file to download.</param>
/// <param name="localPath">
///     Local file name where to store the content of the download, if null a temporary file name will
///     be generated.
/// </param>
/// <param name="timeOut">Duration in milliseconds before cancelling the operation.</param>
/// <returns>A tuple containing the remote path, the local path and an exception if one occurred.</returns>
private static async Task<Tuple<string, string, Exception>> DownloadFileTaskAsync(string remotePath,
    string localPath = null, int timeOut = 3000)
{
    try
    {
        if (remotePath == null)
        {
            Debug.WriteLine("DownloadFileTaskAsync (null remote path): skipping");
            throw new ArgumentNullException("remotePath");
        }

        if (localPath == null)
        {
            Debug.WriteLine(
                string.Format(
                    "DownloadFileTaskAsync (null local path): generating a temporary file name for {0}",
                    remotePath));
            localPath = Path.GetTempFileName();
        }

        using (var client = new WebClient())
        {
            TimerCallback timerCallback = c =>
            {
                var webClient = (WebClient) c;
                if (!webClient.IsBusy) return;
                webClient.CancelAsync();
                Debug.WriteLine(string.Format("DownloadFileTaskAsync (time out due): {0}", remotePath));
            };
            using (var timer = new Timer(timerCallback, client, timeOut, Timeout.Infinite))
            {
                await client.DownloadFileTaskAsync(remotePath, localPath);
            }
            Debug.WriteLine(string.Format("DownloadFileTaskAsync (downloaded): {0}", remotePath));
            return new Tuple<string, string, Exception>(remotePath, localPath, null);
        }
    }
    catch (Exception ex)
    {
        return new Tuple<string, string, Exception>(remotePath, null, ex);
    }
}

public static class Extensions
{
    public static Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        var oneAtATime = new SemaphoreSlim(5, 10);
        return Task.WhenAll(
            from item in source
            select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
    }

    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
        SemaphoreSlim oneAtATime)
    {
        TResult result = await taskSelector(item);
        await oneAtATime.WaitAsync();
        try
        {
            resultProcessor(item, result);
        }
        finally
        {
            oneAtATime.Release();
        }
    }
}

I haven't changed the signature of ForEachAsync to let you choose the level of parallelism; I'll let you adjust it as you wish.

Output example:

DownloadFileTaskAsync (null local path): generating a temporary file name for http://cache.thephoenix.com/secure/uploadedImages/The_Phoenix/Music/CD_Review/main_OTR_Britney480.jpg
DownloadFileTaskAsync (null local path): generating a temporary file name for http://ssimg.soundspike.com/artists/britneyspears_femmefatale_cd.jpg
DownloadFileTaskAsync (null local path): generating a temporary file name for http://a323.yahoofs.com/ymg/albumreviewsuk__1/albumreviewsuk-526650850-1301400550.jpg?ymm_1xEDE5bu0tMi
DownloadFileTaskAsync (null remote path): skipping
DownloadFileTaskAsync (time out due): http://hangout.altsounds.com/geek/gars/images/3/9/8/5/2375.jpg
DownloadFileTaskAsync (time out due): http://www.beat.com.au/sites/default/files/imagecache/630_315sr/images/article/header/2011/april/britney-spears-femme-fatale.jpg
DownloadFileTaskAsync (time out due): http://cache.thephoenix.com/secure/uploadedImages/The_Phoenix/Music/CD_Review/main_OTR_Britney480.jpg
DownloadFileTaskAsync (downloaded): http://newblog.thecmuwebsite.com/wp-content/uploads/2009/12/britneyspears1.jpg
DownloadFileTaskAsync (downloaded): http://newblog.thecmuwebsite.com/wp-content/uploads/2009/12/britneyspears1.jpg
DownloadFileTaskAsync (downloaded): http://static.guim.co.uk/sys-images/Music/Pix/site_furniture/2011/3/22/1300816812640/Femme-Fatale.jpg
DownloadFileTaskAsync (downloaded): http://www.sputnikmusic.com/images/albums/72328.jpg

What used to take up to 1 minute now barely takes 10 seconds for the same result :)

Many thanks to the author of these two posts:

http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx

http://blogs.msdn.com/b/pfxteam/archive/2012/03/04/10277325.aspx

Archiplasm answered 5/10, 2013 at 18:46 Comment(3)
Great extension methods! - Tiny
Shouldn't the await oneAtATime.WaitAsync(); be written before the TResult result = await taskSelector(item);? Otherwise you'd just be running the task for each item of your "source" immediately and basically disregarding the semaphore limitation (running only x tasks at a time)? - Scribe
@Scribe is absolutely correct. The code above, as written, will actually start as many tasks as the TPL would without the semaphore, and will only limit the resultProcessor to a max DOP. If you are cutting and pasting the above code, move the .WaitAsync() above the execution of the taskSelector. Additionally, without calling .Release(n) on the semaphore, only 5 (not 10) concurrent tasks will ever be running. I would recommend creating the semaphore as ... = new SemaphoreSlim(10, 10) - Priestess
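
For anyone copying the answer, here is a rough sketch of what the two comments above suggest: the semaphore is acquired before taskSelector runs, so it limits how many operations are in flight, and the initial count equals the maximum count. The class name ThrottledExtensions and the maxDegreeOfParallelism parameter are assumptions added for illustration; they are not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class name; the original answer calls its class Extensions.
public static class ThrottledExtensions
{
    // maxDegreeOfParallelism is an added parameter (an assumption, not in the original answer).
    public static Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        int maxDegreeOfParallelism,
        Func<TSource, Task<TResult>> taskSelector,
        Action<TSource, TResult> resultProcessor)
    {
        // Initial count == max count, so all slots are available from the start.
        var throttle = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
        return Task.WhenAll(
            from item in source
            select ProcessAsync(item, taskSelector, resultProcessor, throttle));
    }

    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector,
        Action<TSource, TResult> resultProcessor,
        SemaphoreSlim throttle)
    {
        // Wait for a free slot BEFORE starting the operation, so that at most
        // maxDegreeOfParallelism operations are running at any moment.
        await throttle.WaitAsync();
        try
        {
            TResult result = await taskSelector(item);
            // Note: without a synchronization context, resultProcessor may run
            // concurrently on several threads.
            resultProcessor(item, result);
        }
        finally
        {
            throttle.Release();
        }
    }
}
```

Because resultProcessor can then be invoked from several threads at once (for example in a console application), adding to a plain List<T> from it is not safe; wrapping the Add call in a lock or using a concurrent collection avoids that.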

For people arriving here more recently, like me: I just found that Parallel.ForEachAsync was added in .NET 6: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-6.0.

It is used the way you would expect, and it has the advantage of letting you specify the degree of parallelism. For example:

var someInputData = new []
{
    "someData1",
    "someData2",
    "someData3"
};

ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(someInputData, parallelOptions, 
    async (input, cancellationToken) => 
    {
       // Some async Func. 
       // The Func can make use of:
       // - the `input` variable, which will contain the element in the `someInputData` list;
       // - the `cancellationToken` variable, usable to cancel the async operation.
    });

See also https://www.hanselman.com/blog/parallelforeachasync-in-net-6.
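
Parallel.ForEachAsync itself returns a plain Task, so the results have to be collected by the caller. A minimal sketch of one way to do that, assuming .NET 6 or later, follows; the helper name SelectParallelAsync and the class ParallelResults are hypothetical, and the results come back in no particular order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of .NET: wraps Parallel.ForEachAsync (.NET 6+)
// so that the results of the asynchronous operations are returned to the caller.
public static class ParallelResults
{
    public static async Task<TResult[]> SelectParallelAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        int maxDegreeOfParallelism,
        Func<TSource, CancellationToken, Task<TResult>> body)
    {
        // Thread-safe collection, because the body runs concurrently.
        var results = new ConcurrentBag<TResult>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        await Parallel.ForEachAsync(source, options, async (item, cancellationToken) =>
        {
            results.Add(await body(item, cancellationToken));
        });

        // Order is NOT guaranteed to match the order of source.
        return results.ToArray();
    }
}
```

If the original order matters, one option is to return each input together with its result (for example as a tuple) and sort or look up afterwards.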

Bigoted answered 10/4, 2023 at 12:6 Comment(2)
My understanding is that the OP wants to modify a ForEachAsync method that they found on the internet, so that it propagates the results of the asynchronous operations. Your answer doesn't address this requirement. - Morphophonemics
@Alelom, for my vote I took into consideration that you've posted an identical answer to a different question. Your answer is on-topic in the other question, so I upvoted that answer and downvoted this one. It's unfortunate that the other answer has been deleted by a moderator. I might change my vote if you edit this answer so that it becomes on-topic here as well. FYI, I've also downvoted this question and the other existing answer, because IMHO it's a confusing question with a bad self-answer. - Morphophonemics
