Full disclosure, I'm a student and this is an assignment. I've been working on it for over a week almost non-stop (in addition to previous time spent) and I can't figure out what I'm doing wrong. My server keeps hanging on epoll_wait after only a "few" recvs are done ("few" because I'm anticipating several GB of data and I'm getting only a few dozen MB). I don't think there's anything wrong with how my client works, because it's working just fine with my select and multi-threaded servers. Please take a quick look and let me know if there's anything that jumps out at you as being the cause of my problem.
The basic idea of the client/server is to bombard the server with connections (10k+) and transfer a given amount of data across several times. This epoll server is having trouble with 2000, when my multi-threaded server handled just shy of the 10k goal.
I am NOT asking for you to do my assignment for me (I'm nearly done); I just need help figuring out what I'm doing wrong here. Thanks in advance for any help you can offer :)
#include "common.h"
#include <poll.h>
#include <sched.h>
#include <sys/epoll.h>
3
4 uint16_t ready[MAX_CONNS]; /* ring of sockets awaiting service: main() stores a descriptor in an empty slot, a worker zeroes the slot to free it; 0 means "empty" */
5 uint16_t next; /* index of the next `ready` slot a worker will inspect; set to 4 in main() and wrapped back to 4 when it reaches MAX_CONNS */
6 pthread_mutex_t mutex; /* held by workers while they read and advance `next`; note that main() writes `ready` without taking it */
7
8 void *worker_thread(void *param) {
9 int my_sock, pos;
10 struct conn_params *conn_ps = (struct conn_params *)param;
11
12 while (1) {
13 pthread_mutex_lock(&mutex);
14
15 while (1) {
16 if (next == MAX_CONNS) {
17 printf("balls\n");
18 next = 4;
19 }
20
21 if (ready[next] != 0) {
22 pos = next;
23 my_sock = ready[pos];
24 next++;
25 break;
26 }
27 }
28
29 pthread_mutex_unlock(&mutex);
30 /* handle recv/send */
31 if (echo_recv(&conn_ps[my_sock], MULTIPLE) == 0) { /* closed conn */
32 shutdown(my_sock, SHUT_RDWR);
33 close(my_sock);
34 serv_stats.active_connections--;
35 }
36 ready[pos] = 0;
37 /* print_conn_stats(&conn_ps[my_sock]);*/
38 }
39 }
40
41 void *add_client_thread(void *param) {
42 struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param;
43 struct sockaddr client;
44 struct epoll_event event;
45 socklen_t client_len;
46 int new_sock, ret;
47 char hostbuf[NI_MAXHOST], servbuf[NI_MAXSERV];
48
49 bzero(&client, sizeof(struct sockaddr));
50 event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
51
52 while ((new_sock = accept(eat->listen_sock, &client, &client_len)) != -1) {
53 set_nonblock(new_sock);
54 event.data.fd = new_sock;
55 if (epoll_ctl(eat->fd_epoll, EPOLL_CTL_ADD, new_sock, &event) == -1) {
56 perror("epoll_ctl");
57 printf("%u\n", new_sock);
58 continue;
59 }
60
61 bzero(&(eat->conn_ps[new_sock]), sizeof(struct conn_params));
62 eat->conn_ps[new_sock].sock = new_sock;
63 if ((ret = getnameinfo(&client, client_len, hostbuf, NI_MAXHOST, servbuf, NI_MAXSERV, NI_NUMERICHOST)) != 0) {
64 gai_strerror(ret);
65 }
66
67 update_server_stats();
68 printf("added client\n");
69 }
70
71 if (errno != EAGAIN) {
72 perror("Couldn't accept connection");
73 }
74
75 pthread_exit(NULL);
76 }
77
/*
 * Entry point of the epoll echo server.
 *
 * Parses "-l <port>", binds and listens on that port, starts five worker
 * threads, then loops on epoll_wait(): new connections are handed to an
 * accept thread, readable client sockets are queued in the `ready` ring
 * for the workers.
 *
 * return - does not return while the server is healthy; returns 1 if
 *          epoll_wait() fails with anything other than EINTR.
 *
 * Changes from the previous version:
 *  - getopt()'s result is kept in an int (a plain char may be unsigned, in
 *    which case the comparison with -1 never succeeds) and the offending
 *    option is reported through optopt.
 *  - `ret = getaddrinfo(...) != 0` stored the comparison result; fixed and
 *    the error text is now printed.
 *  - Listen backlog raised from 5 to SOMAXCONN for bursts of connections.
 *  - epoll_wait(): the old `> 0 && errno == EINTR` test discarded real
 *    events whenever errno held a stale EINTR, and spun forever on error.
 *  - The ring is filled strictly in order and the producer index advances
 *    after every insertion, so workers consuming in order never miss an
 *    entry; if the ring is full the producer yields until a slot frees up.
 *  - The MAX_CONNS-sized tables are static instead of living on the stack.
 *  - pthread_create() failures are reported.
 */
int main(int argc, char **argv) {
    static struct conn_params conn_ps[MAX_CONNS];
    static struct epoll_event evs[MAX_CONNS];
    char *port = NULL;
    struct addrinfo hints, *results, *p;
    int listen_sock = new_tcp_sock(), nfds, i, ret, opt;
    int fd_epoll, next_avail = 4;
    struct epoll_event event;
    struct epoll_accept_thread eat;
    pthread_t thread;

    while ((opt = getopt(argc, argv, ":l:")) != -1) {
        switch (opt) {
        case 'l': /* port to listen on */
            port = optarg;
            break;
        case '?': /* unknown option */
            fprintf(stderr, "The option -%c is not supported.\n", optopt);
            exit(1);
        case ':': /* required arg not supplied for option */
            fprintf(stderr, "The option -%c requires an argument.\n", optopt);
            exit(1);
        default:
            break;
        }
    } /* command line arg processing done */

    if (port == NULL) {
        fprintf(stderr, "You must provide the port to listen on (-l).\n");
        exit(1);
    }

    signal(SIGINT, handle_interrupt);

    bzero(&hints, sizeof(struct addrinfo));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    set_nonblock(listen_sock);
    set_reuseaddr(listen_sock);

    if ((ret = getaddrinfo(NULL, port, &hints, &results)) != 0) {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(ret));
        exit(1);
    }

    for (p = results; p != NULL; p = p->ai_next) { /* try each candidate address */
        if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) {
            perror("Bind failed");
        } else {
            break;
        }
    }

    if (p == NULL) { /* none of the addresses could be bound */
        fprintf(stderr, "Unable to bind to the specified port. Exiting...\n");
        exit(1);
    }

    freeaddrinfo(results);

    if (listen(listen_sock, SOMAXCONN) == -1) {
        perror("Listen failed");
        exit(1);
    }

    /* everything is set up. method-specific code goes below */

    start_server_stats();
    next = 4;

    if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) {
        perror("epoll_create");
        exit(1);
    }

    bzero(&event, sizeof(struct epoll_event));
    event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
    event.data.fd = listen_sock;
    if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) {
        perror("epoll_ctl");
        exit(1);
    }

    signal(SIGPIPE, SIG_IGN);
    bzero(ready, MAX_CONNS * sizeof(uint16_t));
    pthread_mutex_init(&mutex, NULL);

    for (i = 0; i < 5; i++) { /* five workers should be enough */
        if ((ret = pthread_create(&thread, NULL, worker_thread, (void *)conn_ps)) != 0) {
            errno = ret;
            perror("pthread_create");
            exit(1);
        }
    }

    /* The accept-thread arguments never change; fill them in once. */
    eat.listen_sock = listen_sock;
    eat.fd_epoll = fd_epoll;
    eat.conn_ps = conn_ps;

    while (1) {
        nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1);
        if (nfds == -1) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal; just wait again */
            }
            break;
        }

        for (i = 0; i < nfds; i++) { /* loop through all FDs */
            int fd = evs[i].data.fd;

            if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* error or hangup */
                close(fd);
                continue;
            }

            if (fd == listen_sock) { /* we have a new connection coming in */
                if ((ret = pthread_create(&thread, NULL, add_client_thread, (void *)&eat)) != 0) {
                    errno = ret;
                    perror("pthread_create");
                }
                continue;
            }

            /*
             * Inbound data. Wait for the slot at the producer index rather
             * than skipping ahead: skipping would leave the consumers
             * waiting on a slot that is never filled.
             * NOTE(review): `ready` holds uint16_t, so a descriptor above
             * 65535 would be truncated here.
             */
            while (ready[next_avail] != 0) {
                sched_yield();
            }
            ready[next_avail] = fd;

            if (++next_avail == MAX_CONNS) {
                next_avail = 4;
            }
        } /* end iterating through FDs */
    } /* end epoll_wait loop */

    perror("epoll_wait");

    return 1;
}
And here's the echo_recv function, as I assume someone's going to want to see that as well:
14 int echo_recv(struct conn_params *conn_p, int single) {
15 char client_buf[CLIENT_BUF_SIZE], buffer[BUF_SIZE];
16 int nread, nwrite, nsent = 0, i;
17
18 while ((nread = recv(conn_p->sock, client_buf, CLIENT_BUF_SIZE, 0)) > 0) {
19 /* create buffer of MULTIPLIER(int) times what was received */
20 for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) {
21 memcpy(buffer+(nread*i), client_buf, nread);
22 }
23
24 /* send the created buffer */
25 while ((nwrite = send(conn_p->sock, buffer+nsent, (nread*MULTIPLIER)-nsent, 0)) > 0) {
26 nsent += nwrite;
27 }
28
29 conn_p->total_recvd += nread; /* update our stats for this conn */
30 conn_p->total_sent += nsent; /* update our status for this conn */
31 serv_stats.total_recvd += nread;
32 serv_stats.total_sent += nsent;
33 nsent = 0;
34
35 if (single) {
36 return 1;
37 }
38 }
39
40 if (nread == -1 && (errno & EAGAIN)) {
41 return 1;
42 }
43
44 if (nread == -1) {
45 perror("wtf?");
46 }
47
48 shutdown(conn_p->sock, SHUT_RDWR);
49 close(conn_p->sock);
50
51 return 0; /* recv failed */
52 }
`errno` is not a bitfield, so `errno & EAGAIN` is not correct; use `errno == EAGAIN`. The second is that you are indexing arrays with the socket descriptor, and they can be any number that fits in an `int`; are you sure they are less than the size of the arrays? – Drinking