Missing something or do I just not understand epoll?

Full disclosure, I'm a student and this is an assignment. I've been working on it for over a week almost non-stop (in addition to previous time spent) and I can't figure out what I'm doing wrong. My server keeps hanging on epoll_wait after only a "few" recvs are done ("few" because I'm anticipating several GB of data and I'm getting only a few dozen MB). I don't think there's anything wrong with how my client works, because it's working just fine with my select and multi-threaded servers. Please take a quick look and let me know if there's anything that jumps out at you as being the cause of my problem.

The basic idea of the client/server is to bombard the server with connections (10k+) and transfer a given amount of data across each of them several times. This epoll server is having trouble with 2000 connections, while my multi-threaded server handled just shy of the 10k goal.

I am NOT asking you to do my assignment for me (I'm nearly done); I just need help figuring out what I'm doing wrong here. Thanks in advance for any help you can offer :)

#include "common.h"
#include <sys/epoll.h>

uint16_t ready[MAX_CONNS];
uint16_t next;
pthread_mutex_t mutex;

void *worker_thread(void *param) {
    int my_sock, pos;
    struct conn_params *conn_ps = (struct conn_params *)param;

    while (1) {
        pthread_mutex_lock(&mutex);

        while (1) {
            if (next == MAX_CONNS) {
                printf("balls\n");
                next = 4;
            }

            if (ready[next] != 0) {
                pos = next;
                my_sock = ready[pos];
                next++;
                break;
            }
        }

        pthread_mutex_unlock(&mutex);
        /* handle recv/send */
        if (echo_recv(&conn_ps[my_sock], MULTIPLE) == 0) { /* closed conn */
            shutdown(my_sock, SHUT_RDWR);
            close(my_sock);
            serv_stats.active_connections--;
        }
        ready[pos] = 0;
/*      print_conn_stats(&conn_ps[my_sock]);*/
    }
}

void *add_client_thread(void *param) {
    struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param;
    struct sockaddr client;
    struct epoll_event event;
    socklen_t client_len;
    int new_sock, ret;
    char hostbuf[NI_MAXHOST], servbuf[NI_MAXSERV];

    bzero(&client, sizeof(struct sockaddr));
    event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;

    while ((new_sock = accept(eat->listen_sock, &client, &client_len)) != -1) {
        set_nonblock(new_sock);
        event.data.fd = new_sock;
        if (epoll_ctl(eat->fd_epoll, EPOLL_CTL_ADD, new_sock, &event) == -1) {
            perror("epoll_ctl");
            printf("%u\n", new_sock);
            continue;
        }

        bzero(&(eat->conn_ps[new_sock]), sizeof(struct conn_params));
        eat->conn_ps[new_sock].sock = new_sock;
        if ((ret = getnameinfo(&client, client_len, hostbuf, NI_MAXHOST, servbuf, NI_MAXSERV, NI_NUMERICHOST)) != 0) {
            gai_strerror(ret);
        }

        update_server_stats();
        printf("added client\n");
    }

    if (errno != EAGAIN) {
        perror("Couldn't accept connection");
    }

    pthread_exit(NULL);
}

int main(int argc, char **argv) {
    char opt, *port = NULL;
    struct addrinfo hints, *results, *p;
    int listen_sock = new_tcp_sock(), nfds, i, ret;
    int fd_epoll, next_avail = 4;
    struct conn_params conn_ps[MAX_CONNS];
    struct epoll_event evs[MAX_CONNS];
    struct epoll_event event;
    struct epoll_accept_thread eat;
    pthread_t thread;

    while ((opt = getopt(argc, argv, ":l:")) != -1) {
        switch (opt) {
            case 'l': /* port to listen on */
                port = optarg;
                break;
            case '?': /* unknown option */
                fprintf(stderr, "The option -%c is not supported.\n", opt);
                exit(1);
            case ':': /* required arg not supplied for option */
                fprintf(stderr, "The option -%c requires an argument.\n", opt);
                exit(1);
        }
    } /* command line arg processing done */

    if (port == NULL) {
        fprintf(stderr, "You must provide the port to listen on (-l).\n");
        exit(1);
    }

    signal(SIGINT, handle_interrupt);

    bzero(&hints, sizeof(struct addrinfo));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    set_nonblock(listen_sock);
    set_reuseaddr(listen_sock);

    if ((ret = getaddrinfo(NULL, port, &hints, &results) != 0)) {
        gai_strerror(ret);
        exit(1);
    }

    for (p = results; p != NULL; p = p->ai_next) { /* attempt to connect to the host */
        if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) {
            perror("Bind failed");
        } else {
            break;
        }
    }

    if (p == NULL) { /* we were unable to connect to anything */
        fprintf(stderr, "Unable to bind to the specified port. Exiting...\n");
        exit(1);
    }

    freeaddrinfo(results);

    if (listen(listen_sock, 5) == -1) {
        perror("Listen failed");
        exit(1);
    }

    /* everything is set up. method-specific code goes below */

    start_server_stats();
    next = 4;

    if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) {
        perror("epoll_create");
        exit(1);
    }

    event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
    event.data.fd = listen_sock;
    if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) {
        perror("epoll_ctl");
        exit(1);
    }

    signal(SIGPIPE, SIG_IGN);
    bzero(ready, MAX_CONNS * sizeof(uint16_t));
    pthread_mutex_init(&mutex, NULL);

    for (i = 0; i < 5; i++) { /* five workers should be enough */
        pthread_create(&thread, NULL, worker_thread, (void *)&conn_ps);
    }

    while (1) {
        if ((nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1)) > 0 && errno == EINTR) {
            continue;
        }
        for (i = 0; i < nfds; i++) { /* loop through all FDs */
            if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* if there's an error or a hangup */
                /*fprintf(stderr, "Error! Danger, Will Robinson! Danger!");*/
                close(evs[i].data.fd);
                continue;
            } else if (evs[i].data.fd == listen_sock) { /* we have a new connection coming in */
                eat.listen_sock = listen_sock;
                eat.fd_epoll = fd_epoll;
                eat.conn_ps = conn_ps;
                pthread_create(&thread, NULL, add_client_thread, (void *)&eat);
            } else { /* inbound data */
                while (ready[next_avail] != 0) {
                    next_avail++;

                    if (next_avail == MAX_CONNS) {
                        next_avail = 4;
                    }
                }
                ready[next_avail] = evs[i].data.fd;
            } /* end inbound data */
        } /* end iterating through FDs */
    } /* end epoll_wait loop */

    perror("epoll_wait");

    return 0;
}

And here's the echo_recv function, as I assume someone's going to want to see that as well:

int echo_recv(struct conn_params *conn_p, int single) {
    char client_buf[CLIENT_BUF_SIZE], buffer[BUF_SIZE];
    int nread, nwrite, nsent = 0, i;

    while ((nread = recv(conn_p->sock, client_buf, CLIENT_BUF_SIZE, 0)) > 0) {
        /* create buffer of MULTIPLIER(int) times what was received */
        for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) {
            memcpy(buffer+(nread*i), client_buf, nread);
        }

        /* send the created buffer */
        while ((nwrite = send(conn_p->sock, buffer+nsent, (nread*MULTIPLIER)-nsent, 0)) > 0) {
            nsent += nwrite;
        }

        conn_p->total_recvd += nread; /* update our stats for this conn */
        conn_p->total_sent += nsent; /* update our stats for this conn */
        serv_stats.total_recvd += nread;
        serv_stats.total_sent += nsent;
        nsent = 0;

        if (single) {
            return 1;
        }
    }

    if (nread == -1 && (errno & EAGAIN)) {
        return 1;
    }

    if (nread == -1) {
        perror("wtf?");
    }

    shutdown(conn_p->sock, SHUT_RDWR);
    close(conn_p->sock);

    return 0; /* recv failed */
}
Moult answered 6/3, 2012 at 7:56 Comment(4)
Two points: first, errno is not a bitfield, so errno & EAGAIN is not correct; use errno == EAGAIN. Second, you are indexing arrays with the socket descriptor, which can be any number that fits in an int; are you sure the descriptors are always less than the size of the arrays?Drinking
@Joachim Pileborg: just out of curiosity, can you name a system where fds are not assigned to the lowest available numbers?Nazar
@KarolyHorvath No, but as far as I know there is no guarantee that it has to be that way, or that the numbers have to be consecutive. Also, you will never get a socket with "values" 0 to 2 unless you close standard in/out/err.Drinking
Instead of using epoll directly I've always used an intermediate library that took care of the details for me. libevent or libev are both excellent libraries that can use a variety of underlying event mechanisms and will help you accomplish what you want.Kala

Here are some thoughts:

  1. You should really look at how the shared array ready is accessed. In your worker thread you acquire a mutex to read it, yet there are times when you modify it outside of the lock. Additionally, you don't acquire this lock at all in your polling loop (main thread); you simply write to the array. This is plain wrong (see the sketch after this list).
  2. You don't keep the thread IDs for all the worker threads, so how do you propose to kill them (or wait for them to complete - normally you'd need to call pthread_join)?
  3. You create a separate thread to accept the connection, but again you modify the shared epoll_accept_thread structure in this thread - and there is no locking around it.

I would fix all the synchronization issues first, and that may then reveal other issues.
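
As an illustration of point 1, here is a minimal sketch of the locking discipline I mean. The shared array and the mutex mirror yours, but the publish/worker functions and the pretend descriptors are made up for the example: every read and every write of the shared array happens between lock and unlock, in the main thread as well as in the workers.

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define MAX_CONNS 16

static int ready[MAX_CONNS];          /* shared work queue (0 = empty slot) */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* worker: take one entry out of the shared array, always under the lock */
static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        int sock = 0;

        pthread_mutex_lock(&mutex);
        for (int i = 0; i < MAX_CONNS; i++) {
            if (ready[i] != 0) {
                sock = ready[i];
                ready[i] = 0;         /* clear the slot while still holding the lock */
                break;
            }
        }
        pthread_mutex_unlock(&mutex);

        if (sock != 0)
            printf("worker handling descriptor %d\n", sock);
        else
            usleep(1000);             /* nothing to do; a condition variable would be better */
    }
    return NULL;
}

/* "main loop": publish new work, also under the same lock */
static void publish(int sock)
{
    pthread_mutex_lock(&mutex);
    for (int i = 0; i < MAX_CONNS; i++) {
        if (ready[i] == 0) {
            ready[i] = sock;
            break;
        }
    }
    pthread_mutex_unlock(&mutex);
}

int main(void)
{
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);

    for (int sock = 4; sock < 10; sock++)   /* pretend these are accepted sockets */
        publish(sock);

    sleep(1);                               /* let the worker drain the queue */
    return 0;
}

The exact data structure doesn't matter; what matters is that the polling thread's writes and the workers' reads are never interleaved outside the lock.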

Trundle answered 6/3, 2012 at 9:36 Comment(5)
If the process is supposed to run forever, killing the worker threads may not be necessary. I've written quite a few programs like that myself, where the threads never exit, unless in absolute panic because something went terribly wrong. I had it kill the entire process though, not just a single thread.Kala
@X-Istence, that may be the case - but if you wanted to ensure that you wait for all the threads to fully complete their tasks before shutting down, join is the way forward.Trundle
Right now, I just leave the process running forever (or kill it in the event I need to re-test), so I didn't think keeping track of the thread IDs was important. As for how ready is accessed in the main thread, I'm checking to find a spot that isn't occupied, and this is the only thread that's filling those slots up, so I figured a mutex around that wasn't required. Fixed the mutex in the worker threads though. Also, I removed the extra assignments in main to the epoll_accept_thread. Those changes haven't yielded any different results :(Moult
@Trundle - 'but if you wanted to ensure that you wait for all the threads to fully complete their tasks before shutting down, join is the way forward'. There's another way of putting that: 'if you want to spend a lot of time/effort trying to micro-manage threads, trying to keep up-to-date lists, struggling to shut them down even though they are stuck on blocking calls and still find it takes ages to shut down your app, then join onto every thread'. 'Join' should not be the first thought when creating threads. The OP has the right idea - if it doesn't matter, why join?Salem
@MartinJames, the OP is checking if he's received all the data that has been sent - without waiting for all the threads to complete - how could you possibly guarantee that all the data has been received?Trundle

I wanted to post this in a comment above but it got much longer than it would allow:

Try implementing a simple epoll-based server that is entirely asynchronous (steps):

  1. Set up your accept socket ...
  2. Add to epoll.
  3. Enter the wait loop:
    1. Check whether event is on accept socket, or normal socket
    2. If accept socket, accept connection, add to epoll, go back to 3
    3. If event on normal socket for reading, read X bytes, save to write buffer, and enable write event on epoll for socket, go back to 3
    4. If event on normal socket for writing, write bytes from buffer to network, disable write event if write buffer is empty, go back to 3.
    5. If an error occurs remove the socket from epoll
  4. There is no fourth step ... the program should loop forever.

This should remove any complexity you've added by having threading that could cause issues. It moves epoll back into the same sort of domain as select(), except that it is generally much faster. The whole idea of using an event mechanism is to be told when you can read or write, instead of setting a socket to non-blocking and blindly trying to read from it or write to it.
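
For illustration, a rough, level-triggered sketch of that loop might look like the following. It is deliberately simplified and none of it comes from your code: listen_sock is assumed to be an already bound, listening (and preferably non-blocking) socket, the per-connection write buffers are a naive fixed array indexed by descriptor, and error handling is minimal.

/* Minimal level-triggered epoll echo loop following the steps above. */
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <stdint.h>

#define MAX_FDS    1024
#define MAX_EVENTS 64

struct buf { char data[4096]; size_t len; };
static struct buf out[MAX_FDS];            /* pending outgoing bytes per connection */

static void mod_events(int ep, int fd, uint32_t events)
{
    struct epoll_event ev;
    ev.events = events;
    ev.data.fd = fd;
    epoll_ctl(ep, EPOLL_CTL_MOD, fd, &ev);
}

void event_loop(int ep, int listen_sock)
{
    struct epoll_event ev, evs[MAX_EVENTS];

    ev.events = EPOLLIN;
    ev.data.fd = listen_sock;
    epoll_ctl(ep, EPOLL_CTL_ADD, listen_sock, &ev);

    for (;;) {
        int n = epoll_wait(ep, evs, MAX_EVENTS, -1);

        for (int i = 0; i < n; i++) {
            int fd = evs[i].data.fd;

            if (evs[i].events & (EPOLLERR | EPOLLHUP)) {              /* step 3.5 */
                epoll_ctl(ep, EPOLL_CTL_DEL, fd, NULL);
                close(fd);
            } else if (fd == listen_sock) {                           /* step 3.2 */
                int c = accept(listen_sock, NULL, NULL);
                if (c >= 0 && c < MAX_FDS) {
                    out[c].len = 0;
                    ev.events = EPOLLIN;
                    ev.data.fd = c;
                    epoll_ctl(ep, EPOLL_CTL_ADD, c, &ev);
                } else if (c >= 0) {
                    close(c);                                         /* no room in our naive table */
                }
            } else if ((evs[i].events & EPOLLIN) && out[fd].len == 0) { /* step 3.3 */
                ssize_t r = recv(fd, out[fd].data, sizeof(out[fd].data), 0);
                if (r <= 0) {
                    epoll_ctl(ep, EPOLL_CTL_DEL, fd, NULL);
                    close(fd);
                } else {
                    out[fd].len = (size_t)r;
                    mod_events(ep, fd, EPOLLIN | EPOLLOUT);           /* enable write event */
                }
            } else if (evs[i].events & EPOLLOUT) {                    /* step 3.4 */
                ssize_t w = send(fd, out[fd].data, out[fd].len, 0);
                if (w > 0) {
                    out[fd].len -= (size_t)w;
                    memmove(out[fd].data, out[fd].data + w, out[fd].len);
                }
                if (out[fd].len == 0)
                    mod_events(ep, fd, EPOLLIN);                      /* write buffer drained */
            }
        }
    }
}

One loop, one thread, no locking - all the state lives next to the descriptor it belongs to.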

You also never seem to check why send() failed when it returns -1. The send may have failed because the peer went away (I know you ignored SIGPIPE, but you will still get an errno, EPIPE in that case), or simply because a non-blocking send would block (EAGAIN/EWOULDBLOCK) or was interrupted by a signal (EINTR) - and those cases need to be handled differently.
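
To make that concrete, a hedged helper along these lines shows the kind of check I mean (try_send is my own name, not something from your common.h):

#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

/* Returns bytes written (possibly 0 if the socket can't take data right now),
 * or -1 if the connection should be torn down. */
ssize_t try_send(int sock, const char *buf, size_t len)
{
    ssize_t n = send(sock, buf, len, 0);

    if (n >= 0)
        return n;                                   /* made (some) progress */
    if (errno == EAGAIN || errno == EWOULDBLOCK)
        return 0;                                   /* kernel buffer full: retry on next EPOLLOUT */
    if (errno == EINTR)
        return 0;                                   /* interrupted by a signal: just retry */

    /* EPIPE (peer closed, SIGPIPE ignored), ECONNRESET, ...: give up */
    return -1;
}

The caller then decides whether to keep the unsent bytes around for the next writable event or to drop the connection.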

The other thing I see is that you are doing a busy loop inside your worker threads while waiting for sockets to be ready. The whole point of using select() or epoll in this case is that you get notified when there is something new, so you don't have to do a busy loop...
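
If you do keep worker threads at all, one sketch of avoiding that spinning is to park the workers on a condition variable until the epoll thread has actually queued something. The ring buffer and the push_ready/pop_ready names here are mine, purely for illustration (and there is no overflow check):

#include <pthread.h>

#define QUEUE_SIZE 1024

static int queue[QUEUE_SIZE];
static int head, tail;                       /* ring buffer indices */
static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  qcond = PTHREAD_COND_INITIALIZER;

/* called from the epoll loop when a descriptor is ready */
void push_ready(int fd)
{
    pthread_mutex_lock(&qlock);
    queue[tail] = fd;
    tail = (tail + 1) % QUEUE_SIZE;
    pthread_cond_signal(&qcond);             /* wake exactly one sleeping worker */
    pthread_mutex_unlock(&qlock);
}

/* called by each worker; sleeps instead of spinning when there is no work */
int pop_ready(void)
{
    int fd;

    pthread_mutex_lock(&qlock);
    while (head == tail)                     /* empty: wait, don't busy-loop */
        pthread_cond_wait(&qcond, &qlock);
    fd = queue[head];
    head = (head + 1) % QUEUE_SIZE;
    pthread_mutex_unlock(&qlock);
    return fd;
}

pthread_cond_wait releases the mutex while the worker sleeps and re-acquires it on wakeup, so nobody burns CPU waiting for work.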

I am not exactly sure what you are attempting to accomplish, but your code is extremely inefficient.

What you could do, after implementing just a simple asynchronous example using the steps above, is start up multiple worker threads that all listen (using epoll) for read events on the listener/accept socket, and have each of those threads handle its own set of connections (still using what I posted above).
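
A rough outline of that last idea, building on the event_loop sketch above (again, every name here is mine): give each worker its own epoll instance, register the shared listening socket in each of them, and let whichever thread wakes up do the accept and then keep servicing that connection. The listening socket should be non-blocking so a thread that loses the race to accept simply sees EAGAIN rather than blocking.

#include <pthread.h>
#include <sys/epoll.h>

#define NUM_WORKERS 4

/* event_loop(ep, listen_sock) is the single-threaded loop sketched earlier */
void event_loop(int ep, int listen_sock);

struct loop_args { int listen_sock; };

static void *epoll_worker(void *arg)
{
    struct loop_args *la = arg;
    int ep = epoll_create1(0);               /* one epoll instance per thread */

    event_loop(ep, la->listen_sock);         /* never returns in this sketch */
    return NULL;
}

void start_workers(int listen_sock)
{
    static struct loop_args la;
    pthread_t tids[NUM_WORKERS];

    la.listen_sock = listen_sock;
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_create(&tids[i], NULL, epoll_worker, &la);
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(tids[i], NULL);         /* the workers run forever here */
}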

Kala answered 6/3, 2012 at 11:27 Comment(1)
Yeah, I know my code is horribly inefficient :( Honestly, I just wanted it to WORK before I started refactoring. I've been bashing my head on the desk for a while with this one.Moult
