epoll IO with worker threads in C

I am writing a small server that will receive data from multiple sources and process this data. The number of sources and the amount of data received are significant, but no more than epoll should be able to handle quite well. However, all received data must be parsed and run through a large number of tests, which is time consuming and will block a single thread despite epoll multiplexing. Basically, the pattern should be something like this: the IO loop receives data and bundles it into a job, sends it to the first available thread in the pool, the bundle is processed by the worker, and the result is passed back to the IO loop for writing to a file.

I have decided to go for a single IO thread and N worker threads. The IO thread for accepting TCP connections and reading data is easy to implement using the example provided at http://linux.die.net/man/7/epoll

Threads are also usually easy enough to deal with, but I am struggling to combine the epoll IO loop with a thread pool in an elegant manner. I am unable to find any "best practice" for using epoll with a worker pool online either, only quite a few questions regarding the same topic.

I therefore have some questions I hope someone can help me answer:

  1. Could (and should) eventfd be used as a mechanism for 2-way synchronization between the IO thread and all the workers? For instance, is it a good idea for each worker thread to have its own epoll routine waiting on a shared eventfd (with a struct pointer, containing data/info about the job) i.e. using the eventfd as a job queue somehow? Also perhaps have another eventfd to pass results back into the IO thread from multiple worker threads?
  2. After the IO thread is signaled about more data on a socket, should the actual recv take place on the IO thread, or should the workers recv the data on their own in order to not block the IO thread while parsing data frames etc.? In that case, how can I ensure safety, e.g. in case recv reads 1.5 frames of data in a worker thread and another worker thread receives the last 0.5 frame of data from the same connection?
  3. If the worker thread pool is implemented through mutexes and such, will waiting for locks block the IO thread if N+1 threads are trying to use the same lock?
  4. Are there any good practice patterns for how to build a worker thread pool around epoll with two way communication (i.e. both from IO to workers and back)?

EDIT: Could one possible solution be to update a ring buffer from the IO loop, and after each update send the ring buffer index to the workers through a pipe shared by all workers (thus giving away control of that index to the first worker that reads the index off the pipe), let the worker own that index until the end of processing, and then send the index number back to the IO thread through a pipe again, thus giving back control?

My application is Linux-only, so I can use Linux-only functionality in order to achieve this in the most elegant way possible. Cross platform support is not needed, but performance and thread safety is.

Alterable answered 19/2, 2014 at 21:21 Comment(2)
I think I may have a useful solution but first need to know: how soon do you know the length of a single frame/packet? Are they fixed length, is the length included in the packet header, or do you only know it at the end? If you know it sooner, it is far easier to pass off the work without busying the main thread, but if you do not know it until the end, the main thread inevitably has to do a lot of reading.Schrader
Hi, I know the length only after the recv and after iterating through the recv buffer. They are unfortunately not fixed length, and the length does not appear in the packet; framing is based on linefeeds.Alterable

With this model, because we only know the packet size once we have fully received the packet, we unfortunately cannot offload the receive itself to a worker thread. Instead, the best we can do is dedicate a thread to receiving the data, which then passes off pointers to fully received packets.

The data itself is probably best held in a circular buffer; however, we will want a separate buffer for each input source (if we get a partial packet we can continue receiving from other sources without splitting up the data). The remaining question is how to inform the workers when a new packet is ready and give them a pointer to the data in said packet. Because there is little data here, just some pointers, the most elegant way of doing this would be with POSIX message queues. These allow multiple senders and multiple receivers to write and read messages, always ensuring that every message is received, and by precisely one thread.
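For illustration, here is a minimal sketch of how such a queue might be created; the queue name, the JobMessage layout and the depth are my own assumptions rather than anything prescribed here, and struct DataSource is the structure defined just below. On older glibc you may need to link with -lrt for the mq_* functions.

#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>

struct JobMessage {
    struct DataSource *Source;  /* which source the packet belongs to */
    char              *Packet;  /* start of the packet in the buffer  */
    size_t             Length;  /* packet length in bytes             */
};

static mqd_t open_job_queue(void)
{
    struct mq_attr attr = {
        .mq_maxmsg  = 10,                        /* depth; capped by /proc/sys/fs/mqueue/msg_max */
        .mq_msgsize = sizeof(struct JobMessage)  /* fixed-size job records                       */
    };

    /* All threads can open the same name; each message is delivered to
     * exactly one mq_receive caller. */
    mqd_t q = mq_open("/job_queue", O_RDWR | O_CREAT, 0600, &attr);
    if (q == (mqd_t)-1) {
        perror("mq_open");
        exit(EXIT_FAILURE);
    }
    return q;
}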

You will want a struct resembling the one below for each data source; I shall go through the fields' purposes now.

struct DataSource
{
    int   SourceFD;                                          /* fd of this data stream              */
    char  DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)];  /* circular buffer for packet contents */
    char *LatestPacket;                                      /* start of a packet still in flight   */
    char *CurrentLocation;                                   /* where the next bytes will be placed */
    int   SizeLeft;                                          /* free room left in the buffer        */
};

SourceFD is obviously the file descriptor of the data stream in question. DataBuffer is where packet contents are held while being processed; it is a circular buffer. The LatestPacket pointer temporarily holds a pointer to the most recent packet in case we receive a partial packet and move on to another source before passing the packet off. CurrentLocation stores where the latest packet ends so that we know where to place the next one, or where to carry on in case of a partial receive. SizeLeft is the room left in the buffer; it is used to tell whether we can fit the packet or need to circle back around to the beginning.

The receiving function will thus effectively:

  • Copy the contents of the packet into the buffer
  • Move CurrentLocation to point to the end of the packet
  • Update SizeLeft to account for the now decreased buffer
  • If we cannot fit the packet at the end of the buffer we cycle back around to the beginning
  • If there is no room there either, we try again a bit later, moving on to another source in the meantime
  • If we had a partial receive, set the LatestPacket pointer to the start of the packet and go to another stream until we get the rest
  • Send a message over a POSIX message queue to a worker thread so it can process the data; the message contains a pointer to the DataSource structure so the worker can work on it, plus a pointer to the packet it should process and its size, both of which are known once we have received the packet (a sketch of this hand-off follows right after this list)
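As referenced in the last bullet, the hand-off itself might look roughly like this, reusing the hypothetical JobMessage and job queue from the sketch above; error handling is kept minimal.

#include <mqueue.h>
#include <stdio.h>

/* Hand a fully received packet to whichever worker grabs it first. */
static void dispatch_packet(mqd_t job_queue, struct DataSource *src,
                            char *packet, size_t length)
{
    struct JobMessage msg = {
        .Source = src,      /* lets the worker find and update SizeLeft */
        .Packet = packet,   /* points into src->DataBuffer              */
        .Length = length
    };

    /* All workers block in mq_receive on the same queue; exactly one of
     * them receives this message. */
    if (mq_send(job_queue, (const char *)&msg, sizeof msg, 0) == -1)
        perror("mq_send");
}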

The worker thread will do its processing using the received pointers and then increase SizeLeft, so the receiver thread knows it can carry on filling the buffer. Atomic functions are needed to work on the size value in the struct so we don't get race conditions on it (it can be written by a worker and the IO thread simultaneously, causing lost writes; see my comment below); the GCC atomic builtins linked under point 3 below are simple and extremely useful for this.
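Concretely, the bookkeeping on SizeLeft could use those legacy GCC __sync builtins; the helper names here are placeholders of my own.

/* The receiver claims room before copying a packet in; the worker hands
 * it back when done. Plain loads/stores of SizeLeft would race. */
static void reserve_room(struct DataSource *src, int len)
{
    __sync_fetch_and_sub(&src->SizeLeft, len);   /* IO thread */
}

static void release_room(struct DataSource *src, int len)
{
    __sync_fetch_and_add(&src->SizeLeft, len);   /* worker thread */
}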

Now, I have given some general background but will address the points given specifically:

  1. Using eventfd as a synchronization mechanism is largely a bad idea: you will find yourself using a fair amount of unneeded CPU time, and it is very hard to perform any real synchronization with it. In particular, if you have multiple threads pick up the same file descriptor you could have major problems. This is in effect a nasty hack that will work sometimes but is no real substitute for proper synchronization.
  2. It is also a bad idea to try to offload the receive, as explained above. You can get around the issue with complex IPC, but frankly it is unlikely that receiving IO will take enough time to stall your application; your IO is also likely much slower than your CPU, so receiving with multiple threads will gain little (this assumes you do not, say, have several 10 gigabit network cards).
  3. Using mutexes or locks is a silly idea here; it fits much better into lockless coding given the low amount of (simultaneously) shared data, as you are really just handing off work and data. This will also boost the performance of the receive thread and make your app far more scalable. Using the functions mentioned here http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html you can do this nicely and easily. If you did do it this way, what you would need is a semaphore: it can be posted every time a packet is received and waited on by each thread that starts a job, to dynamically let more threads in when more packets are ready. That would have far less overhead than a homebrew solution with mutexes.
  4. There is not really much difference here from any other thread pool: you spawn a lot of threads and then have them all block in mq_receive on the data message queue to wait for messages. When they are done they send their result back to the main thread, which adds the results message queue to its epoll list. It can then receive results this way; it is simple and very efficient for small data payloads like pointers. This will also use little CPU and not force the main thread to waste time managing workers. (A worker-loop sketch follows this list.)
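As mentioned in point 4, a worker under this scheme is essentially a loop around mq_receive. The sketch below is one possible shape of it; the ResultMessage layout and process_packet() are placeholders, and relying on an mqd_t being usable with epoll is Linux-specific behaviour (noted in mq_overview(7)).

#include <mqueue.h>
#include <stdio.h>

struct ResultMessage {
    struct DataSource *Source;   /* which source the packet came from */
    int                Status;   /* e.g. the outcome of the tests     */
};

int process_packet(char *pkt, size_t len);   /* hypothetical parsing/tests */

static void *worker_main(void *arg)
{
    mqd_t job_queue    = ((mqd_t *)arg)[0];
    mqd_t result_queue = ((mqd_t *)arg)[1];
    struct JobMessage job;

    for (;;) {
        /* Blocks until the IO thread hands this worker a packet. */
        if (mq_receive(job_queue, (char *)&job, sizeof job, NULL) == -1) {
            perror("mq_receive");
            continue;
        }

        struct ResultMessage res = {
            .Source = job.Source,
            .Status = process_packet(job.Packet, job.Length)
        };

        /* Hand the buffer space back, then report to the IO thread,
         * which has the result queue's descriptor in its epoll set. */
        __sync_fetch_and_add(&job.Source->SizeLeft, (int)job.Length);
        mq_send(result_queue, (const char *)&res, sizeof res, 0);
    }
    return NULL;
}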

Finally, your edit is fairly sensible, except that, as I have suggested, message queues are far better than pipes here: they signal events very efficiently, guarantee a full message read, and provide automatic framing.

I hope this helps; however, it is late, so if I missed anything or you have questions, feel free to comment for clarification or more explanation.

Schrader answered 25/2, 2014 at 23:14 Comment(8)
Thanks for a long answer. Just a few questions: Can I assume that multiple threads may block on the same queue in order to wait for new tasks? Could multiple threads also write to another queue for passing finished work back? In such a design, do I really need the builtins as described above?Alterable
@invictus Message queues are indeed a many-to-many relationship; they are extremely powerful, as any number of senders and any number of receivers can use a queue and messages will always get passed to precisely one of the listening threads. The built-ins above are not needed for the vast majority of the code; their only use is to ensure that SizeLeft is updated atomically, so the receiver thread and worker threads do not update it simultaneously and cause it to become corrupt, e.g.: thread1 loads value, thread2 loads value, thread1 writes it, thread2 writes it, thread1's write is lost.Schrader
Sounds like this is the solution I have been looking for then. Do you know how mq_* performs (i.e. the performance overhead)?Alterable
@invictus Though the performance depends on your specific application and the exact kernel version you have, you can expect that they will provide much higher performance than something like a socket, but be marginally slower than a bespoke shared-memory queue system; however, they are completely thread safe and will save you a lot of design time. More importantly, the only time they become slow is with very large messages (as the message is internally stored twice, once in the sender's memory and once in the receiver's), so if you only pass pointers and small messages the overhead is negligible.Schrader
Also, a note I forgot to mention, posix message queues are guaranteed to never block the sender when trying to send, the data transfer is all done asynchronously. This means none of the overhead will ever slow the IO thread.Schrader
I understand. I assume a shared memory queue system is out of the question for me, as I want results fed back into the main IO loop and need some way of waiting for events from both the worker threads and the IO FDs. It still sounds to me as if your suggestion is the way to go :)Alterable
I guess a pipe could also work if one wanted to make a more portable solution then?Alterable
@invictus Unfortunately a pipe is not thread safe in that way; you have no guarantee whatsoever regarding framing. A message queue ensures that one message in its entirety reaches one thread; with a pipe, if the message is more than 1 byte it can become interleaved in an undefined way, and several threads may each read part of a packet, creating what would likely be an immediate segfault when you then try to dereference a size as a pointer. Message queues are actually very portable, being supported on just about every POSIX system.Schrader

In my tests, one epoll instance per thread outperformed complicated threading models by far. If listener sockets are added to all epoll instances, the workers would simply accept(2) and the winner would be awarded the connection and process it for its lifetime.
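To make the setup concrete, here is one way the per-worker epoll instances could be created and the shared listener registered in each of them; this is a sketch under my own assumptions (the worker struct, abort-on-error handling), not code from this answer.

#include <stdlib.h>
#include <sys/epoll.h>

struct socket_context;          /* laid out further below */

struct worker {
    int efd;                    /* this worker's own epoll instance */
};

static void setup_worker(struct worker *w, int listen_fd,
                         struct socket_context *listen_ctx)
{
    struct epoll_event ev;

    w->efd = epoll_create1(0);
    if (w->efd == -1)
        abort();

    /* Every worker watches the same listener; whichever thread wins
     * accept(2) owns the new connection for its lifetime. */
    ev.events   = EPOLLIN;
    ev.data.ptr = listen_ctx;   /* listen_ctx->handler == listener_handler */
    if (epoll_ctl(w->efd, EPOLL_CTL_ADD, listen_fd, &ev) == -1)
        abort();
}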

Your workers could look something like this:

struct epoll_event evs[1024];
int nfds, i;

for (;;) {
    nfds = epoll_wait(worker->efd, evs, 1024, -1);

    for (i = 0; i < nfds; i++)
        ((struct socket_context*)evs[i].data.ptr)->handler(
            evs[i].data.ptr,
            evs[i].events);
}
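The struct socket_context used above is not spelled out in the answer; a minimal layout consistent with these snippets might be (the exact fields are an assumption):

struct socket_context {
    int fd;                                          /* the watched descriptor     */
    void (*handler)(struct socket_context *, int);   /* called on each epoll event */
    /* ...plus whatever per-connection state is needed, e.g. a buffer
     * for stashing a partial read between events */
};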

And every file descriptor added to an epoll instance could have a struct socket_context associated with it:

void listener_handler(struct socket_context* ctx, int ev)
{
    /* allocate a fresh context for the accepted connection */
    struct socket_context* conn = malloc(sizeof(*conn));

    conn->fd = accept(ctx->fd, NULL, NULL);
    conn->handler = conn_handler;

    /* add to calling worker's epoll instance or implement some form
     * of load balancing */
}

void conn_handler(struct socket_context* ctx, int ev)
{
    /* read all available data and process. if incomplete, stash
     * data in ctx and continue next time handler is called */
}

void dummy_handler(struct socket_context* ctx, int ev)
{
    /* handle exit condition async by adding a pipe with its
     * own handler */
}

I like this strategy because:

  • very simple design;
  • all threads are identical;
  • workers and connections are isolated--no stepping on each other's toes or calling read(2) in the wrong worker;
  • no locks are required (the kernel gets to worry about synchronization on accept(2));
  • somewhat naturally load balanced since no busy worker will actively contend on accept(2).

And some notes on epoll:

  • use edge-triggered mode, non-blocking sockets and always read until EAGAIN (a read-loop sketch follows these notes);
  • avoid dup(2) family of calls to spare yourself from some surprises (epoll registers file descriptors, but actually watches file descriptions);
  • you can epoll_ctl(2) other threads' epoll instances safely;
  • use a large struct epoll_event buffer for epoll_wait(2) to avoid starvation.
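As referenced in the first note, the edge-triggered pattern boils down to draining the socket until read(2) reports EAGAIN; a minimal sketch (buffer handling omitted, function name assumed):

#include <errno.h>
#include <unistd.h>

/* Drain a non-blocking socket after an EPOLLET notification. */
static void drain_socket(int fd)
{
    char buf[4096];

    for (;;) {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0) {
            /* process or stash the n bytes just read */
            continue;
        }
        if (n == 0)
            break;              /* peer closed the connection; clean up */
        if (errno == EINTR)
            continue;           /* interrupted by a signal; retry */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            break;              /* fully drained; wait for the next event */
        break;                  /* real error; close and clean up */
    }
}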

Some other notes:

  • use accept4(2) to save a system call (see the snippet after these notes);
  • use one thread per core (1 for each physical core if CPU-bound, or 1 for each logical core if I/O-bound);
  • poll(2)/select(2) is likely faster if connection count is low.
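For the accept4(2) note above, the call simply folds the non-blocking flag into the accept, e.g.:

#define _GNU_SOURCE             /* for accept4(2) */
#include <sys/socket.h>

/* One call instead of accept(2) followed by fcntl(F_SETFL, O_NONBLOCK). */
static int accept_nonblocking(int listen_fd)
{
    return accept4(listen_fd, NULL, NULL, SOCK_NONBLOCK);
}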

I hope this helps.

Hippodrome answered 20/2, 2014 at 0:21 Comment(5)
I like this idea, however, I am worried that my heavy workload after each recv will block other connections. Also, won't this possibly lead to an unbalanced workload for each thread if a thread is "lucky" enough to pick up the next accept first? Furthermore, if I have only 4-5 connections, I may still want 30 worker threads to handle what they produce.Alterable
@invictus Yes, the workload won't be perfectly balanced unless you distribute the connections evenly yourself in the listener handler, which may add some complexity. Is your work CPU- or I/O-bound? More threads than processor cores will just introduce more context switching if it's CPU-bound.Hippodrome
@pindumb Won't be a problem for low thread counts (one per physical core). Under load, chances are only a small portion of the threads will witness the readable listener. If it is a concern, the listener could be rotated amongst the threads. It would be a different story with hundreds or thousands of threads.Hippodrome
I believe, although an interesting idea, that this particular design is not a good fit since I believe a single thread should be able to handle all IO, while the actual processing is highly CPU bound. I will add a bounty in 14 hours, because I have been searching for a good answer for this for years, and all I have found are others wondering the same. Would be nice to get a "text book answer" that is googable for others :)Alterable
@invictus That's cool. I've been doing something similar but with I/O-bound work. I tried 1 listener thread + N workers w/ single and separate lock-free FIFO work queues. What killed performance was cache line bouncing and figuring out when/how to sleep and wake workers. Isolation was truly the key to 10-100x speedup. However, sharing writable data may be negligible in your case if processing is lengthy. Good luck. :)Hippodrome

I posted the same answer in another post: I want to wait on both a file descriptor and a mutex, what's the recommended way to do this?

==========================================================

This is a very commonly seen problem, especially when you are developing a network server-side program. Most Linux server-side programs' main loop will look like this:

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        resp = proc(req);
        fd.send(resp);
    }
}

It is a single-threaded (the main thread), epoll-based server framework. The problem is that it is single threaded, not multi-threaded, so it requires that proc() never blocks or runs for a significant time (say 10 ms for common cases).

If proc() ever runs for a long time, WE NEED MULTIPLE THREADS, and must execute proc() in a separate thread (the worker thread).

We can submit tasks to the worker threads without blocking the main thread, using a mutex-based message queue; it is fast enough.

Then we need a way to obtain the task result from a worker thread. How? If we just check the message queue directly, before or after epoll_wait(), the check will only run after epoll_wait() ends, and epoll_wait() usually blocks for about 10 ms (common cases) if none of the file descriptors it waits on are active.

For a server, 10 ms is quite a long time! Can we signal epoll_wait() to end immediately when a task result is generated?

Yes! I will describe how it is done in one of my open source projects.

Create a pipe shared by all worker threads, and have epoll wait on that pipe as well. Once a task result is generated, the worker thread writes one byte into the pipe, and epoll_wait() will return almost immediately - a Linux pipe has 5 us to 20 us latency.
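A minimal sketch of that pattern, a mutex-protected queue paired with a pipe whose read end is handed to epoll; this is not SSDB's actual code, and all names are placeholders:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

struct result {
    int   fd;                    /* client socket the response belongs to */
    void *resp;                  /* the prepared response                 */
    struct result *next;
};

struct selectable_queue {
    pthread_mutex_t lock;
    struct result  *head, *tail;
    int             pipefd[2];   /* [0] is added to epoll, [1] is written by workers */
};

static int queue_init(struct selectable_queue *q)
{
    q->head = q->tail = NULL;
    pthread_mutex_init(&q->lock, NULL);
    return pipe(q->pipefd);      /* register pipefd[0] with epoll in the main loop */
}

/* Worker side: enqueue a result and poke the main thread awake. */
static void queue_push(struct selectable_queue *q, struct result *r)
{
    r->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail) q->tail->next = r; else q->head = r;
    q->tail = r;
    pthread_mutex_unlock(&q->lock);

    char one = 1;
    if (write(q->pipefd[1], &one, 1) != 1)   /* wake epoll_wait() on pipefd[0] */
        perror("write");
}

/* Main-thread side: called when epoll reports pipefd[0] readable. */
static struct result *queue_pop(struct selectable_queue *q)
{
    char one;
    if (read(q->pipefd[0], &one, 1) != 1)    /* consume one wake-up byte */
        perror("read");

    pthread_mutex_lock(&q->lock);
    struct result *r = q->head;
    if (r) {
        q->head = r->next;
        if (!q->head)
            q->tail = NULL;
    }
    pthread_mutex_unlock(&q->lock);
    return r;
}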

In my project SSDB (a Redis-protocol-compatible on-disk NoSQL database), I created a SelectableQueue for passing messages between the main thread and worker threads. As its name suggests, SelectableQueue has a file descriptor, which can be waited on by epoll.

SelectableQueue: https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

Usage in main thread:

epoll_add(serv_sock);
epoll_add(queue->fd());
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        if(fd is worker_thread){
            sock, resp = worker->pop_result();
            sock.send(resp);
        }
        if(fd is client_socket){
            req = fd.read();
            worker->add_task(fd, req);
        }
    }
}

Usage in worker thread:

fd, req = queue->pop_task();
resp = proc(req);
queue->add_result(fd, resp);
Chandelle answered 14/9, 2017 at 10:40 Comment(0)
