How to mark a TPL Dataflow cycle as complete?

Given the following setup in TPL Dataflow:

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

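var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    // Enumerate the subdirectories of the directory being processed, not of the root
    return dir.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    // Enumerate the files of the directory being processed, not of the root
    return dir.GetFiles();
});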
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    }));

tileFilder.LinkTo(block);


using (new TraceTimer("Time"))
{
    dirBroadcast.Post(directory);

    block.LinkTo(new ActionBlock<FileInfo>((s) =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);

    }));
    block.Complete();
    block.Completion.Wait();

}

I am wondering how I can mark this as complete, given the cycle. A directory is posted to the dirBroadcast broadcaster, which posts to dirfinder, which in turn might post new directories back to the broadcaster. So I can't simply mark the broadcaster as complete, because that would block any directories being added from dirfinder. Should I redesign it to keep track of the number of directories, or is there something for this in TPL?

Galenical answered 30/9, 2014 at 21:10
What does complete mean for you? Which link do you want to stop, and which should keep running? - Cle
When tileFinder is done, I want to call block.Complete() and await its completion. But I can't call tileFinder.Complete() before dirBroadcast is completed, and dirBroadcast is in a cycle with itself, so I can't mark it as complete. - Hydrosphere

If the purpose of your code is to traverse the directory structure using some sort of parallelism, then I would suggest not using TPL Dataflow and using Microsoft's Reactive Framework (Rx) instead. I think it becomes much simpler.

Here's how I would do it.

First define a recursive function to build the list of directories:

using System.Reactive.Concurrency; // Scheduler
using System.Reactive.Linq;        // Observable, ToObservable

Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
    Observable
        .Return(di)
        .Concat(di.GetDirectories()
            .ToObservable()
            .SelectMany(di2 => recurse(di2)))
        .ObserveOn(Scheduler.Default);

This performs the recursion over the directories and uses the default Rx scheduler, which causes the observable to run in parallel.

So by calling recurse with an input DirectoryInfo, I get an observable sequence of the input directory and all of its descendants.

Now I can build a fairly straightforward query to get the results I want:

var query =
    from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"))
    from fi in di.GetFiles().ToObservable()
    let zxy =
        fi
            .FullName
            .Split('\\')
            .Reverse()
            .Take(3)
            .Reverse()
            .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
            .ToArray()
    let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2])
    select new FileInfo(Path.Combine(di.FullName, suffix));

Now I can run the query like this:

query
    .Subscribe(s =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);
    });

Now, I may have missed a little bit of your custom code, but if this is an approach you want to take, I'm sure you can fix any logical issues quite easily.

This code automatically handles completion when it runs out of child directories and files.

To add Rx to your project, look for "Rx-Main" in NuGet (in current versions of Rx the package is named System.Reactive).

Caa answered 1/10, 2014 at 0:3
Cool, thanks. One of the things that is nice about TPL is that I can control the parallelism properties of individual parts. Is something similar available in Rx? (This is a more general question, not about this specific problem; for finding files, some of the other examples look simpler.) - Hydrosphere
@pksorensen - You generally don't need (or want) to control the parallelism in Rx; it does a good job of that itself. However, Rx is written to use a variety of schedulers that provide all sorts of scheduling options and, best of all, they're based on an IScheduler interface, so if you don't like the built-in options you can roll your own for ultimate control. - Caa

I am sure this is not always possible, but in many cases (including directory enumeration) you can use a running counter and the Interlocked functions to have a cyclic one-to-many dataflow that completes:

using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static ISourceBlock<string> GetDirectoryEnumeratorBlock(string path, int maxParallel = 5)
{
    var outputBuffer = new BufferBlock<string>();

    // Directories that have entered the cycle but have not been enumerated yet.
    var count = 1;

    var broadcastBlock = new BroadcastBlock<string>(s => s);

    var getDirectoriesBlock = new TransformManyBlock<string, string>(d =>
    {
        var subdirectories = Directory.EnumerateDirectories(d).ToList();

        // Add the subdirectory count, minus 1 for the current directory. Act on the value
        // returned by Interlocked.Add: re-reading count afterwards would race with other workers.
        if (Interlocked.Add(ref count, subdirectories.Count - 1) == 0)
            broadcastBlock.Complete(); // count reached 0: all directories have been enumerated

        return subdirectories;

    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel });

    broadcastBlock.LinkTo(outputBuffer, new DataflowLinkOptions() { PropagateCompletion = true });
    broadcastBlock.LinkTo(getDirectoriesBlock, new DataflowLinkOptions() { PropagateCompletion = true });

    // Closes the cycle; completion must not be propagated along this link.
    getDirectoriesBlock.LinkTo(broadcastBlock);

    getDirectoriesBlock.Post(path);

    return outputBuffer;
}

I have used this, with a slight modification, to enumerate files, and it works well. Be careful with the max degree of parallelism: this can quickly saturate a network file system!
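The same counter technique can be exercised without touching the file system. The following is a minimal sketch, assuming an in-memory tree in place of directories; Node, CyclicTreeEnumerator and Enumerate are hypothetical names for illustration. It keeps the shape of the question's pipeline (a BroadcastBlock feeding both an output buffer and a TransformManyBlock that links back to the broadcaster) and completes the broadcaster when the value returned by Interlocked.Add reaches zero.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical in-memory stand-in for a directory: a name plus child nodes.
public sealed class Node
{
    public Node(string name, params Node[] children)
    {
        Name = name;
        Children = children;
    }

    public string Name { get; }
    public IReadOnlyList<Node> Children { get; }
}

public static class CyclicTreeEnumerator
{
    // Emits the root and all of its descendants, then completes the returned block.
    public static ISourceBlock<Node> Enumerate(Node root, int maxParallel = 4)
    {
        var outputBuffer = new BufferBlock<Node>();
        var broadcastBlock = new BroadcastBlock<Node>(n => n);

        // Nodes that have entered the cycle but have not been expanded yet (the root).
        int pending = 1;

        var expandBlock = new TransformManyBlock<Node, Node>(node =>
        {
            var children = node.Children.ToList();

            // Add the children and remove the current node in one atomic step,
            // and act on the returned value rather than re-reading 'pending'.
            if (Interlocked.Add(ref pending, children.Count - 1) == 0)
                broadcastBlock.Complete(); // nothing is left anywhere in the cycle

            return children;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadcastBlock.LinkTo(outputBuffer, propagate); // linked first, so it is offered each node first
        broadcastBlock.LinkTo(expandBlock, propagate);
        expandBlock.LinkTo(broadcastBlock); // closes the cycle; no completion propagation

        broadcastBlock.Post(root); // unlike the answer above, the root is emitted too
        return outputBuffer;
    }
}
```

A consumer can drain the result with `while (source.OutputAvailableAsync().Result) source.Receive();`, which ends once the buffer has completed; the order of the emitted nodes depends on scheduling.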

Interpellation answered 2/10, 2015 at 18:40

I don't see any way this can be done, because each block in the cycle (dirBroadcast and dirfinder) depends on the other one and can't complete on its own.

I suggest you redesign your directory traversal without TPL Dataflow, which isn't a good fit for this kind of problem. A better approach, in my opinion, would be to simply scan the directories recursively and fill your block with a stream of files:

private static void FillBlock(DirectoryInfo directoryInfo, XYZTileCombinerBlock<FileInfo> block)
{
    foreach (var fileInfo in directoryInfo.GetFiles())
    {
        block.Post(fileInfo);
    }

    foreach (var subDirectory in directoryInfo.GetDirectories())
    {
        FillBlock(subDirectory, block);
    }
}

FillBlock(directory, block);
block.Complete();
await block.Completion;
Cle answered 30/9, 2014 at 21:15

Here is a generalized version of Andrew Hanlon's solution. It returns a TransformBlock that supports posting messages recursively to itself, and that completes automatically when there are no more messages to process.

The transform lambda has three arguments instead of the usual one. The first argument is the item being processed. The second argument is the "path" of the processed message, which is a sequence IEnumerable<TInput> containing its parent messages. The third argument is an Action<TInput> that posts new messages to the block, as children of the current message.

/// <summary>Creates a dataflow block that supports posting messages to itself,
/// and knows when it has completed processing all messages.</summary>
public static IPropagatorBlock<TInput, TOutput>
    CreateRecursiveTransformBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TInput>, Action<TInput>, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    int pendingCount = 1; // The initial 1 represents the completion of input1 block
    var input1 = new TransformBlock<TInput, (TInput, IEnumerable<TInput>)>(item =>
    {
        Interlocked.Increment(ref pendingCount);
        return (item, Enumerable.Empty<TInput>());
    }, new ExecutionDataflowBlockOptions()
    {
        CancellationToken = dataflowBlockOptions.CancellationToken,
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity
    });

    var input2 = new BufferBlock<(TInput, IEnumerable<TInput>)>(new DataflowBlockOptions()
    {
        CancellationToken = dataflowBlockOptions.CancellationToken
        // Unbounded capacity
    });

    var output = new TransformBlock<(TInput, IEnumerable<TInput>), TOutput>(async entry =>
    {
        try
        {
            var (item, path) = entry;
            var postChildAction = CreatePostAction(item, path);
            return await transform(item, path, postChildAction).ConfigureAwait(false);
        }
        finally
        {
            if (Interlocked.Decrement(ref pendingCount) == 0) input2.Complete();
        }
    }, dataflowBlockOptions);

    Action<TInput> CreatePostAction(TInput parentItem, IEnumerable<TInput> parentPath)
    {
        return item =>
        {
            // The Post will be unsuccessful only in case of block failure
            // or cancellation, so no specific action is needed here.
            if (input2.Post((item, parentPath.Append(parentItem))))
            {
                Interlocked.Increment(ref pendingCount);
            }
        };
    }

    input1.LinkTo(output);
    input2.LinkTo(output);

    PropagateCompletion(input1, input2,
        condition: () => Interlocked.Decrement(ref pendingCount) == 0);
    PropagateCompletion(input2, output);
    PropagateFailure(output, input1, input2); // Ensure that all blocks are faulted

    return DataflowBlock.Encapsulate(input1, output);

    async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
        Func<bool> condition = null)
    {
        try
        {
            await block1.Completion.ConfigureAwait(false);
        }
        catch { }

        if (block1.Completion.Exception != null)
        {
            block2.Fault(block1.Completion.Exception.InnerException);
        }
        else
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            if (condition == null || condition()) block2.Complete();
        }
    }

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2,
        IDataflowBlock block3)
    {
        try
        {
            await block1.Completion.ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            block2.Fault(ex); block3.Fault(ex);
        }
    }
}

// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
    CreateRecursiveTransformBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TInput>, Action<TInput>, TOutput> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateRecursiveTransformBlock<TInput, TOutput>((item, path, postAction) =>
        Task.FromResult(transform(item, path, postAction)), dataflowBlockOptions);
}

The resulting block consists internally of three blocks: two input blocks that receive messages, and one output block that processes the messages. The first input block receives messages from outside, and the second input block receives messages from inside. The second input block has unbounded capacity, so an infinite recursion will eventually result in an OutOfMemoryException.

Usage example:

var fileCounter = CreateRecursiveTransformBlock<string, int>(
    (folderPath, parentPaths, postChild) =>
{
    var subfolders = Directory.EnumerateDirectories(folderPath);
    foreach (var subfolder in subfolders) postChild(subfolder);
    var files = Directory.EnumerateFiles(folderPath);
    Console.WriteLine($"{folderPath} has {files.Count()} files"
        + $", and is {parentPaths.Count()} levels deep");
    return files.Count();
});
fileCounter.LinkTo(DataflowBlock.NullTarget<int>());
fileCounter.Post(Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments));
fileCounter.Complete();
fileCounter.Completion.Wait();

The above code prints to the console all the subfolders of the "MyDocuments" folder.
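Stripped of the path tracking, the encapsulation and the failure propagation, the counting idea behind this block can be reduced to a single block that posts to itself. This is a simplified sketch for illustration, not a replacement for CreateRecursiveTransformBlock: SelfPostingCounter, CountItems and the getChildren delegate are hypothetical names, and an integer tree stands in for the folder hierarchy. The counter is incremented before each child is posted and decremented in a finally once an item is processed, so it can only reach zero after the last item is done.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class SelfPostingCounter
{
    // Visits 'root' and everything reachable through 'getChildren' with one ActionBlock
    // that posts newly discovered items back to itself; returns how many items it visited.
    public static int CountItems(int root, Func<int, IEnumerable<int>> getChildren)
    {
        int visited = 0;
        int pending = 1; // items posted but not yet fully processed (the root)
        ActionBlock<int> block = null;

        block = new ActionBlock<int>(item =>
        {
            try
            {
                Interlocked.Increment(ref visited);
                foreach (var child in getChildren(item))
                {
                    // Count the child before posting it, so 'pending' cannot drop
                    // to zero while the child is still in flight.
                    Interlocked.Increment(ref pending);
                    if (!block.Post(child)) Interlocked.Decrement(ref pending);
                }
            }
            finally
            {
                // The last outstanding item is done and nobody is left to post: complete.
                if (Interlocked.Decrement(ref pending) == 0) block.Complete();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        block.Post(root);
        block.Completion.Wait();
        return visited;
    }
}
```

For example, with children 2n+1 and 2n+2 for every n below 7, the call visits the 15 nodes 0 through 14 and then returns.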

Asben answered 10/6, 2020 at 6:33

Just to show my actual solution: a combination of TPL and Rx.

Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
    Observable
        .Return(di)
        .Concat(di.GetDirectories()
            .Where(d => int.Parse(d.Name) <= br_tile[0] && int.Parse(d.Name) >= tl_tile[0])
            .ToObservable()
            .SelectMany(di2 => recurse(di2)))
        .ObserveOn(Scheduler.Default);
var query =
    from di in recurse(new DirectoryInfo(Path.Combine(directory.FullName, baselvl.ToString())))
    from fi in di.GetFiles().Where(f => int.Parse(Path.GetFileNameWithoutExtension(f.Name)) >= br_tile[1]
        && int.Parse(Path.GetFileNameWithoutExtension(f.Name)) <= tl_tile[1]).ToObservable()
    select fi;
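query.Subscribe(block.AsObserver()); // the observer completes block when the query completes
Console.WriteLine("Done subscribing");
// No explicit block.Complete() here: Subscribe returns before the query has produced its
// files (it runs on Scheduler.Default), and completing the block now would typically make it
// decline them.

block.Completion.Wait();
Console.WriteLine("Done TPL Block");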

where block is my var block = new XYZTileCombinerBlock<FileInfo> from the question.
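The bridge between the two libraries is DataflowBlock.AsObserver: the observer it returns sends each OnNext value to the target block, completes the block on OnCompleted and faults it on OnError, so when the Rx query finishes, the block is completed without an explicit Complete call. The minimal sketch below drives that observer by hand instead of through Rx, so it needs only System.Threading.Tasks.Dataflow; ObserverBridgeDemo is a hypothetical name, and the ActionBlock stands in for XYZTileCombinerBlock.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class ObserverBridgeDemo
{
    // Feeds three items through AsObserver and returns how many the block processed.
    public static int Run()
    {
        int received = 0;
        var block = new ActionBlock<int>(_ => Interlocked.Increment(ref received));

        // Subscribe(block.AsObserver()) makes the observable call these same methods.
        IObserver<int> observer = block.AsObserver();
        for (int i = 0; i < 3; i++) observer.OnNext(i);
        observer.OnCompleted(); // completes the block; no block.Complete() call is made

        // Wait with a timeout, so a missing completion shows up as an error, not a hang.
        if (!block.Completion.Wait(TimeSpan.FromSeconds(5)))
            throw new TimeoutException("The block was not completed by OnCompleted.");
        return received;
    }
}
```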

Galenical answered 2/10, 2014 at 20:55
