Download HTML pages concurrently using the Async CTP
Asked Answered
W

1

3

Attempting to write a HTML crawler using the Async CTP I have gotten stuck as to how to write a recursion free method for accomplishing this.

This is the code I have so far.

private readonly ConcurrentStack<LinkItem> _LinkStack;
private readonly Int32 _MaxStackSize;
private readonly WebClient client = new WebClient();

Func<string, string, Task<List<LinkItem>>> DownloadFromLink = async (BaseURL, uri) => 
{
    string html = await client.DownloadStringTaskAsync(uri);
    return LinkFinder.Find(html, BaseURL);
};

Action<LinkItem> DownloadAndPush = async (o) => 
{
    List<LinkItem> result = await DownloadFromLink(o.BaseURL, o.Href);
    if (this._LinkStack.Count() + result.Count <= this._MaxStackSize)
    {
        this._LinkStack.PushRange(result.ToArray());
        o.Processed = true;
    }  
};

Parallel.ForEach(this._LinkStack, (o) => 
{
    DownloadAndPush(o);
});

But obviously this doesn't work as I would hope because at the time that Parallel.ForEach executes the first (and only iteration) I only have only 1 item. The simplest approach I can think of to make the ForEach recursive but I can't (I don't think) do this as I would quickly run out of stack space.

Could anyone please guide me as to how I can restructure this code, to create what I would describe as a recursive continuation that adds items until either the MaxStackSize is reached or the system runs out of memory?

Willey answered 13/2, 2012 at 11:28 Comment(1)
+1. He who controls the recursion, controls the universe!Lanilaniard
L
10

I think the best way to do something like this using C# 5/.Net 4.5 is to use TPL Dataflow. There even is a walkthrough on how to implement web crawler using it.

Basically, you create one "block" that takes care of downloading one URL and getting the link from it:

var cts = new CancellationTokenSource();

Func<LinkItem, Task<IEnumerable<LinkItem>>> downloadFromLink =
    async link =>
            {
                // WebClient is not guaranteed to be thread-safe,
                // so we shouldn't use one shared instance
                var client = new WebClient();
                string html = await client.DownloadStringTaskAsync(link.Href);

                return LinkFinder.Find(html, link.BaseURL);
            };

var linkFinderBlock = new TransformManyBlock<LinkItem, LinkItem>(
    downloadFromLink,
    new ExecutionDataflowBlockOptions
    { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

You can set MaxDegreeOfParallelism to any value you want. It says at most how many URLs can be downloaded concurrently. If you don't want to limit it at all, you can set it to DataflowBlockOptions.Unbounded.

Then you create one block that processes all the downloaded links somehow, like storing them all in a list. It can also decide when to cancel downloading:

var links = new List<LinkItem>();

var storeBlock = new ActionBlock<LinkItem>(
    linkItem =>
    {
        links.Add(linkItem);
        if (links.Count == maxSize)
            cts.Cancel();
    });

Since we didn't set MaxDegreeOfParallelism, it defaults to 1. That means using collection that is not thread-safe should be okay here.

We create one more block: it will take a link from linkFinderBlock, and pass it both to storeBlock and back to linkFinderBlock.

var broadcastBlock = new BroadcastBlock<LinkItem>(li => li);

The lambda in its constructor is a "cloning function". You can use it to create a clone of the item if you want to, but it shouldn't be necessary here, since we don't modify the LinkItem after creation.

Now we can connect the blocks together:

linkFinderBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(storeBlock);
broadcastBlock.LinkTo(linkFinderBlock);

Then we can start processing by giving the first item to linkFinderBlock (or broadcastBlock, if you want to also send it to storeBlock):

linkFinderBlock.Post(firstItem);

And finally wait until the processing is complete:

try
{
    linkFinderBlock.Completion.Wait();
}
catch (AggregateException ex)
{
    if (!(ex.InnerException is TaskCanceledException))
        throw;
}
Lombroso answered 13/2, 2012 at 14:35 Comment(3)
Wow! Thank you for the brilliant explanation. Could you just confirm one thing? If we set MaxDegreeOfParallelism to a number > 1 does this mean I need change the collection type to something like the ConcurrentStack for it to be thread safe?Willey
Do you mean the collection in storeBlock? And where do you set MaxDegreeOfParallelism? If you set MDOP of storeBlock to > 1, then, yes, you would need to use some thread-safe collection there (or use locks). But if you set MDOP of some other block to > 1, it doesn't affect the parallelism of storeBlock, so you don't need to consider thread-safety.Lombroso
Boy this is going to make me upgrade to 2012! +1Lanilaniard

© 2022 - 2024 — McMap. All rights reserved.