Changeset 368
- Timestamp:
- 06/06/07 23:49:50 (2 years ago)
- Files:
-
- version_0/ext/em.cpp (modified) (12 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
version_0/ext/em.cpp
r349 r368 43 43 NextHeartbeatTime (0), 44 44 LoopBreakerReader (-1), 45 LoopBreakerWriter (-1) 45 LoopBreakerWriter (-1), 46 bEpoll (false), 47 epfd (-1) 46 48 { 47 49 // Default time-slice is just smaller than one hundred mills. … … 84 86 close (LoopBreakerReader); 85 87 close (LoopBreakerWriter); 86 } 87 88 89 if (epfd != -1) 90 close (epfd); 91 } 92 93 94 /************************* 95 EventMachine_t::_UseEpoll 96 *************************/ 97 98 void EventMachine_t::_UseEpoll() 99 { 100 /* Temporary. 101 * Use an internal flag to switch in epoll-based functionality until we determine 102 * how it should be integrated properly and the extent of the required changes. 103 * A permanent solution needs to allow the integration of additional technologies, 104 * like kqueue and Solaris's events. 105 */ 106 107 bEpoll = true; 108 } 88 109 89 110 … … 166 187 167 188 189 /**************************************** 190 (STATIC) EventMachine_t::SetRlimitNofile 191 ****************************************/ 192 193 int EventMachine_t::SetRlimitNofile (int nofiles) 194 { 195 struct rlimit rlim; 196 if (nofiles >= 0) { 197 rlim.rlim_cur = nofiles; 198 if (nofiles > rlim.rlim_max) 199 rlim.rlim_max = nofiles; 200 setrlimit (RLIMIT_NOFILE, &rlim); 201 // ignore the error return, for now at least. 202 } 203 getrlimit (RLIMIT_NOFILE, &rlim); 204 return rlim.rlim_cur; 205 } 206 207 168 208 /********************************* 169 209 EventMachine_t::SignalLoopBreaker … … 238 278 #ifdef OS_WIN32 239 279 HookControlC (true); 280 #endif 281 282 #ifdef HAVE_EPOLL 283 if (bEpoll) { 284 epfd = epoll_create (MaxEpollDescriptors); 285 if (epfd == -1) { 286 char buf[200]; 287 snprintf (buf, sizeof(buf)-1, "unable to create epoll descriptor: %s", strerror(errno)); 288 throw std::runtime_error (buf); 289 } 290 int cloexec = fcntl (epfd, F_GETFD, 0); 291 assert (cloexec >= 0); 292 cloexec |= FD_CLOEXEC; 293 fcntl (epfd, F_SETFD, cloexec); 294 } 240 295 #endif 241 296 … … 257 312 258 313 259 260 314 /************************ 261 315 EventMachine_t::_RunOnce … … 263 317 264 318 bool EventMachine_t::_RunOnce() 319 { 320 if (bEpoll) 321 return _RunEpollOnce(); 322 else 323 return _RunSelectOnce(); 324 } 325 326 327 328 /***************************** 329 EventMachine_t::_RunEpollOnce 330 *****************************/ 331 332 bool EventMachine_t::_RunEpollOnce() 333 { 334 #ifdef HAVE_EPOLL 335 assert (epfd != -1); 336 struct epoll_event ev [MaxEpollDescriptors]; 337 int s = epoll_wait (epfd, ev, MaxEpollDescriptors, 10); 338 if (s > 0) { 339 for (int i=0; i < s; i++) { 340 EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr; 341 assert (ed); 342 343 if (ev[i].events & (EPOLLERR | EPOLLHUP)) 344 ed->ScheduleClose (false); 345 if (ev[i].events & EPOLLIN) 346 ed->Read(); 347 if (ev[i].events & EPOLLOUT) { 348 ed->Write(); 349 int e = epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent()); 350 } 351 } 352 } 353 else if (s < 0) { 354 // epoll_wait can fail on error in a handful of ways. 355 // If this happens, then wait for a little while to avoid busy-looping. 356 // If the error was EINTR, we probably caught SIGCHLD or something, 357 // so keep the wait short. 358 timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000}; 359 EmSelect (0, NULL, NULL, NULL, &tv); 360 } 361 362 { // cleanup dying sockets 363 // vector::pop_back works in constant time. 364 int i, j; 365 int nSockets = Descriptors.size(); 366 for (i=0, j=0; i < nSockets; i++) { 367 EventableDescriptor *ed = Descriptors[i]; 368 assert (ed); 369 if (ed->ShouldDelete()) { 370 if (bEpoll) { // well, this will always be true. Otherwise we wouldn't be here. 371 assert (epfd != -1); 372 int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent()); 373 // ENOENT is not an error because the socket may be already closed when we get here. 374 if (e) { 375 char buf [200]; 376 snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno)); 377 throw std::runtime_error (buf); 378 } 379 } 380 381 delete ed; 382 } 383 else 384 Descriptors [j++] = ed; 385 } 386 while ((size_t)j < Descriptors.size()) 387 Descriptors.pop_back(); 388 389 } 390 391 // TODO, heartbeats. 392 393 timeval tv = {0,0}; 394 EmSelect (0, NULL, NULL, NULL, &tv); 395 396 return true; 397 #else 398 throw std::runtime_error ("epoll is not implemented on this platform"); 399 #endif 400 } 401 402 403 /********************************* 404 EventMachine_t::_ModifyEpollEvent 405 *********************************/ 406 407 void EventMachine_t::_ModifyEpollEvent (EventableDescriptor *ed) 408 { 409 #ifdef HAVE_EPOLL 410 if (bEpoll) { 411 assert (epfd != -1); 412 assert (ed); 413 int e = epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent()); 414 if (e) { 415 char buf [200]; 416 snprintf (buf, sizeof(buf)-1, "unable to modify epoll event: %s", strerror(errno)); 417 throw std::runtime_error (buf); 418 } 419 } 420 #endif 421 } 422 423 424 /****************************** 425 EventMachine_t::_RunSelectOnce 426 ******************************/ 427 428 bool EventMachine_t::_RunSelectOnce() 265 429 { 266 430 // Crank the event machine once. … … 582 746 * (To wit, the ConnectionCompleted event gets sent to the client.) 583 747 */ 584 ConnectionDescriptor *cd = new ConnectionDescriptor ( sd);748 ConnectionDescriptor *cd = new ConnectionDescriptor (this, sd); 585 749 if (!cd) 586 750 throw std::runtime_error ("no connection allocated"); … … 600 764 // Put the connection on the stack and wait for it to complete 601 765 // or time out. 602 ConnectionDescriptor *cd = new ConnectionDescriptor ( sd);766 ConnectionDescriptor *cd = new ConnectionDescriptor (this, sd); 603 767 if (!cd) 604 768 throw std::runtime_error ("no connection allocated"); … … 620 784 * for people to know that a failure occurred. 621 785 */ 622 ConnectionDescriptor *cd = new ConnectionDescriptor ( sd);786 ConnectionDescriptor *cd = new ConnectionDescriptor (this, sd); 623 787 if (!cd) 624 788 throw std::runtime_error ("no connection allocated"); … … 647 811 // Put the connection on the stack and wait for it to complete 648 812 // or time out. 649 ConnectionDescriptor *cd = new ConnectionDescriptor ( sd);813 ConnectionDescriptor *cd = new ConnectionDescriptor (this, sd); 650 814 if (!cd) 651 815 throw std::runtime_error ("no connection allocated"); … … 724 888 // we still set the "pending" flag, so some needed initializations take 725 889 // place. 726 ConnectionDescriptor *cd = new ConnectionDescriptor ( fd);890 ConnectionDescriptor *cd = new ConnectionDescriptor (this, fd); 727 891 if (!cd) 728 892 throw std::runtime_error ("no connection allocated"); … … 928 1092 if (ed == NULL) 929 1093 throw std::runtime_error ("adding bad descriptor"); 1094 1095 #if HAVE_EPOLL 1096 if (bEpoll) { 1097 assert (epfd != -1); 1098 int e = epoll_ctl (epfd, EPOLL_CTL_ADD, ed->GetSocket(), ed->GetEpollEvent()); 1099 if (e) { 1100 char buf [200]; 1101 snprintf (buf, sizeof(buf)-1, "unable to add new descriptor: %s", strerror(errno)); 1102 throw std::runtime_error (buf); 1103 } 1104 } 1105 #endif 1106 930 1107 Descriptors.push_back (ed); 931 1108 }
