For a TPL Dataflow: How do I get my hands on all the output produced by a TransformBlock while blocking until all inputs have been processed?

I'm submitting a series of select statements (thousands of them) to a single database synchronously and getting back one DataTable per query. (Note: the program learns the schema of the DB it is scanning only at run time, hence the use of DataTables.) The program runs on a client machine and connects to DBs on a remote machine. Running so many queries takes a long time. So, assuming that executing them async or in parallel will speed things up, I'm exploring TPL Dataflow (TDF). I want to use the TDF library because it seems to handle all of the concerns related to writing multi-threaded code that would otherwise need to be done by hand.

The code shown is based on http://blog.i3arnon.com/2016/05/23/tpl-dataflow/. It's minimal and is just meant to help me understand the basic operations of TDF. Please do know I've read many blogs and coded many iterations trying to crack this nut.

Nonetheless, with this current iteration, I have one problem and a question:

Problem

The code is inside a button click method. (Using the UI, a user selects a machine, a SQL instance, and a database, and then kicks off the scan.) The two lines with the await operator produce an error at build time: The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task'. I can't change the return type of the button click method. Do I need to somehow isolate the button click method from the async/await code?

Question

Although I've found beaucoup write-ups describing the basics of TDF, I can't find an example of how to get my hands on the output that each invocation of the TransformBlock produces (i.e., a DataTable). Although I want to submit the queries async, I do need to block until all queries submitted to the TransformBlock are complete. How do I get my hands on the series of DataTables produced by the TransformBlock, and how do I block until all queries are complete?

Note: I acknowledge that I have only one block now. At a minimum, I'll be adding a cancellation block and so do need/want to use TPL.

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{

    UserInput userInput = new UserInput
    {
        MachineName = "gat-admin",
        InstanceName = "",
        DbName = "AdventureWorks2014",
    };

    DataAccessLayer dataAccessLayer = new DataAccessLayer(userInput.MachineName, userInput.InstanceName);

    //CreateTableQueryList gets a list of all tables from the DB and returns a list of 
    // select statements, one per table, e.g., SELECT * from [schemaname].[tablename]
    IList<String> tableQueryList = CreateTableQueryList(userInput);

    // Define a block that accepts a select statement and returns a DataTable of results
    // where each returned record is: schemaname + tablename + columnname + column datatype + field data
    // e.g., if the select query returns one record with 5 columns, then a datatable with 5 
    // records (one per field) will come back 

    var transformBlock_SubmitTableQuery = new TransformBlock<String, Task<DataTable>>(
        async tableQuery => await dataAccessLayer._SubmitSelectStatement(tableQuery),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
        });

    // Add items to the block and start processing
    foreach (String tableQuery in tableQueryList)
    {
        await transformBlock_SubmitTableQuery.SendAsync(tableQuery);
    }

    // Enable the Cancel button and disable the Start button.
    toolStripButtonStart.Enabled = false;
    toolStripButtonStop.Enabled = true;

    //Shut down the block (no more inputs will be accepted)
    transformBlock_SubmitTableQuery.Complete();

    //Await the completion of the block that produces the output DataTables
    await transformBlock_SubmitTableQuery.Completion;
}

public async Task<DataTable> _SubmitSelectStatement(string queryString )
{
    try
    {

        .
        .
        await Task.Run(() => sqlDataAdapter.Fill(dt));

        // process dt into the output DataTable I need

        return outputDt;
    }
    catch
    {
        throw;
    }

}
Complacent answered 20/3, 2018 at 16:18 Comment(4)
Bad assumption. If the query is slow, fix it. Running more slow queries on the same server, with the same CPU and the same disk, over the same network will only slow things down. Loading the results into a DataTable will add even more delays.Glede
Queries that load everything into the client for processing result in far worse delays. The client has less memory, less CPU, less disk IO than the server and no indexes to speed things up. Loading everything into the client for processing is a bad idea.Glede
BTW if you try to process data in order to eg generate reports or fill a reporting schema, create a proper reporting database or data warehouse and fill it using ETL tools like SSIS. Update only the rows that have changed. The query performance will be many orders of magnitude better than processing on the client. Working only with the changes, will be orders of magnitude faster than pulling everything too.Glede
@PanagiotisKanavos: All good and valid points. However, it is what it is. It is a requirement that using this program involve no changes to the (production) machine whose database is being scanned. I just need to make running the queries be as fast as possible, not necessarily blazing fastComplacent

The cleanest way to retrieve the output of a TransformBlock is to perform a nested loop using the methods OutputAvailableAsync and TryReceive. It is a bit verbose, so you could consider encapsulating this functionality in an extension method ToListAsync:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    List<T> list = new();
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            list.Add(item);
        }
    }
    Debug.Assert(source.Completion.IsCompleted);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}

Then you could use the ToListAsync method like this:

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{
    TransformBlock<string, DataTable> transformBlock = new(async query => //...
    //...
    transformBlock.Complete();

    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    {
        // Do something with each dataTable
    }
}

Note: this ToListAsync implementation is destructive, meaning that in case of an error the consumed messages are discarded. To make it non-destructive, just remove the await source.Completion line. In this case you'll have to remember to await the Completion of the block after processing the list with the consumed messages, otherwise you won't be aware if the TransformBlock failed to process all of its input.
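
For illustration, consuming the non-destructive variant could look like the sketch below, where ProcessDataTable is a hypothetical placeholder for the per-table work:

List<DataTable> dataTables = await transformBlock.ToListAsync();
foreach (DataTable dataTable in dataTables)
{
    ProcessDataTable(dataTable); // Hypothetical per-table processing
}
await transformBlock.Completion; // Observe a possible failure of the block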

Alternative ways to retrieve the output of a dataflow block do exist, for example this one by dcastro uses a BufferBlock as a buffer and is slightly more performant, but personally I find the approach above to be safer and more straightforward.
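
For reference, here is a minimal sketch of that idea (the name ToListViaBufferAsync is mine, not dcastro's, and an unbounded BufferBlock is assumed so that the source is never blocked):

public static async Task<IList<T>> ToListViaBufferAsync<T>(this ISourceBlock<T> source)
{
    // Unbounded buffer; the source can always push its output forward
    var buffer = new BufferBlock<T>();
    source.LinkTo(buffer);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
    // At this point all the produced messages are stored in the buffer
    buffer.TryReceiveAll(out IList<T> items);
    return items ?? Array.Empty<T>();
}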

Instead of waiting for the completion of the block before retrieving the output, you could also retrieve it in a streaming manner, as an IAsyncEnumerable<T> sequence:

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    Debug.Assert(source.Completion.IsCompleted);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
}

This way you will be able to get your hands on each DataTable immediately after it has been cooked, without having to wait for the processing of all queries. To consume an IAsyncEnumerable<T> you simply move the await before the foreach:

await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())
{
    // Do something with each dataTable
}

Advanced: Below is a more sophisticated version of the ToListAsync method, which propagates all the errors of the underlying block in the same direct way that they are propagated by methods like Task.WhenAll and Parallel.ForEachAsync. The original simple ToListAsync method wraps the errors in a nested AggregateException, using the Wait technique that is shown in this answer.

/// <summary>
/// Asynchronously waits for the successful completion of the specified source, and
/// returns all the received messages. In case the source completes with error,
/// the error is propagated and the received messages are discarded.
/// </summary>
public static Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);

    async Task<List<T>> Implementation()
    {
        List<T> list = new();
        while (await source.OutputAvailableAsync(cancellationToken)
            .ConfigureAwait(false))
            while (source.TryReceive(out T item))
                list.Add(item);
        await source.Completion.ConfigureAwait(false);
        return list;
    }

    return Implementation().ContinueWith(t =>
    {
        if (t.IsCanceled) return t;
        Debug.Assert(source.Completion.IsCompleted);
        if (source.Completion.IsFaulted)
        {
            TaskCompletionSource<List<T>> tcs = new();
            tcs.SetException(source.Completion.Exception.InnerExceptions);
            return tcs.Task;
        }
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

.NET 6 update: A new API DataflowBlock.ReceiveAllAsync was introduced in .NET 6, with this signature:

public static IAsyncEnumerable<TOutput> ReceiveAllAsync<TOutput> (
    this IReceivableSourceBlock<TOutput> source,
    CancellationToken cancellationToken = default);

It is similar to the aforementioned ToAsyncEnumerable method. The important difference is that the new API does not propagate the possible exception of the consumed source block after propagating all of its messages. This behavior is not consistent with the analogous API ReadAllAsync from the Channels library. I have reported this inconsistency on GitHub, and the issue is currently labeled by Microsoft as a bug.
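
Until that is resolved, one way to compensate (a sketch) is to await the Completion of the block yourself after the enumeration, so that a possible failure still surfaces:

await foreach (DataTable dataTable in transformBlock.ReceiveAllAsync())
{
    // Do something with each dataTable
}
await transformBlock.Completion; // Surfaces an exception that ReceiveAllAsync won't propagate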

Floorer answered 16/6, 2020 at 13:45 Comment(2)
after a few months of learning about DataFlow, I must know if you and @panagiotis-kanavos ever battled it out IRL in the streets of Athens ??? :)Dimer
@NicholasFranceschina ha ha! No, it hasn't happened. Athens is a big city. But if we ever meet in person, it might be awkward. We hardly agree in anything.Floorer

As it turns out, to meet my requirements, TPL Dataflow is a bit overkill. I was able to meet my requirements using async/await and Task.WhenAll. I used the Microsoft How-To How to: Extend the async Walkthrough by Using Task.WhenAll (C#) as a model.

Regarding my "Problem"

My "problem" is not a problem. An event method signature (in my case, a "Start" button click method that initiates my search) can be modified to be async. In the Microsoft How-To GetURLContentsAsync solution, see the startButton_Click method signature:

private async void startButton_Click(object sender, RoutedEventArgs e)  
{  
    .
    .
}  

Regarding my question

Using Task.WhenAll, I can wait for all my queries to finish and then process all the outputs for use on my UI. In the Microsoft How-To GetURLContentsAsync solution, see the SumPageSizesAsync method, i.e., the int array named lengths collects the output of every task.

private async Task SumPageSizesAsync()  
{  
    .
    .
    // Create a query.   
    IEnumerable<Task<int>> downloadTasksQuery = from url in urlList select ProcessURLAsync(url);  

    // Use ToArray to execute the query and start the download tasks.  
    Task<int>[] downloadTasks = downloadTasksQuery.ToArray();  

    // Await the completion of all the running tasks.  
    Task<int[]> whenAllTask = Task.WhenAll(downloadTasks);  

    int[] lengths = await whenAllTask;  
    .
    .
}    
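
Mapped onto my original scenario, the same pattern looks roughly like this sketch (it reuses the names from my question):

IList<String> tableQueryList = CreateTableQueryList(userInput);

// Create a query that starts one task per select statement.
IEnumerable<Task<DataTable>> queryTasksQuery =
    from tableQuery in tableQueryList
    select dataAccessLayer._SubmitSelectStatement(tableQuery);

// Use ToArray to execute the query and start the tasks.
Task<DataTable>[] queryTasks = queryTasksQuery.ToArray();

// Await the completion of all the running tasks.
DataTable[] dataTables = await Task.WhenAll(queryTasks);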
Complacent answered 25/3, 2018 at 13:24 Comment(3)
That's a very bad solution. Besides, all you need to do is write await Task.WhenAll(downloadTasksQuery);. This will execute all downloads concurrently, but then cause blocking while saving to the database. Actually, dataflow is a good solution in this case, making the code simpler. You can use a single TransformBlock to download results with a DOP>1 (i.e., more than 1 worker task) and then use a linked ActionBlock to write them to the database. You can add a BatchBlock between them to batch results so you can use SqlBulkCopy to insert data as fast as possible, with minimal loggingGlede
Using a DbDataAdapter adds an extra needless step too. If you have a list or array of objects you can use FastMember's ObjectReader to create an IDataReader wrapper over it and pass that to SqlBulkCopy.WriteToServerGlede
I added an answer that shows what a dataflow pipeline could look likeGlede

Using Dataflow blocks properly results in both cleaner and faster code. Dataflow blocks aren't agents or tasks. They're meant to work in a pipeline of blocks, connected with LinkTo calls, not manual coding.

It seems the scenario is to download some data, eg some CSVs, parse them and insert them to a database. Each of those steps can go into its own block:

  • a Downloader with a DOP>1, to allow multiple downloads to run concurrently without flooding the network.
  • a Parser that converts the files into arrays of objects
  • an Importer that uses SqlBulkCopy to bulk insert the rows into the database in the fastest way possible, using minimal logging.

var downloadDOP=8;
var parseDOP=2;
var tableName="SomeTable";

var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};

var downloadOptions =new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = downloadDOP,
};

var parseOptions =new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = parseDOP,
};

With these options, we can construct a pipeline of blocks

//HttpClient is thread-safe and reusable
HttpClient httpClient = new HttpClient(...);

var downloader = new TransformBlock<(Uri, string), FileInfo>(async input => {
    var (uri, path) = input;
    var file = new FileInfo(path);
    using var stream = await httpClient.GetStreamAsync(uri);
    using var fileStream = file.Create();
    await stream.CopyToAsync(fileStream);
    return file;
}, downloadOptions);

var parser = new TransformBlock<FileInfo, Foo[]>(file => {
    using var reader = file.OpenText();
    using var csv = new CsvReader(reader, CultureInfo.InvariantCulture);
    // Materialize the records into an array before the reader is disposed
    var records = csv.GetRecords<Foo>().ToArray();
    return records;
}, parseOptions);

var importer=new ActionBlock<Foo[]>(async recs=>{
    using var bcp=new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock);
    bcp.DestinationTableName=tableName;

    //Map columns if needed
    ...
    using var reader=ObjectReader.Create(recs);
    await bcp.WriteToServerAsync(reader);
});

downloader.LinkTo(parser,linkOptions);
parser.LinkTo(importer,linkOptions);

Once the pipeline is constructed, you can start posting (Uri, path) pairs to the head block and await until the tail block completes:

IEnumerable<(Uri,string)> filesToDownload = ...


foreach(var pair in filesToDownload)
{
    await downloader.SendAsync(pair);
}

downloader.Complete();

await importer.Completion;

The code uses CsvHelper to parse the CSV file and FastMember's ObjectReader to create an IDataReader wrapper over the CSV records.

In each block you can use a Progress<T> instance to update the UI based on the pipeline's progress.
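
A rough fragment of that idea (statusLabel is a hypothetical UI control; Progress<T> runs its callback on the SynchronizationContext captured at construction, i.e. the UI thread if created there):

//Created on the UI thread, so reports marshal back to it
IProgress<string> progress = new Progress<string>(msg => statusLabel.Text = msg);

//Inside any block's delegate:
progress.Report($"Imported {recs.Length} rows");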

Glede answered 29/6, 2022 at 7:31 Comment(0)
