Good approach for hundreds of consumers and big files [closed]

I have several files (nearly 1 GB each) of data. The data is plain text, one record per line.

I need to process each of these files with several hundred consumers. Each consumer does processing that differs from the others. The consumers do not write anywhere concurrently; they only need the input string. After processing a line they update their local buffers. The consumers can easily be executed in parallel.

Important: With one specific file, each consumer has to process all lines (without skipping) in the correct order (as they appear in the file). The order in which different files are processed doesn't matter.

Processing a single line by one consumer is comparatively fast; I expect less than 50 microseconds on a Core i5.

So now I'm looking for a good approach to this problem. This is going to be part of a .NET project, so please let's stick with .NET only (C# is preferred).

I know about TPL and TPL Dataflow. I guess that the most relevant block would be BroadcastBlock. But I think the problem here is that with each line I'll have to wait for all consumers to finish before posting the next one, and I suspect that would not be very efficient.
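
For reference, this is roughly how I picture the BroadcastBlock variant (an untested sketch; RunAsync and the consumer delegates are just placeholder names of mine, and it assumes the TPL Dataflow NuGet package). As far as I understand, it only guarantees that no line is skipped if the per-consumer blocks are unbounded, which lets a slow consumer's queue grow without limit; with bounded blocks the BroadcastBlock can drop lines instead of waiting.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;   // TPL Dataflow NuGet package

public static class BroadcastSketch
{
    // path and consumers are placeholders; each consumer is one per-line processing routine
    public static async Task RunAsync(string path, Action<string>[] consumers)
    {
        var broadcast = new BroadcastBlock<string>(line => line);

        // One ActionBlock per consumer. Unbounded capacity means a fast consumer
        // never waits for a slow one, but the slow consumer's queue keeps growing.
        var blocks = consumers.Select(consume => new ActionBlock<string>(consume)).ToArray();
        foreach (var block in blocks)
            broadcast.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });

        // Post lines in file order; every consumer receives them in that order.
        foreach (var line in File.ReadLines(path))
            await broadcast.SendAsync(line);

        broadcast.Complete();
        await Task.WhenAll(blocks.Select(b => b.Completion));
    }
}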

I think the ideal situation would be something like this:

  1. One thread reads from file and writes to the buffer.
  2. Each consumer, when it is ready, reads the line from the buffer concurrently and processes it.
  3. An entry shouldn't be deleted from the buffer when one consumer reads it; it can be deleted only when all consumers have processed it.
  4. TPL schedules consumer threads itself.
  5. If one consumer outperforms the others, it shouldn't have to wait; it can read more recent entries from the buffer.

Am I right with this kind of approach? Either way, how can I implement a good solution?

Cecilacecile answered 30/5, 2014 at 15:14 Comment(7)
What version of .NET? And will one of the consumers tend to always be the slowest? Based on what is in the line, will the time taken by a consumer vary? Does a consumer need to know when it has reached the end of the file?Sheridansherie
Any version that helps, let's say 4.5. No, it won't. On average, all consumers will process one line in nearly the same time, but sometimes (rarely) one can take a bit longer; it depends on the line and the consumer's parameters. No, a consumer doesn't need to know. I need to know that at the "higher level": when the file finishes, I call some finishing procedures and then start everything again with a new file.Cecilacecile
Start implementing this and ask a question when you run into a specific problem. Whether the suggested approach is viable depends on the rest of your code. If you want to discuss code you have not yet written, try programmers.stackexchange.comTamis
See my answer. Knowing at a higher level when it finishes depends on what "finishes" means. With my answer you know when the producer finishes but not when the consumers finish. You would need to add a bool end-of-data marker alongside the string line in the BlockingCollection for the consumer to know.Sheridansherie
@Blam SO is not for discussion. I can point out other flaws in the approach as well as yours (the thing you disagree with doesn't have to be a problem and can introduce issues in itself), but that is not what SO is for.Tamis
@Tamis Agree this is a better fit on programmers. I did not know about that site. But I don't need to see code to have an opinion on whether this approach is viable.Sheridansherie
I'll copy the question to programmersCecilacecile

I don't agree with having one thread read from the files and write everything to a buffer.
With several files of 1 GB each, that thread would consume too much memory.
.NET has an object size limit, and a collection is one object.

You are going to need to throttle the reading of lines.
I think you could do that with a BlockingCollection. The bound of 1,000,000 on the BlockingCollection keeps the slowest consumer busy,
and it also gives some buffer time for opening the next file.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;

namespace BlockingCollection2
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
        }
        public static void BC_GetConsumingEnumerableCollection()
        {
            List<string> fileNames = new List<string>();  // add file names here
            string producerLine;
            System.IO.StreamReader file;
            List<BCtaskBC> bcs = new List<BCtaskBC>();  // add for each consumer
            // Kick off a producer task
            Task.Factory.StartNew(() =>
            {
                foreach(string fileName in fileNames)
                {
                    file = new System.IO.StreamReader(fileName);
                    while ((producerLine = file.ReadLine()) != null)
                    {
                        foreach (BCtaskBC bc in bcs)
                        {
                            // strings are immutable in .NET, so all consumers can safely share
                            // the same reference; no deep copy of producerLine is needed
                            bc.BC.Add(producerLine);  // if any queue size gets to 1000000 then this blocks
                        }
                    }
                    file.Close();
                }                 
                // Need to do this to keep foreach below from hanging
                foreach (BCtaskBC bc in bcs)
                {
                    bc.BC.CompleteAdding();
                }
            });

            // Now consume the blocking collection with foreach. 
            // Use bc.GetConsumingEnumerable() instead of just bc because the 
            // former will block waiting for completion and the latter will 
            // simply take a snapshot of the current state of the underlying collection. 
            //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
            Parallel.ForEach(bcs, bc =>
            {
                foreach (string consumerLine in bc.BC.GetConsumingEnumerable())
                {
                    bc.BCtask.ProcessTask(consumerLine);  
                }
            } //close lambda expression
                 ); //close method invocation 
            // I think this needs to be parallel
            //foreach (BCtaskBC bc in bcs)
            //{
            //    foreach (string consumerLine in bc.BC.GetConsumingEnumerable())
            //    {
            //        bc.BCtask.ProcessTask(consumerLine);
            //    }
            //}
        }
        public class BCtaskBC
        {   // holds one consumer's queue and its task; may need to do something to make this thread safe
            private BlockingCollection<string> bc = new BlockingCollection<string>(1000000);  // this throttles the size
            public BCtask BCtask { get; set; }
            public BlockingCollection<string> BC { get { return bc; } }
        }
        public abstract class BCtask
        {   // one subclass per consumer; may need to do something to make this thread safe
            public abstract void ProcessTask(string s);
        }
    }
}
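
Since BCtask is abstract, you add one subclass per consumer (nested next to BCtask, or made top level) and add one BCtaskBC entry per consumer to bcs before kicking off the producer task. The class name and the filter below are only examples:

// Example consumer: counts lines containing a keyword and keeps the count in its local buffer
public class KeywordCounterTask : BCtask
{
    private int _count;  // the consumer's local result
    public int Count { get { return _count; } }
    public override void ProcessTask(string s)
    {
        if (s.Contains("ERROR")) _count++;
    }
}

// Wiring it up (one entry per consumer):
// bcs.Add(new BCtaskBC { BCtask = new KeywordCounterTask() });
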
Sheridansherie answered 30/5, 2014 at 17:2 Comment(7)
It's not really true that a collection is a single object. A List<String>, for example, can hold much more than 2 gigabytes. The list itself is just a bunch of references. At 8 bytes each (in 64 bit mode), the collection can hold close to 250,000,000 lines. Each text line is an individual object that can be up to 2 gigabytes long. (Assuming you don't use the gcAllowVeryLargeObjects flag.)Fishman
@JimMischel That is not my experience. I have run out of memory with a large list of objects. And when I made the object smaller I was able to have more in the list. Even if the collection would hold an unlimited number of lines I would still use this approach.Sheridansherie
If he only keeps one line per consumer in memory per read, he shouldn't run out of memory, unless these lines are incredibly long. I don't see any reason to read all the files into memory at once or store all the consumer data in memory at once.Pyre
@Pyre Is that comment for me? Did you read my answer? I don't read all the lines into memory. And I don't agree with one line in memory at a time. At a minimum I would want to read the next line while processing the current one. How about the time it takes to open the next file? Would you have it sit idle while it opens the next file?Sheridansherie
A List<T>, if T is a value type, will be subject to the 2 GB size limitation. That is, the maximum number you can have will be 2 GB/sizeof(T). If T is a reference type, then the maximum number of items you can have in the list will be approx. 250 million. Or, if the total size of the allocated objects exceeds the available memory. But you can easily have a List<string> in which the aggregate size of the strings exceeds 2 gigabytes.Fishman
Do I understand correctly that there is a string buffer for each consumer, and each line is added to all of these buffers? Wouldn't there be a performance issue with that?Cecilacecile
Did you try? It is writing on a separate thread. You really think that writing to the buffer is going to be the rate determining step?Sheridansherie

I solved a similar problem recently, but my solution was not in C#; it was in SQL, due to the high durability requirements I had. Maybe some of my thoughts will help you (this is how I would do it):

I used the “Unit of Work” paradigm. In your case you might select a unit of work of, e.g., 100-1000 lines of text. Each unit of work can be characterized by file name, start file position, and end file position. Each unit also has a flag that tells whether it was processed by a particular consumer. My units of work were stored as DB records; you can save them as objects in a simple in-memory structure, like a list.

After your application starts, a separate thread is kicked off that reads all the files in order and adds units of work to the list. This thread has a list of files to process; it sequentially reads a particular number of lines, notes the file positions, and saves the file name and positions to the list.

As long as some units of work are available in the list for processing, the consumers process them starting at the beginning of the list. To get the lines of text for a particular unit, the consumers use a cache object. Because all consumers start processing from the beginning of the list, there is a good chance that they all ask for the same cached unit of work, at least at the beginning.

The cache object is completely independent from the thread that adds units of work to the list. The exact implementation of this object depends on some additional requirements, such as what to do if one of the consumers crashes or hangs, what to do if the application restarts, whether you accept that “fast” consumers wait for “slow” consumers, how you would like to monitor the entire process, and so on.
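
A very rough sketch of how a unit of work could look in memory (my real implementation was SQL tables, so every class and member name below is only illustrative):

using System.Collections.Concurrent;
using System.Collections.Generic;

// Illustrative only: one unit of work covers a contiguous block of 100-1000 lines in one file.
public class WorkUnit
{
    public string FileName { get; set; }
    public long StartPosition { get; set; }   // byte offset of the first line in the unit
    public long EndPosition { get; set; }     // byte offset just past the last line
    // One flag per consumer id telling whether that consumer has processed this unit.
    public ConcurrentDictionary<int, bool> ProcessedBy { get; private set; }
    public WorkUnit() { ProcessedBy = new ConcurrentDictionary<int, bool>(); }
}

// Illustrative cache: consumers ask it for the text of a unit. Because all consumers move
// through the list roughly together, the same unit is usually served from memory and is
// read from its file only on a cache miss.
public interface IWorkUnitCache
{
    IList<string> GetLines(WorkUnit unit);
}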

Hope this helps…

Whistle answered 30/5, 2014 at 17:43 Comment(8)
But this calculation does not need durable line data. A consumer processes a line in 50 microseconds. The overhead of writing to and reading from SQL is significant relative to the processing time.Sheridansherie
In my post I suggested saving units of work in memory instead of a database, and I also suggested that a unit of work contain 100-1000 lines of text. This improves performance by reducing context switching. You can also select the desired degree of parallelism by assigning several consumers to the same thread.Whistle
Yes, you can save them as objects in a simple memory structure, like a list. But a simple list does not handle concurrency the way a database does, neither in terms of concurrent reads and writes nor in terms of concurrent reads from multiple threads.Sheridansherie
You can use a simple ReaderWriterLockSlim, because one thread writes to it and multiple consumer threads read from it.Whistle
Writes and reads need a lock, so that is a whole lot of locking going on. List.Add is O(n) and List[i] is O(n). So a unit of work could contain 1000 lines to reduce context switching. Getting the next 1000 from a database is scalable; getting the next 1000 from a List is not. Adding 1000 items to a million in a database is scalable; adding 1000 items to a million-item List is not. I get units of work, but I politely suggest the idea does not translate to memory objects.Sheridansherie
Really? Both List.Add and List[i] have O(1) performance. See e.g. here.Whistle
I don't need to read the next 1000 items one by one. On the contrary, I read one unit of work to get 1000 lines; this boosts performance.Whistle
msdn.microsoft.com/en-us/library/3wcytfd1(v=vs.110).aspx If Count is less than Capacity, this method is an O(1) operation. If the capacity needs to be increased to accommodate the new element, this method becomes an O(n) operation, where n is Count.Sheridansherie
