Changeset 372
- Timestamp:
- 06/07/07 15:31:12 (2 years ago)
- Files:
-
- version_0/ext/ed.cpp (modified) (10 diffs)
- version_0/ext/ed.h (modified) (9 diffs)
- version_0/ext/em.cpp (modified) (14 diffs)
- version_0/ext/em.h (modified) (1 diff)
- version_0/ext/files.cpp (modified) (1 diff)
- version_0/ext/files.h (modified) (1 diff)
- version_0/ext/pipe.cpp (modified) (1 diff)
- version_0/tests/test_epoll.rb (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
version_0/ext/ed.cpp
r367 r372 44 44 ****************************************/ 45 45 46 EventableDescriptor::EventableDescriptor (int sd ):46 EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em): 47 47 EventCallback (NULL), 48 48 LastRead (0), … … 50 50 MySocket (sd), 51 51 bCloseNow (false), 52 bCloseAfterWriting (false) 52 bCloseAfterWriting (false), 53 bCallbackUnbind (true), 54 MyEventMachine (em) 53 55 { 54 56 /* There are three ways to close a socket, all of which should … … 75 77 if (sd == INVALID_SOCKET) 76 78 throw std::runtime_error ("bad eventable descriptor"); 79 if (MyEventMachine == NULL) 80 throw std::runtime_error ("bad em in eventable descriptor"); 77 81 CreatedAt = gCurrentLoopTime; 78 82 … … 89 93 EventableDescriptor::~EventableDescriptor() 90 94 { 91 if (EventCallback )95 if (EventCallback && bCallbackUnbind) 92 96 (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_UNBOUND, NULL, 0); 93 97 Close(); … … 156 160 ******************************************/ 157 161 158 ConnectionDescriptor::ConnectionDescriptor ( EventMachine_t *em, int sd):159 EventableDescriptor (sd ),162 ConnectionDescriptor::ConnectionDescriptor (int sd, EventMachine_t *em): 163 EventableDescriptor (sd, em), 160 164 bConnectPending (false), 161 165 bReadAttemptedAfterClose (false), … … 166 170 bIsServer (false), 167 171 LastIo (gCurrentLoopTime), 168 InactivityTimeout (0), 169 MyEventMachine (em) 172 InactivityTimeout (0) 170 173 { 171 174 #ifdef HAVE_EPOLL … … 691 694 692 695 693 /************************************** 694 AcceptorDescriptor::AcceptorDescriptor 695 **************************************/ 696 697 AcceptorDescriptor::AcceptorDescriptor (EventMachine_t *parent_em, int sd): 698 EventableDescriptor (sd), 699 MyEventMachine (parent_em) 696 /**************************************** 697 LoopbreakDescriptor::LoopbreakDescriptor 698 ****************************************/ 699 700 LoopbreakDescriptor::LoopbreakDescriptor (int sd, EventMachine_t *parent_em): 701 EventableDescriptor (sd, parent_em) 700 702 { 701 703 /* This is really bad and ugly. Change someday if possible. … … 704 706 */ 705 707 706 if (!MyEventMachine) 707 throw std::runtime_error ("bad event-machine passed to acceptor"); 708 708 bCallbackUnbind = false; 709 710 #ifdef HAVE_EPOLL 711 EpollEvent.events = EPOLLIN; 712 #endif 713 } 714 715 716 717 718 /************************* 719 LoopbreakDescriptor::Read 720 *************************/ 721 722 void LoopbreakDescriptor::Read() 723 { 724 // TODO, refactor, this code is probably in the wrong place. 725 assert (MyEventMachine); 726 MyEventMachine->_ReadLoopBreaker(); 727 } 728 729 730 /************************** 731 LoopbreakDescriptor::Write 732 **************************/ 733 734 void LoopbreakDescriptor::Write() 735 { 736 // Why are we here? 737 throw std::runtime_error ("bad code path in loopbreak"); 738 } 739 740 /************************************** 741 AcceptorDescriptor::AcceptorDescriptor 742 **************************************/ 743 744 AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em): 745 EventableDescriptor (sd, parent_em) 746 { 709 747 #ifdef HAVE_EPOLL 710 748 EpollEvent.events = EPOLLIN; … … 772 810 773 811 774 ConnectionDescriptor *cd = new ConnectionDescriptor ( MyEventMachine, sd);812 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, MyEventMachine); 775 813 if (!cd) 776 814 throw std::runtime_error ("no newly accepted connection"); … … 816 854 **************************************/ 817 855 818 DatagramDescriptor::DatagramDescriptor (int sd ):819 EventableDescriptor (sd ),856 DatagramDescriptor::DatagramDescriptor (int sd, EventMachine_t *parent_em): 857 EventableDescriptor (sd, parent_em), 820 858 OutboundDataSize (0), 821 859 LastIo (gCurrentLoopTime), version_0/ext/ed.h
r365 r372 37 37 { 38 38 public: 39 EventableDescriptor (int );39 EventableDescriptor (int, EventMachine_t*); 40 40 virtual ~EventableDescriptor(); 41 41 … … 87 87 bool bCloseNow; 88 88 bool bCloseAfterWriting; 89 bool bCallbackUnbind; 89 90 90 91 #ifdef HAVE_EPOLL … … 92 93 #endif 93 94 94 }; 95 95 EventMachine_t *MyEventMachine; 96 }; 97 98 99 100 /************************* 101 class LoopbreakDescriptor 102 *************************/ 103 104 class LoopbreakDescriptor: public EventableDescriptor 105 { 106 public: 107 LoopbreakDescriptor (int, EventMachine_t*); 108 virtual ~LoopbreakDescriptor() {} 109 110 virtual void Read(); 111 virtual void Write(); 112 virtual void Heartbeat() {} 113 114 virtual bool SelectForRead() {return true;} 115 virtual bool SelectForWrite() {return false;} 116 }; 96 117 97 118 … … 103 124 { 104 125 public: 105 ConnectionDescriptor ( EventMachine_t*, int);126 ConnectionDescriptor (int, EventMachine_t*); 106 127 virtual ~ConnectionDescriptor(); 107 128 … … 155 176 int InactivityTimeout; 156 177 157 protected:158 EventMachine_t *MyEventMachine;159 160 178 private: 161 179 void _WriteOutboundData(); … … 174 192 { 175 193 public: 176 DatagramDescriptor (int );194 DatagramDescriptor (int, EventMachine_t*); 177 195 virtual ~DatagramDescriptor(); 178 196 … … 225 243 { 226 244 public: 227 AcceptorDescriptor ( EventMachine_t*, int);245 AcceptorDescriptor (int, EventMachine_t*); 228 246 virtual ~AcceptorDescriptor(); 229 247 … … 234 252 virtual bool SelectForRead() {return true;} 235 253 virtual bool SelectForWrite() {return false;} 236 237 protected:238 EventMachine_t *MyEventMachine;239 254 }; 240 255 … … 246 261 { 247 262 public: 248 PipeDescriptor (FILE* );263 PipeDescriptor (FILE*, EventMachine_t*); 249 264 virtual ~PipeDescriptor(); 250 265 version_0/ext/em.cpp
r368 r372 193 193 int EventMachine_t::SetRlimitNofile (int nofiles) 194 194 { 195 #ifdef OS_UNIX 195 196 struct rlimit rlim; 196 197 if (nofiles >= 0) { … … 203 204 getrlimit (RLIMIT_NOFILE, &rlim); 204 205 return rlim.rlim_cur; 206 #endif 207 208 #ifdef OS_WIN32 209 // No meaningful implementation on Windows. 210 return 0; 211 #endif 205 212 } 206 213 … … 292 299 cloexec |= FD_CLOEXEC; 293 300 fcntl (epfd, F_SETFD, cloexec); 301 302 assert (LoopBreakerReader >= 0); 303 LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this); 304 assert (ld); 305 Add (ld); 294 306 } 295 307 #endif … … 339 351 for (int i=0; i < s; i++) { 340 352 EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr; 341 assert (ed);342 353 343 354 if (ev[i].events & (EPOLLERR | EPOLLHUP)) … … 746 757 * (To wit, the ConnectionCompleted event gets sent to the client.) 747 758 */ 748 ConnectionDescriptor *cd = new ConnectionDescriptor ( this, sd);759 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); 749 760 if (!cd) 750 761 throw std::runtime_error ("no connection allocated"); … … 764 775 // Put the connection on the stack and wait for it to complete 765 776 // or time out. 766 ConnectionDescriptor *cd = new ConnectionDescriptor ( this, sd);777 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); 767 778 if (!cd) 768 779 throw std::runtime_error ("no connection allocated"); … … 784 795 * for people to know that a failure occurred. 785 796 */ 786 ConnectionDescriptor *cd = new ConnectionDescriptor ( this, sd);797 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); 787 798 if (!cd) 788 799 throw std::runtime_error ("no connection allocated"); … … 811 822 // Put the connection on the stack and wait for it to complete 812 823 // or time out. 813 ConnectionDescriptor *cd = new ConnectionDescriptor ( this, sd);824 ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); 814 825 if (!cd) 815 826 throw std::runtime_error ("no connection allocated"); … … 888 899 // we still set the "pending" flag, so some needed initializations take 889 900 // place. 890 ConnectionDescriptor *cd = new ConnectionDescriptor ( this, fd);901 ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this); 891 902 if (!cd) 892 903 throw std::runtime_error ("no connection allocated"); … … 980 991 981 992 { // Looking good. 982 AcceptorDescriptor *ad = new AcceptorDescriptor ( this, sd_accept);993 AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this); 983 994 if (!ad) 984 995 throw std::runtime_error ("unable to allocate acceptor"); … … 1041 1052 1042 1053 { // Looking good. 1043 DatagramDescriptor *ds = new DatagramDescriptor (sd );1054 DatagramDescriptor *ds = new DatagramDescriptor (sd, this); 1044 1055 if (!ds) 1045 1056 throw std::runtime_error ("unable to allocate datagram-socket"); … … 1127 1138 int fd = open (filename, O_CREAT|O_TRUNC|O_WRONLY|O_NONBLOCK, 0644); 1128 1139 1129 FileStreamDescriptor *fsd = new FileStreamDescriptor (fd );1140 FileStreamDescriptor *fsd = new FileStreamDescriptor (fd, this); 1130 1141 if (!fsd) 1131 1142 throw std::runtime_error ("no file-stream allocated"); … … 1204 1215 1205 1216 { // Looking good. 1206 AcceptorDescriptor *ad = new AcceptorDescriptor ( this, sd_accept);1217 AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this); 1207 1218 if (!ad) 1208 1219 throw std::runtime_error ("unable to allocate acceptor"); … … 1250 1261 1251 1262 { // Looking good. 1252 PipeDescriptor *pd = new PipeDescriptor (fp );1263 PipeDescriptor *pd = new PipeDescriptor (fp, this); 1253 1264 if (!pd) 1254 1265 throw std::runtime_error ("unable to allocate pipe"); version_0/ext/em.h
r366 r372 101 101 void _AddNewDescriptors(); 102 102 void _InitializeLoopBreaker(); 103 void _ReadLoopBreaker();104 103 105 104 bool _RunSelectOnce(); 106 105 bool _RunEpollOnce(); 106 107 public: 108 void _ReadLoopBreaker(); 107 109 108 110 private: version_0/ext/files.cpp
r325 r372 25 25 ******************************************/ 26 26 27 FileStreamDescriptor::FileStreamDescriptor (int fd ):28 EventableDescriptor (fd ),27 FileStreamDescriptor::FileStreamDescriptor (int fd, EventMachine_t *em): 28 EventableDescriptor (fd, em), 29 29 OutboundDataSize (0) 30 30 { version_0/ext/files.h
r325 r372 31 31 { 32 32 public: 33 FileStreamDescriptor (int );33 FileStreamDescriptor (int, EventMachine_t*); 34 34 virtual ~FileStreamDescriptor(); 35 35 version_0/ext/pipe.cpp
r337 r372 25 25 ******************************/ 26 26 27 PipeDescriptor::PipeDescriptor (FILE *fp ):28 EventableDescriptor (fileno (fp) ),27 PipeDescriptor::PipeDescriptor (FILE *fp, EventMachine_t *parent_em): 28 EventableDescriptor (fileno (fp), parent_em), 29 29 bReadAttemptedAfterClose (false), 30 30 LastIo (gCurrentLoopTime), version_0/tests/test_epoll.rb
r371 r372 53 53 end 54 54 def receive_data data 55 raise "bad response" unless data == "ABCDE" 55 56 end 56 57 def unbind … … 74 75 # up past 512. (Each connection uses two sockets, a client and a server.) 75 76 # (Will require running the test as root) 77 # This test exercises TCP clients and servers. 76 78 # 77 79 def test_descriptors … … 82 84 $n = 0 83 85 $max = 0 84 400.times {86 100.times { 85 87 EM.connect("127.0.0.1", 9800, TestEchoClient) {$n += 1} 86 88 } 87 89 } 88 90 assert_equal(0, $n) 89 assert_equal( 400, $max)91 assert_equal(100, $max) 90 92 end 93 94 def test_defer 95 $n = 0 96 EM.epoll 97 EM.run { 98 sleep_proc = proc {sleep 1} 99 return_proc = proc {$n += 1; EM.stop} 100 EM.defer sleep_proc, return_proc 101 } 102 assert_equal( 1, $n ) 103 end 104 91 105 end
