How do I write a thread-aware extension function for PLINQ?

Does anybody know how to write an extension method returning a ParallelQuery<T> in PLINQ?

More specifically, I have the following problem: I want to perform a transformation within a PLINQ query that requires an engine whose creation is costly and which cannot be accessed concurrently.

I could do the following:

var result = source.AsParallel ().Select ( (i) => { var e = new Engine (); return e.Process (i); } );

Here, the engine is created once per item, which is too expensive.

I want the engine to be created once per thread.

With Aggregate, I can get close to what I want with something like:

// helper class: engine to use plus list of results obtained in thread so far
class EngineAndResults {
   public Engine engine = null;
   public IEnumerable<ResultType> results;
}

var result = source.AsParallel ().Aggregate (

   // done once per partition of items (roughly, once per thread),
   // returning an empty list, but a new engine
   () => new EngineAndResults () {
       engine = new Engine (),
       results = Enumerable.Empty<ResultType> ()
   },

   // we process a new item and put it to the thread-local list,
   // preserving the engine for further use
   (engineAndResults, item) => new EngineAndResults () {
       engine = engineAndResults.engine,
       results = Enumerable.Concat (
           engineAndResults.results,
           new ResultType [] { engineAndResults.engine.Process (item) }
       )
   },

   // tell linq how to aggregate across threads
   (engineAndResults1, engineAndResults2) => new EngineAndResults () {
       engine = engineAndResults1.engine,
       results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results)
   },

   // after all aggregations, how do we come to the result?
   engineAndResults => engineAndResults.results
);
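For reference, here is a self-contained sketch of the Aggregate workaround that compiles and runs. It makes a few assumptions:

- `Engine` is a hypothetical stub. Its `Process` just doubles an int, and `int` stands in for `ResultType`.
- The sketch swaps the repeated `Enumerable.Concat` calls for one mutable `List<int>` per partition. The seed-factory overload of PLINQ's `Aggregate` permits this because each accumulator is updated by a single partition, and partitions are only combined after they finish.

It still ends in a single merged list, so it has the same drawback discussed next.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical stand-in for the costly, non-thread-safe engine from the question.
public class Engine
{
    public static int Created;                       // number of engines constructed
    public Engine() { Interlocked.Increment(ref Created); }
    public int Process(int item) => item * 2;        // placeholder transformation
}

// Accumulator: the partition's engine plus the results produced so far.
public class EngineAndResults
{
    public Engine engine;
    public List<int> results;
}

public static class AggregateDemo
{
    public static List<int> Run(IEnumerable<int> source) =>
        source.AsParallel().Aggregate(
            // seed factory: called once per partition, so each partition gets its own engine
            () => new EngineAndResults { engine = new Engine(), results = new List<int>() },
            // update: only the owning partition touches this accumulator
            (acc, item) => { acc.results.Add(acc.engine.Process(item)); return acc; },
            // combine: merge partial results (the second accumulator's engine is dropped)
            (a, b) => { a.results.AddRange(b.results); return a; },
            // result selector: this is a terminal operation, producing one merged list
            acc => acc.results);

    public static void Main()
    {
        var results = Run(Enumerable.Range(1, 1000));
        Console.WriteLine($"{results.Count} results, {Engine.Created} engines");
    }
}
```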

As you can see, I misuse the accumulator to carry one engine per thread. The problem is that Aggregate is a terminal operation: PLINQ merges all per-thread results into a single IEnumerable, which forces the threads to synchronize. That is inconvenient if I want to chain further PLINQ operators afterwards.

What I would like is something like:

   var result = source.AsParallel ()
                      .SelectWithThreadwiseInitWhichIAmLookingFor (
                          () => new Engine (),
                          (engine, item) => engine.Process (item));

Does anybody have any idea how to achieve this?

Talishatalisman asked 22/6, 2012 at 13:11

You could use ThreadLocal<T> to do this. Something like:

var engine = new ThreadLocal<Engine>(() => new Engine());
var result = source.AsParallel()
                   .Select(item => engine.Value.Process(item));
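A runnable version of the snippet above might look like the following. It assumes a few things beyond the answer itself:

- `Engine` is a hypothetical stub. It throws if two threads ever call `Process` on the same instance at once, so the per-thread guarantee is actually exercised.
- The sketch disposes the `ThreadLocal` when done, which the answer does not show.
- `ToList()` is added so the deferred query runs before that disposal.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical engine: expensive to construct and not safe for concurrent use.
public class Engine
{
    public static int Created;                 // number of engines constructed
    private int busy;                          // 1 while Process is running

    public Engine() { Interlocked.Increment(ref Created); }

    public int Process(int item)
    {
        // Detect concurrent use; with ThreadLocal this should never fire.
        if (Interlocked.Exchange(ref busy, 1) == 1)
            throw new InvalidOperationException("Engine used concurrently");
        try { return item * 2; }               // placeholder transformation
        finally { Volatile.Write(ref busy, 0); }
    }
}

public static class ThreadLocalDemo
{
    public static List<int> Run(IEnumerable<int> source)
    {
        // One Engine per thread, created lazily on that thread's first access.
        using (var engine = new ThreadLocal<Engine>(() => new Engine()))
        {
            // ToList forces the query to execute before the ThreadLocal is disposed.
            return source.AsParallel()
                         .Select(item => engine.Value.Process(item))
                         .ToList();
        }
    }

    public static void Main()
    {
        var results = Run(Enumerable.Range(1, 1000));
        Console.WriteLine($"{results.Count} results, {Engine.Created} engines");
    }
}
```

The number of engines equals the number of distinct threads that processed items. PLINQ usually runs on thread-pool threads, so that count is bounded by the threads it happens to use rather than fixed in advance.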
Janinajanine answered 22/6, 2012 at 13:33
Thank you. This is a good solution. I did a short test, and it seems to work fine. I tried to find a way to put the initialization into the extension function, but did not succeed; evidently the ThreadLocal must be created before AsParallel is called. I do not see the reason for that, but anyway, this is not a big issue. (Talishatalisman)
I think that didn't work because you were creating a new ThreadLocal for each iteration, so iterations executing on the same thread could not share an engine. All iterations that run on the same thread need the same ThreadLocal instance. (Janinajanine)
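Following the last comment's explanation, the ThreadLocal can live inside an extension method, as long as it is created once per call rather than once per item. The following sketch makes that concrete, with some assumptions and caveats:

- `SelectWithThreadwiseInit` is a hypothetical name modelled on the one in the question.
- `Engine` is again a hypothetical stub.
- The method returns a deferred `ParallelQuery<TResult>`, so it cannot safely dispose the ThreadLocal itself; it is left to the finalizer.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class ParallelQueryExtensions
{
    // Hypothetical extension method in the shape the question asks for.
    // The ThreadLocal is created once per call (not once per item), so all items
    // processed on the same thread see the same TState instance.
    public static ParallelQuery<TResult> SelectWithThreadwiseInit<TSource, TState, TResult>(
        this ParallelQuery<TSource> source,
        Func<TState> stateFactory,
        Func<TState, TSource, TResult> selector)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (stateFactory == null) throw new ArgumentNullException(nameof(stateFactory));
        if (selector == null) throw new ArgumentNullException(nameof(selector));

        // Not disposed here: the returned query is deferred and may be enumerated later.
        var state = new ThreadLocal<TState>(stateFactory);
        return source.Select(item => selector(state.Value, item));
    }
}

// Hypothetical engine stub for the usage example.
public class Engine
{
    public static int Created;
    public Engine() { Interlocked.Increment(ref Created); }
    public int Process(int item) => item * 2;   // placeholder transformation
}

public static class Program
{
    public static void Main()
    {
        var result = Enumerable.Range(1, 1000).AsParallel()
            .SelectWithThreadwiseInit(() => new Engine(), (engine, item) => engine.Process(item))
            .Where(x => x % 3 == 0)   // further PLINQ operators can still be chained
            .ToList();
        Console.WriteLine($"{result.Count} results, {Engine.Created} engines");
    }
}
```

Because the method returns a `ParallelQuery<TResult>` rather than aggregating, the pipeline stays parallel through any operators chained after it. This avoids the synchronization point the Aggregate workaround introduced.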
