Scott Hanselman

Back to (Parallel) Basics: Don't Block Your Threads, Make Async I/O Work For You

November 15, '10 Comments [11] Posted in Back to Basics | Learning .NET | Programming | Source Code
Sponsored By

Stephen Toub is one of my favorite folks at Microsoft. I've asked him questions before, sometimes for myself, sometimes on your behalf, Dear Reader, and I've always received thoughtful and well reasoned answered. Because I believe strongly in Jon Udell's "Conserve Your Keystrokes" philosophy, I always try to get great information out to the community, especially when it's been emailed. Remember, when you slap the keyboard and write an epic email to just a few people, there's millions of people out there that miss out. Email less, blog more. More on this in a moment.

TIP: If you're interested in Patterns for Parallel Programming, run, don't walk, and download the FREE and extensive eBook called, yes, you guessed it, Patterns for Parallel Programming: Understanding and Applying Parallel Patterns with the .NET Framework 4. Yes, that title is long, but it feels shorter if you process it in parallel. Seriously, it's free and there's a C# and Visual Basic version. It's brilliant.
Now, if you're REALLY interested in the topic, go get the book Parallel Programming with Microsoft .NET by Stephen Toub, Ade Miller, Colin Campbell, and Ralph Johnson. The complete book as HTML is also hosted here.

I recently noticed a blog post from my friend Steve Smith where he shares some quick sample code to "Verify a List of URLs in C# Asynchronously." As I know Steve wouldn't mind me digging into this, I did. I started by asking Stephen Toub in the Parallel Computing group at Microsoft.

Steve Smith wanted to verify a list of URLs for existence. This is the basic synchronous code:

private static bool RemoteFileExists(string url)
{
try
{
var request = WebRequest.Create(url) as HttpWebRequest;
request.Method = "HEAD";
var response = request.GetResponse() as HttpWebResponse;
return (response.StatusCode == HttpStatusCode.OK);
}
catch
{
return false;
}
}

Then Steve changed the code to be Parallel using the new Parallel features of .NET 4 as Stephen Toub helped me explain in "Back to Parallel Basics" in April.

var linkList = GetLinks();

Action<int> updateLink = i =>
{
UpdateLinkStatus(linkList[i]); //fetch URL and update its status in a shared list
};
Parallel.For(0, linkList.Count, updateLink);

Using Parallel.For is a really great way to introduce some basic naive parallelism into your applications.

I'm no expert in parallelism (I've read a great whitepaper...) but I asked Stephen Toub if this was the best and recommended way to solve this problem. Stephen responded from a plane using (his words) "email compiled and tested" examples. With his permission, I've included a derivation of his response here in this blog post for my own, and possibly your, edification.

From Stephen:

First, it looked like the author was proposing using a parallel loop to handle this.  That's ok, and certainly easy, but that’s the kind of thing you’d only really want to do in a client application and not a server application.  The issue here is that, while easy, it blocks threads; for a client application, having a few more threads that are blocked typically isn’t a big deal; for a server app, though, if for example you were doing this in response to an incoming ASP.NET or WCF service request, you'd be blocking several threads per request, which will greatly hinder scalability.  Still, to get up and running quickly, and if the extra few threads isn’t problematic, this is a fine way to go. 

Assuming you want you "fan out" quickly and easily and it's OK to block a few threads, you can either use a parallel loop, tasks directly, or Stephen's personal favorite, a PLINQ query, e.g. if I have a function "bool ValidateUrl(string url);", I can use PLINQ to process up to N at a time:

bool [] results = (from url in urls.AsParallel() select ValidateUrl(url)).ToArray();

In this example, PLINQ will use up to N threads from the ThreadPool, where N defaults to Environment.ProcessorCount, but you can tack on .WithDegreeOfParallelism(N) after the AsParallel() and provide your own N value.

If Steve was doing this in a console app, which is likely, then as Stephen points out, that's no big deal. You've usually got threads to spare on the client. On the server side, however, you want to avoid blocking threads as much as you can.

A better solution from a scalability perspective, says Stephen, is to take advantage of asynchronous I/O.  When you're calling out across the network, there's no reason (other than convenience) to blocks threads while waiting for the response to come back. Unfortunately, in the past it's been difficult to do this kind of aggregation of async operations.  We' need to rewrite our ValidateUrl method to be something more like:

public void ValidateUrlAsync(string url, Action<string,bool> callback);

where the method returns immediately and later calls back through the provided callback to alert whether a given URL is valid or not.  We'd then change our usage of this to be more like this. Notice the use of using System.Collections.Concurrent.ConcurrentQueue representing a thread-safe first in-first out (FIFO) collection, and CountdownEvent, that represents a synchronization primitive that is signaled when its count reaches zero.

using(var ce = new CountdownEvent(urls.Length))

{
var results = new ConcurrentQueue<Tuple<string,bool>>();

Action callback = (url,valid) =>
{
results.Enqueue(Tuple.Create(url,valid));
ce.Signal();
};

foreach(var url in urls) ValidateUrlAsync(url, callback);

ce.Wait();
}

Assuming ValidateUrlAsync is written to use async, e.g. (you'd really want the following to do better error-handling, but again, this is email-compiled):

public void ValidateUrlAsync(string url, Action<string,bool> callback)
{
var request = (HttpWebRequest)WebRequest.Create(url);
try
{
request.BeginGetResponse(iar =>
{
HttpWebResponse response = null;
try
{
response = (HttpWebResponse)request.EndGetResponse(iar);
callback(url, response.StatusCode == HttpStatusCode.OK);
}
catch { callback(url, false); }
finally { if (response != null) response.Close(); }
}, null);
}
catch { callback(url, false); }
}

This example would then this would end up only blocking the main thread launching all of the requests and then blocking waiting for all of the responses, rather than blocking one thread per request.  With a slight change, we could also make the launcher async, for example:

public static void ValidateUrlsAsync(string [] urls, Action<IEnumerable<Tuple<string,bool>> callback)
{
var ce = new CountdownEvent(urls.Length);
var results = new ConcurrentQueue<Tuple<string,bool>>();
Action callback = (url,valid) =>
{
results.Enqueue(Tuple.Create(url,valid));
if (ce.Signal()) callback(results);
};
foreach(var url in urls) ValidateUrlAsync(url, callback);
}

Still, this is all really complicated, and much more difficult than the original one-liner using PLINQ. 

This is where Tasks and the new Async CTP come in really handy. Imagine that instead of

void ValidateUrlAsync(string url, Action<bool> callback);

we instead had

Task<bool> ValidateUrlAsync(string url);

The Task<bool> being returned is much more composable, and represents the result (both the successful completion case and the exceptional case) of the async operation. 

BETA NOTE: It's not possible to have both ASP.NET MVC 3 and the Async CTP installed at the same time. This is a beta conflict thing, it'll be fixed, I'm sure.

If we had such an operation, and if we had a Task.WhenAll method that took any number of tasks and returned a task to represent them all, then we can easily await all of the results, e.g.

bool [] results = await Task.WhenAll(from url in urls select ValidateUrlAsync(url));

Nice and simple, entirely asynchronous, no blocked threads, etc. 

(Note that in the Async CTP, Task.WhenAll is currently TaskEx.WhenAll, because since it was an out-of-band CTP we couldn't add the static WhenAll method onto Task like we wanted to.)

With the Async CTP and the await keyword, it's also much easier to implement the ValidateUrlAsync method, and to do so with complete support for exception handling (which I didn't do in my previous example, i.e. if something fails, it doesn't communicate why).

public async Task<bool> ValidateUrlAsync(string url)
{
using(var response = (HttpWebResponse)await WebRequest.Create(url).GetResponseAsync())
return response.StatusCode == HttpStatusCode.Ok;
}

Even without the Async CTP, though, it's still possible to implement ValidateUrlAsync with this signature.

Notice the use of System.Threading.Tasks.TaskCompletionSource. From MSDN:

In many scenarios, it is useful to enable a Task (Of(TResult)) to represent an external asynchronous operation. TaskCompletionSource (Of( TResult)) is provided for this purpose. It enables the creation of a task that can be handed out to consumers, and those consumers can use the members of the task as they would any other.

public Task<bool> ValidateUrlAsync(string url)
{
var tcs = new TaskCompletionSource<bool>();
var request = (HttpWebRequest)WebRequest.Create(url);
try
{
request.BeginGetResponse(iar =>
{
HttpWebResponse response = null;
try
{
response = (HttpWebResponse)request.EndGetResponse(iar);
tcs.SetResult(response.StatusCode == HttpStatusCode.OK);
}
catch(Exception exc) { tcs.SetException(exc); }
finally { if (response != null) response.Close(); }
}, null);
}
catch(Exception exc) { tcs.SetException(exc); }
return tsc.Task;

}

So, with this method, even without the Async CTP, we can use existing .NET 4 support to handle this relatively easily:

Task.Factory.ContinueWhenAll(
(from url in urls select ValidateUrlAsync(url)).ToArray(),
completedTasks => { /* do some end task */ });

Now, using just what comes with .NET 4 proper I get the best of all worlds.

Big thanks to Stephen Toub. There's lots of new high- and low-level constructs for Task creation, Threading, and Parallelism in .NET 4. While the naive solution is often the right one, the components we have to work with in .NET 4 (and the even newer ones in the Visual Studio 2010 Async CTP adding the 'await' and 'async' keywords) will give you surprisingly fine-grained control over your multi-threaded parallel systems without a whole lot of code.

Related Links:

About Scott

Scott Hanselman is a former professor, former Chief Architect in finance, now speaker, consultant, father, diabetic, and Microsoft employee. He is a failed stand-up comic, a cornrower, and a book author.

facebook twitter subscribe
About   Newsletter
Sponsored By
Hosting By
Dedicated Windows Server Hosting by ORCS Web
Monday, November 15, 2010 10:06:48 AM UTC
I think this can be done in Rx fashion as well. Would be cool if someone published an Rx solution.
Vadim Kantorov
Monday, November 15, 2010 12:05:33 PM UTC
Shoot. I see some spelling issues and a code bug where I return null. I'll fix it first thing.
Scott Hanselman
Monday, November 15, 2010 1:05:59 PM UTC
"but you can tack on .WithDegreeOfParallelism(N) after the AsParallel() and provide your own N value."

Not entirely true. I had this one bite me. Look at the documentation:
"Sets the degree of parallelism to use in a query. Degree of parallelism is the maximum number of concurrently executing tasks that will be used to process the query."

Basically, you can't use that to force additional threads, which is particularly useful for a high number of blocking calls that are slow and not processor intensive (like reading from Azure tables).
Monday, November 15, 2010 2:19:25 PM UTC
FWIW, you can just use Task.FromAsync instead of writing all that TaskCompletionSource plumbing yourself.
Monday, November 15, 2010 5:09:22 PM UTC
"Now, usng"? :)
Ann Onymouse
Monday, November 15, 2010 5:41:17 PM UTC
Scott- Thanks for sharing.

Jason- You're right that PLINQ doesn't force N threads to be injected into the ThreadPool, but rather uses as many threads up to N as the ThreadPool will provide to it (by default, the ThreadPool will quickly ramp up to Environment.ProcessorCount, and beyond that it will ramp up as it sees more being beneficial), i.e. it schedules N tasks, and runs those tasks on as many threads concurrently as the TP can muster. This was a conscious design choice to avoid oversubscribing the system and to avoid having competing heuristics fighting with each other: PLINQ relies on the ThreadPool to do the right thing over time. If you really want the TP to inject that many threads proactively, you can use ThreadPool.SetMinThreads to do so prior to running the PLINQ query; then instead of quickly ramping up to Environment.ProcessorCount, it'll quick ramp up to whatever higher number you set as the worker thread pool count.

Drew- Yes, it's also possible to use FromAsync here. If we can assume that the Begin* methods won't throw exceptions, and if we wanted to change the semantics slightly from the previous implementation so that instead of faulting the returned task it would always complete successfully with either true or false as the result, we could do something like the following (the previous comments on assumptions just serve to simplify this code as much as possible):

public static Task<bool> ValidateUrlAsync(string url)
{
var request = (HttpWebRequest)WebRequest.Create(url);
return Task<WebResponse>.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null)
.ContinueWith(t => t.Exception == null ? ((HttpWebResponse)t.Result).StatusCode == HttpStatusCode.OK : false);
}

Thanks.
Stephen Toub
Monday, November 15, 2010 9:22:16 PM UTC
Scott - the patterns & practices team hosts the content of Ade, Stephen and co's book at Parallel Programming with Microsoft .NET - you may want to update that in your links.

Cheers
Richard
Tuesday, November 16, 2010 4:02:07 AM UTC
This is eerily timely. I'm right in the middle of figuring out how to add parallel web page requests to a RIA service. I would have used Parallel.ForEach and thought myself done. But here's an even better approach. Thanks so much!
Tuesday, November 16, 2010 5:19:44 AM UTC
@Vadim - Done! Here's a post about how to do the same kinds of things that Scott describes using Rx.NET:

http://blog.paulbetts.org/index.php/2010/11/16/making-async-io-work-for-you-reactive-style/

Thursday, November 18, 2010 3:16:21 AM UTC
I tried implementing the last approach, using TaskCompletionSource and ContinueWhenAll. At first I was surprised that it released control and ran the continuation action (/* do some end task */) before all of the returned tasks had completed. But as I re-read it, I realized that all that had completed was the BeginGetRequest calls. Was this the intention? If so, then something still needs to WaitAll on the returned Task array, right?
Thursday, November 18, 2010 3:46:22 PM UTC
After more debugging I realized that even though ContinueWhenAll returns control after all of the BeginGetRequest calls have started, the continuation action doesn't execute until after all of the Tasks have completed. So you need to either have your own ResetEvent that gets set in your continuation action before processing the Task array, or process the Task array asyncronously from the function that called ValidateUrlsAsync.
Here is an example using the ResetEvent approach:


Task<bool>[] theTasks = null;
ManualResetEvent[] syncEvent = new ManualResetEvent[] { new ManualResetEvent(false) };
Action<Task<bool>[]> processResults = (urlTasks) =>
{
theTasks = urlTasks;
syncEvent[0].Set();
};
this.pageRetriever.ValidateUrlsAsync(urls, processResults);
WaitHandle.WaitAll(syncEvent);
// process theTasks


In my case I'm processing a RIA service call so I need it to block.
Comments are closed.

Disclaimer: The opinions expressed herein are my own personal opinions and do not represent my employer's view in any way.