I want to limit the number of items posted in a Dataflow pipeline. The number of items depends of the production environment. These objects consume a large amount of memory (images) so I would like to post them when the last block of the pipeline has done its job.
I tried to use a SemaphoreSlim
to throttle the producer and release it in the last block of the pipeline. It works, but if an exception is raised during the process, the program waits forever and the exception is not intercepted.
Here is a sample which looks like our code. How can I do this ?
static void Main(string[] args)
{
SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);
var downloadString = new TransformBlock<string, string>(uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return new WebClient().DownloadString(uri);
});
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
char[] tokens = text.ToArray();
for (int i = 0; i < tokens.Length; i++)
{
if (!char.IsLetter(tokens[i]))
tokens[i] = ' ';
}
text = new string(tokens);
return text.Split(new char[] { ' ' },
StringSplitOptions.RemoveEmptyEntries);
});
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
throw new InvalidOperationException("ouch !"); // explicit for test
return words.Where(word => word.Length > 3).OrderBy(word => word)
.Distinct().ToArray();
});
var findPalindromes = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Finding palindromes...");
var palindromes = new ConcurrentQueue<string>();
Parallel.ForEach(words, word =>
{
string reverse = new string(word.Reverse().ToArray());
if (Array.BinarySearch<string>(words, reverse) >= 0 &&
word != reverse)
{
palindromes.Enqueue(word);
}
});
return palindromes.ToArray();
});
var printPalindrome = new ActionBlock<string[]>(palindromes =>
{
try
{
foreach (string palindrome in palindromes)
{
Console.WriteLine("Found palindrome {0}/{1}",
palindrome, new string(palindrome.Reverse().ToArray()));
}
}
finally
{
semaphore.Release();
}
});
downloadString.LinkTo(createWordList);
createWordList.LinkTo(filterWordList);
filterWordList.LinkTo(findPalindromes);
findPalindromes.LinkTo(printPalindrome);
downloadString.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)createWordList).Fault(t.Exception);
else createWordList.Complete();
});
createWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)filterWordList).Fault(t.Exception);
else filterWordList.Complete();
});
filterWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)findPalindromes).Fault(t.Exception);
// enter here when an exception throws
else findPalindromes.Complete();
});
findPalindromes.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)printPalindrome).Fault(t.Exception);
// the fault is propagated here but not caught
else printPalindrome.Complete();
});
try
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine(i);
downloadString.Post("http://www.google.com");
semaphore.Wait(); // waits here when an exception throws
}
downloadString.Complete();
printPalindrome.Completion.Wait();
}
catch (AggregateException agg)
{
Console.WriteLine("An error has occured : " + agg);
}
Console.WriteLine("Done");
Console.ReadKey();
}
findPalindromes.Completion.ContinueWith(t => { if (t.IsFaulted) { ((IDataflowBlock)printPalindrome).Fault(t.Exception); ((IDataflowBlock)downloadString).Fault(t.Exception); //mark the first block has faulted } else printPalindrome.Complete(); });
it works. But I am not sure its the better way to do that. – Batholomew