Here's an article from Delphi3000 showing how to use IOCP to create a thread pool. I am not the author of this code, but the author's information is in the source code.
I'm re-posting the comments and code here:
Everyone by now should understand what
a thread is, the principles of threads
and so on. For those in need, the
simple function of a thread is to move
processing off one thread and onto
another, to allow concurrent and
parallel execution. The main principle
of threads is just as simple: memory
that is shared between threads must
have its access synchronized to keep
it safe. There are a number of other
principles but this is really the one
to care about.
And on..
A thread safe queue will allow
multiple threads to add and remove,
push and pop values to and from the
queue safely on a first in, first out (FIFO)
basis. With an efficient and well
written queue you can have a highly
useful component in developing
threaded applications, from helping
with thread safe logging, to
asynchronous processing of requests.
A thread pool is simply a thread or a
number of threads which are most
commonly used to manage a queue of
requests. For example, a web server
with a continuous queue of requests
to process uses a thread pool to
manage the HTTP requests, and a COM+
or DCOM server uses a thread pool to
handle the RPC requests. This is done
so that processing one request has
less impact on another. Say you ran 3
requests synchronously and the first
request took 1 minute to complete: the
other two requests would not complete
for at least 1 minute, plus their own
processing time on top, and for most
clients this is not acceptable.
So how to do this..
Starting with the queue!!
Delphi does provide a TQueue class,
which is unfortunately neither thread
safe nor particularly efficient, but
people should look at the Contnrs.pas
file to see how Borland writes its
stacks and queues. There are only two
main functions required for a queue:
add/push and remove/pop. Add/push will
add a value, pointer or object to the
end of a queue, and remove/pop will
remove and return the first value in
the queue.
You could derive from the TQueue class,
override the protected methods and
add in critical sections. This will
get you some of the way, but I would
want my queue to wait until new
requests are in the queue, and put the
thread into a state of rest while it
waits for new requests. This could be
done by adding in mutexes or signaling
events, but there is an easier way. The
Windows API provides the I/O completion
port (IOCP), which gives us thread
safe access to a queue and a state of
rest while waiting for a new request in
the queue.
Implementing the Thread Pool
The thread pool is going to be very
simple: it will manage the desired
number of threads and pass each queued
request to an event handler that you
provide for processing. There is rarely
a need to derive your own TThread class
with your logic implemented and
encapsulated in its Execute method, so
a simple TSimpleThread class can be
created which will execute any method
of any object within the context of
another thread. Once people understand
this, all you need to concern yourself
with is allocated memory.
Here is how it is implemented.
TThreadQueue and TThreadPool implementation
(* Implemented for Delphi3000.com Articles, 11/01/2004
Chris Baldwin
Director & Chief Architect
Alive Technology Limited
http://www.alivetechnology.com
*)
unit ThreadUtilities;
interface
uses Windows, SysUtils, Classes;
type
EThreadStackFinalized = class(Exception);
TSimpleThread = class;
// Thread Safe Pointer Queue
TThreadQueue = class
private
FFinalized: Boolean;
FIOQueue: THandle;
public
constructor Create;
destructor Destroy; override;
procedure Finalize;
procedure Push(Data: Pointer);
function Pop(var Data: Pointer): Boolean;
property Finalized: Boolean read FFinalized;
end;
TThreadExecuteEvent = procedure (Thread: TThread) of object;
TSimpleThread = class(TThread)
private
FExecuteEvent: TThreadExecuteEvent;
protected
procedure Execute(); override;
public
constructor Create(CreateSuspended: Boolean; ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
end;
TThreadPoolEvent = procedure (Data: Pointer; AThread: TThread) of Object;
TThreadPool = class(TObject)
private
FThreads: TList;
FThreadQueue: TThreadQueue;
FHandlePoolEvent: TThreadPoolEvent;
procedure DoHandleThreadExecute(Thread: TThread);
public
constructor Create( HandlePoolEvent: TThreadPoolEvent; MaxThreads: Integer = 1); virtual;
destructor Destroy; override;
procedure Add(const Data: Pointer);
end;
implementation
{ TThreadQueue }
constructor TThreadQueue.Create;
begin
//-- Create IO Completion Queue
FIOQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
FFinalized := False;
end;
destructor TThreadQueue.Destroy;
begin
//-- Destroy Completion Queue
if (FIOQueue <> 0) then
CloseHandle(FIOQueue);
inherited;
end;
procedure TThreadQueue.Finalize;
begin
//-- Post a finalize marker on to the queue; it travels in the
//-- overlapped argument, so any Data value (even nil) can still be pushed
PostQueuedCompletionStatus(FIOQueue, 0, 0, Pointer($FFFFFFFF));
FFinalized := True;
end;
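(* Pop will return false once the queue has been finalized.
Requests still waiting in the queue at that point are not returned. *)
function TThreadQueue.Pop(var Data: Pointer): Boolean;
var
A: Cardinal;
OL: POverLapped;
begin
Result := True;
if (not FFinalized) then
//-- Remove/Pop the first pointer from the queue or wait.
//-- The Cardinal cast assumes a 32-bit compiler where the completion key
//-- is declared as DWORD; where it is declared as ULONG_PTR (newer
//-- Delphi versions), cast to ULONG_PTR instead.
GetQueuedCompletionStatus(FIOQueue, A, Cardinal(Data), OL, INFINITE);
//-- Check if we have finalized the queue for completion
//-- (OL is only inspected when GetQueuedCompletionStatus was called)
if FFinalized or (OL = Pointer($FFFFFFFF)) then begin
Data := nil;
Result := False;
//-- Re-post the marker so the next waiting thread wakes up too
Finalize;
end;
end;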
procedure TThreadQueue.Push(Data: Pointer);
begin
if FFinalized then
raise EThreadStackFinalized.Create('Queue is finalized');
//-- Add/Push a pointer on to the end of the queue
//-- (the Cardinal cast assumes 32-bit pointers)
PostQueuedCompletionStatus(FIOQueue, 0, Cardinal(Data), nil);
end;
{ TSimpleThread }
constructor TSimpleThread.Create(CreateSuspended: Boolean;
ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
begin
FreeOnTerminate := AFreeOnTerminate;
FExecuteEvent := ExecuteEvent;
inherited Create(CreateSuspended);
end;
procedure TSimpleThread.Execute;
begin
if Assigned(FExecuteEvent) then
FExecuteEvent(Self);
end;
{ TThreadPool }
procedure TThreadPool.Add(const Data: Pointer);
begin
FThreadQueue.Push(Data);
end;
constructor TThreadPool.Create(HandlePoolEvent: TThreadPoolEvent;
MaxThreads: Integer);
begin
FHandlePoolEvent := HandlePoolEvent;
FThreadQueue := TThreadQueue.Create;
FThreads := TList.Create;
while FThreads.Count < MaxThreads do
FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
end;
destructor TThreadPool.Destroy;
var
t: Integer;
begin
FThreadQueue.Finalize;
for t := 0 to FThreads.Count-1 do
TThread(FThreads[t]).Terminate;
while (FThreads.Count > 0) do begin
TThread(FThreads[0]).WaitFor;
TThread(FThreads[0]).Free;
FThreads.Delete(0);
end;
FThreadQueue.Free;
FThreads.Free;
inherited;
end;
procedure TThreadPool.DoHandleThreadExecute(Thread: TThread);
var
Data: Pointer;
begin
while FThreadQueue.Pop(Data) and (not TSimpleThread(Thread).Terminated) do begin
try
FHandlePoolEvent(Data, Thread);
except
//-- Swallow exceptions so that one failed request cannot end the pool thread
end;
end;
end;
end.
As you can see it's quite
straightforward, and with this you can
very easily implement any queuing of
requests over threads. Really, any
type of requirement that needs
threading can be met using these
objects, saving you a lot of time and
effort.
You can use this to queue requests
from one thread to multiple threads,
or queue requests from multiple
threads down to one thread, which makes
this quite a nice solution.
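As a minimal sketch of the one-thread-to-many case, the console program below pushes a handful of work items from the main thread to a pool of four threads. The names PoolDemo, TWorkItem, TSquareWorker and SumOfSquares are made up for this illustration and are not part of the original article, and it assumes the ThreadUtilities unit above compiles as shown on a 32-bit Delphi compiler.

```pascal
program PoolDemo;

{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, Classes, ThreadUtilities;

type
  // Hypothetical work item; the pool itself only sees an untyped pointer.
  PWorkItem = ^TWorkItem;
  TWorkItem = record
    Input: Integer;
    Output: Integer;
  end;

  // Hypothetical handler object. TThreadPoolEvent is "of object",
  // so the callback has to be a method rather than a plain procedure.
  TSquareWorker = class
  private
    FDone: Integer;
  public
    procedure HandleItem(Data: Pointer; AThread: TThread);
    property Done: Integer read FDone;
  end;

procedure TSquareWorker.HandleItem(Data: Pointer; AThread: TThread);
var
  Item: PWorkItem;
begin
  // Runs on a pool thread. Each item is written by exactly one thread;
  // the shared counter is only changed through InterlockedIncrement.
  Item := Data;
  Item^.Output := Item^.Input * Item^.Input;
  InterlockedIncrement(FDone);
end;

function SumOfSquares(Count, ThreadCount: Integer): Integer;
var
  Worker: TSquareWorker;
  Pool: TThreadPool;
  Items: array of TWorkItem;
  I: Integer;
begin
  Result := 0;
  SetLength(Items, Count);
  Worker := TSquareWorker.Create;
  try
    Pool := TThreadPool.Create(Worker.HandleItem, ThreadCount);
    try
      for I := 0 to Count - 1 do
      begin
        Items[I].Input := I;
        Pool.Add(@Items[I]);
      end;
      // Wait for every item before freeing the pool: the destructor
      // finalizes the queue and requests still waiting are not processed.
      while Worker.Done < Count do
        Sleep(10);
    finally
      Pool.Free;
    end;
    for I := 0 to Count - 1 do
      Inc(Result, Items[I].Output);
  finally
    Worker.Free;
  end;
end;

begin
  Writeln('Sum of squares 0..7: ', SumOfSquares(8, 4));
end.
```

The polling loop is only there to keep the sketch short; a real program would more likely signal completion with an event. The work items live in an array owned by the caller, so the handler does not need to free them.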
Here are some examples of using these
objects.
Thread safe logging
To allow multiple
threads to asynchronously write to a
log file.
uses Windows, ThreadUtilities,...;
type
PLogRequest = ^TLogRequest;
TLogRequest = record
LogText: String;
end;
TThreadFileLog = class(TObject)
private
FFileName: String;
FThreadPool: TThreadPool;
procedure HandleLogRequest(Data: Pointer; AThread: TThread);
public
constructor Create(const FileName: string);
destructor Destroy; override;
procedure Log(const LogText: string);
end;
implementation
(* Simple reuse of a logtofile function for example *)
procedure LogToFile(const FileName, LogString: String);
var
F: TextFile;
begin
AssignFile(F, FileName);
if not FileExists(FileName) then
Rewrite(F)
else
Append(F);
try
Writeln(F, DateTimeToStr(Now) + ': ' + LogString);
finally
CloseFile(F);
end;
end;
constructor TThreadFileLog.Create(const FileName: string);
begin
FFileName := FileName;
//-- Pool of one thread to handle queue of logs
FThreadPool := TThreadPool.Create(HandleLogRequest, 1);
end;
destructor TThreadFileLog.Destroy;
begin
FThreadPool.Free;
inherited;
end;
procedure TThreadFileLog.HandleLogRequest(Data: Pointer; AThread: TThread);
var
Request: PLogRequest;
begin
Request := Data;
try
LogToFile(FFileName, Request^.LogText);
finally
Dispose(Request);
end;
end;
procedure TThreadFileLog.Log(const LogText: string);
var
Request: PLogRequest;
begin
New(Request);
Request^.LogText := LogText;
FThreadPool.Add(Request);
end;
As this is logging to a file it will
process all requests down to a single
thread, but you could send rich email
notifications with a higher thread
count, or even better, profile what’s
going on at each step in your program,
which I will demonstrate in another
article as this one has got quite long
now.
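A possible way to exercise the logger from several threads at once is sketched below, using TSimpleThread to run the producers. The unit name ThreadFileLog, the TProducer class and the file name demo.log are assumptions made for this example only; the article does not say which unit holds TThreadFileLog.

```pascal
program LogDemo;

{$APPTYPE CONSOLE}

uses
  Windows, SysUtils, Classes, ThreadUtilities,
  ThreadFileLog; // hypothetical unit name holding TThreadFileLog

type
  // Hypothetical producer. Its Run method matches TThreadExecuteEvent,
  // so TSimpleThread can execute it on a separate thread.
  TProducer = class
  private
    FLog: TThreadFileLog;
    FName: string;
  public
    constructor Create(ALog: TThreadFileLog; const AName: string);
    procedure Run(Thread: TThread);
  end;

constructor TProducer.Create(ALog: TThreadFileLog; const AName: string);
begin
  inherited Create;
  FLog := ALog;
  FName := AName;
end;

procedure TProducer.Run(Thread: TThread);
var
  I: Integer;
begin
  // Log only queues the request; the single pool thread writes the file.
  for I := 1 to 5 do
    FLog.Log(Format('%s: message %d', [FName, I]));
end;

const
  ProducerCount = 3;
var
  Log: TThreadFileLog;
  Producers: array[0..ProducerCount - 1] of TProducer;
  Threads: array[0..ProducerCount - 1] of TSimpleThread;
  I: Integer;
begin
  Log := TThreadFileLog.Create('demo.log');
  try
    for I := 0 to ProducerCount - 1 do
    begin
      Producers[I] := TProducer.Create(Log, 'producer ' + IntToStr(I));
      Threads[I] := TSimpleThread.Create(False, Producers[I].Run, False);
    end;
    for I := 0 to ProducerCount - 1 do
    begin
      Threads[I].WaitFor;
      Threads[I].Free;
      Producers[I].Free;
    end;
    // Crude drain: freeing the log finalizes the queue, and requests
    // still waiting at that moment are neither written nor disposed.
    Sleep(500);
  finally
    Log.Free;
  end;
end.
```

The Sleep before Log.Free is a stopgap rather than a guarantee. If every line must reach the file, the pool would need some way to drain its queue before it is finalized, which the classes above do not provide.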
For now I will leave you with this,
enjoy.. Leave a comment if there's
anything people are stuck with.
Chris