TThreadedQueue not capable of multiple consumers?
I am trying to use TThreadedQueue (Generics.Collections) in a single producer, multiple consumer scheme (Delphi XE). The idea is to push objects into a queue and let several worker threads drain the queue.

It does not work as expected, though. When two or more worker threads are calling PopItem, access violations are thrown from the TThreadedQueue.

If the call to PopItem is serialized with a critical section, all is fine.

Surely TThreadedQueue should be able to handle multiple consumers, so am I missing something, or is this a pure bug in TThreadedQueue?

Here is a simple example that reproduces the error.

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
    except
      FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
    end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads call PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
      WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

Update : The error in TMonitor that caused TThreadedQueue to crash is fixed in Delphi XE2.

Update 2 : The above test stressed the queue in the empty state. Darian Miller found that stressing the queue in the full state could still reproduce the error in XE2. The error is once again in TMonitor. See his answer below for more information. See also QC101114.

Update 3 : With Delphi-XE2 update 4 there was an announced fix for TMonitor that would cure the problems in TThreadedQueue. My tests so far are not able to reproduce any errors in TThreadedQueue anymore. Tested single producer/multiple consumer threads when queue is empty and full. Also tested multiple producers/multiple consumers. I varied the reader threads and writer threads from 1 to 100 without any glitch. But knowing the history, I dare others to break TMonitor.

Mime answered 31/1, 2011 at 21:25 Comment(5)
Hi LU RD! Welcome to StackOverflow. This is a good question you've got, but it might be easier to test if the code was posted a little bit differently. You've included the .pas half of a form, without the corresponding DFM, and that makes it harder for us to duplicate and investigate. The problem doesn't seem to be UI-related, so is there any way you could reduce this to a console app? Thanks. - Labored
Mason, console app done. - Mime
Problems are still there in XE2... - Taxi
XE2 update 4 fixes these issues. - Taxi
See a blog post by @DarianMiller, Revisiting TThreadedQueue and TMonitor in Delphi, for a recent status of TMonitor and TThreadedQueue. - Mime
L
19

Well, it's hard to be sure without a lot of testing, but it certainly looks like this is a bug, either in TThreadedQueue or in TMonitor. Either way it's in the RTL and not your code. You ought to file this as a QC report and use your example above as the "how to reproduce" code.

Labored answered 31/1, 2011 at 23:45 Comment(5)
Mason, thank you. I will QC it tomorrow unless someone else has a different opinion. It seems the error is in TMonitor. - Mime
QC #91246 TThreadedQueue fails with multiple consumers. Vote for it if you like. - Mime
Link to the QC report: http://qc.embarcadero.com/wc/qcmain.aspx?d=91246 - Douala
Doesn't seem to be fixed. I added a community wiki answer with a tweak to the example code that generates an AV in XE2. - Taxi
Does anyone know how this bug got fixed, and how the fix might be applied to XE3? - Arrest
N
10

I recommend using OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary when working with threads, parallelism, etc. Primoz did a very good job, and on the site you'll find a lot of useful documentation.

Numb answered 1/2, 2011 at 8:22 Comment(1)
I know the OmniThreadLibrary well and also AsyncCalls by Andreas Hausladen: http://andy.jgknet.de/blog/bugfix-units/asynccalls-29-asynchronous-function-calls/ - Mime
T
4

Your example seems to work fine under XE2, but if we fill your queue it fails with AV on a PushItem. (Tested under XE2 Update1)

To reproduce, just increase your task creation from 100 to 1100 (your queue depth was set at 1024)

for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

This dies for me every time on Windows 7. I initially tried a continual push to stress test it, and it failed at loop 30, then at loop 16, then at loop 65. The interval varied, but it consistently failed at some point.

  iLoop := 0;
  while iLoop < 1000 do
  begin
    Inc(iLoop);
    WriteLn('Loop: ' + IntToStr(iLoop));  
    for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
  end;
Taxi answered 31/1, 2011 at 21:26 Comment(9)
Oh no, at some point I feared this could also be a breaking point, just like when the queue was empty. I even made a comment on this on another post in SO. Silly of me not to test it. I will make some more tests to confirm it. - Mime
Yep, it fails consistently on Windows 7 64 bit (XE2 Update 2), both with 32 and 64 bit exes. Will you QC it or shall I do it? - Mime
I stumbled across this link from Gurock that allows an easy conversion of TThreadedQueue that avoids the AVs: blog.gurock.com/software/… Change the types from TObject to the Gurock versions (FQueueNotEmpty and FQueueNotFull to TCondition, FQueueLock to TLock) and change the calls to match. I haven't been able to reproduce the errors in TThreadedQueue with these changes. - Taxi
Chris Rolliston blogs about this bug, fixing-tthreadedqueue-or-in-other-words-tmonitor-again. He also offers a solution involving some adjustments in the TThreadedQueue source code to circumvent the error in TMonitor. - Mime
@LURD - I tested the latest changes posted on the blog and TThreadedQueue still dies. Using the original example code, add a new TThreadWriter that executes the PushItem code above and create multiple writer threads. (Basically pump up the power even further.) I left a comment on Chris Rolliston's blog with the parameters used in my test. The good news is that Allen Bauer commented on the same blog post that another fix is on its way in the next update. - Taxi
Darian, well spotted, I replicated this as well. I can't see any comment of yours on Chris's blog though. Hopefully the Emb update will handle the multiple producer/multiple consumer case as well. - Mime
OK, now I can see your answer in the blog. - Mime
I made some tests with XE2 update 4 and can't break TThreadedQueue anymore. Tried many combinations with multiple producers/multiple consumers. - Mime
I've tried XE2 update 4 and my stress tests now pass. I let them run for an hour without issue. - Taxi
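The multiple-writer stress test mentioned in the comments above could look roughly like the sketch below. Only the name TThreadWriter comes from the comment; the class body, the field name fPushQueue and the handling of a timed-out push are assumptions, and the fragment is meant to be pasted into the question's test program (it relies on TThreadTaskMsg and the uses clause declared there).

```pascal
type
  // Hypothetical writer thread. Only the class name is taken from the
  // comment thread; everything else here is an assumption.
  TThreadWriter = class(TThread)
  private
    fPushQueue : TThreadedQueue<TObject>;
  protected
    procedure Execute; override;
  public
    constructor Create(pushQueue : TThreadedQueue<TObject>);
  end;

constructor TThreadWriter.Create(pushQueue : TThreadedQueue<TObject>);
begin
  fPushQueue := pushQueue;
  inherited Create(False); // start running immediately
end;

procedure TThreadWriter.Execute;
var
  i   : Integer;
  msg : TThreadTaskMsg;
begin
  NameThreadForDebugging('QueuePush worker');
  while not Terminated do
    for i := 1 to 100 do
    begin
      msg := TThreadTaskMsg.Create(i, '');
      // PushItem returns a TWaitResult. If the queue stays full for the
      // whole push timeout the item is not queued, so free it here to
      // avoid leaking it.
      if fPushQueue.PushItem(msg) <> wrSignaled then
        msg.Free;
    end;
end;
```

In the main program, several writers would be created next to the readers, for example `for i:= 1 to 4 do fWriterArr[i]:= TThreadWriter.Create(fThreadQueue);` with a hypothetical `fWriterArr : array[1..4] of TThreadWriter`, and freed in the finally block before the queue is freed.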
A
3

I looked for the TThreadedQueue class but don't seem to have it in my D2009. I'm not exactly going to kill myself over this - Delphi thread support has always been err.. errm... 'non-optimal' and I suspect that TThreadedQueue is no different :)

Why use generics for P-C (Producer / Consumer) objects? A simple TObjectQueue descendant will do fine - been using this for decades - works fine with multiple producers/consumers:

unit MinimalSemaphorePCqueue;

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.

The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.


Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface

uses
  Windows, Messages, SysUtils, Classes,syncObjs,contnrs;


type

pObject=^Tobject;


TsemaphoreMailbox=class(TobjectQueue)
private
  countSema:Thandle;
protected
  access:TcriticalSection;
public
  property semaHandle:Thandle read countSema;
  constructor create; virtual;
  procedure push(aObject:Tobject); virtual;
  function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
  function peek(pResObject:pObject):boolean;  virtual;
  destructor destroy; override;
end;


implementation

{ TsemaphoreMailbox }

constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
   inherited Create;
{$ELSE}
  inherited create;
{$ENDIF}
  access:=TcriticalSection.create;
  countSema:=createSemaphore(nil,0,maxInt,nil);
end;

destructor TsemaphoreMailbox.destroy;
begin
  access.free;
  closeHandle(countSema);
  inherited;
end;

function TsemaphoreMailbox.pop(pResObject: pObject;
  timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue.  If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
  result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
  if result then // if a unit was supplied before the timeout,
  begin
    access.acquire;
    try
      pResObject^:=inherited pop; // get an object from the queue
    finally
      access.release;
    end;
  end;
end;

procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue.  If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
  access.acquire;
  try
    inherited push(aObject); // shove the object onto the queue
  finally
    access.release;
  end;
  releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;

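function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
// Despite its name, this removes the object from the queue. It is meant to be
// called after a wait on semaHandle has already consumed one semaphore unit.
begin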
  access.acquire;
  try
    result:=(count>0);
    if result then pResObject^:=inherited pop; // get an object from the queue
  finally
    access.release;
  end;
end;
end.
Autism answered 17/5, 2011 at 20:31 Comment(1)
Thanks for your answer. I saw the TThreadedQueue class in the documentation for XE and made a simple test for a real application I had. This was my first shot at generics and it did not turn out well. As you can see from other comments, the bug is in the TMonitor class, which will have implications if someone builds a parallel multithreaded application. My implementation ended up using a simple queue protected with a critical section for pushing and popping. - Mime
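A queue protected by a critical section, as described in the comment above, might look like the following minimal sketch. This is not the poster's actual code: the unit name LockedQueue, the class TLockedQueue and its methods are hypothetical, and the pop is non-blocking (it returns False on an empty queue instead of waiting).

```pascal
unit LockedQueue;

interface

uses
  SyncObjs, Generics.Collections;

type
  // Hypothetical class for illustration only.
  TLockedQueue = class
  private
    fLock  : TCriticalSection;
    fItems : TQueue<TObject>;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Push(aItem : TObject);
    // Returns False immediately when the queue is empty (no blocking).
    function TryPop(out aItem : TObject) : Boolean;
  end;

implementation

constructor TLockedQueue.Create;
begin
  inherited Create;
  fLock  := TCriticalSection.Create;
  fItems := TQueue<TObject>.Create;
end;

destructor TLockedQueue.Destroy;
begin
  // Objects still in the queue are not freed here; the caller owns them.
  fItems.Free;
  fLock.Free;
  inherited;
end;

procedure TLockedQueue.Push(aItem : TObject);
begin
  fLock.Acquire;
  try
    fItems.Enqueue(aItem);
  finally
    fLock.Release;
  end;
end;

function TLockedQueue.TryPop(out aItem : TObject) : Boolean;
begin
  aItem := nil;
  fLock.Acquire;
  try
    // The count check and the dequeue happen under the same lock, so two
    // consumers can never both take the last item.
    Result := fItems.Count > 0;
    if Result then
      aItem := fItems.Dequeue;
  finally
    fLock.Release;
  end;
end;

end.
```

A consumer thread would call TryPop in its loop and sleep or wait on an event when it returns False. That polling is the price of leaving out the blocking wait that TThreadedQueue or the semaphore mailbox above provides.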
S
1

I don't think TThreadedQueue is supposed to support multiple consumers. It's a FIFO, as per the help file. I am under the impression that there's one thread pushing and another one (just one!) popping.

Shutdown answered 1/2, 2011 at 13:27 Comment(4)
FIFO is just a way of saying how the queue is emptied. It doesn't mean that there may only be one thread pulling jobs from the queue. Especially not when it is called ThreadedQueue. - Furbish
It's called ThreadedQueue because the pusher and popper can be in different threads. In the multithreading world nothing comes for free, therefore I think the docs would have mentioned multiple producer and/or consumer support if it were available. It isn't mentioned, so I think it's not supposed to work. - Shutdown
The queue is protected by a monitor. The monitor itself must be safe in a multithreading environment. If the queue was not safe for multiple consumers, it should at least raise an exception that could be caught. - Mime
@LU RD: note the "TMonitor.Pulse(FQueueNotFull);" at the end of TThreadedQueue<T>.PopItem. It's not protected by the monitor you mentioned. I guess that's causing your problem. - Shutdown
