Changeset 611
- Timestamp:
- 12/06/07 07:31:39 (1 year ago)
- Files:
-
- version_0/lib/pr_eventmachine.rb (modified) (13 diffs)
- version_0/tests/test_pure.rb (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
version_0/lib/pr_eventmachine.rb
r610 r611 194 194 def set_comm_inactivity_timeout sig, tm 195 195 r = Reactor.instance.get_selectable( sig ) or raise "unknown set_comm_inactivity_timeout target" 196 p "?????????????" # TODO, IMPLEMENT THIS.196 r.set_inactivity_timeout tm 197 197 end 198 198 end … … 248 248 249 249 TimerFired = 100 250 ConnectionData = 101251 ConnectionUnbound = 102252 ConnectionAccepted = 103253 ConnectionCompleted = 104254 LoopbreakSignalled = 105250 ConnectionData = 101 251 ConnectionUnbound = 102 252 ConnectionAccepted = 103 253 ConnectionCompleted = 104 254 LoopbreakSignalled = 105 255 255 256 256 end … … 261 261 class Reactor 262 262 include Singleton 263 264 HeartbeatInterval = 2 265 266 attr_reader :current_loop_time 263 267 264 268 def initialize … … 284 288 @timers = SortedSet.new # [] 285 289 set_timer_quantum(0.1) 290 @current_loop_time = Time.now 291 @next_heartbeat = @current_loop_time + HeartbeatInterval 286 292 end 287 293 … … 302 308 303 309 loop { 310 @current_loop_time = Time.now 311 304 312 break if @stop_scheduled 305 313 run_timers 306 314 break if @stop_scheduled 307 315 crank_selectables 316 break if @stop_scheduled 317 run_heartbeats 308 318 } 309 319 ensure … … 318 328 319 329 def run_timers 320 now = Time.now321 330 @timers.each {|t| 322 if t.first <= now331 if t.first <= @current_loop_time 323 332 @timers.delete t 324 333 EventMachine::event_callback "", TimerFired, t.last … … 331 340 # EventMachine::event_callback "", TimerFired, t.last 332 341 #end 342 end 343 344 def run_heartbeats 345 if @next_heartbeat <= @current_loop_time 346 @next_heartbeat = @current_loop_time + HeartbeatInterval 347 @selectables.each {|k,io| io.heartbeat} 348 end 333 349 end 334 350 … … 397 413 def_delegator :@my_selectable, :send_datagram 398 414 def_delegator :@my_selectable, :get_outbound_data_size 415 def_delegator :@my_selectable, :set_inactivity_timeout 416 def_delegator :@my_selectable, :heartbeat 399 417 end 400 418 … … 409 427 @uuid = UuidGenerator.generate 410 428 @io = io 429 @last_activity = Reactor.instance.current_loop_time 411 430 412 431 m = @io.fcntl(Fcntl::F_GETFL, 0) … … 437 456 end 438 457 458 def set_inactivity_timeout tm 459 @inactivity_timeout = tm 460 end 461 462 def heartbeat 463 end 439 464 end 440 465 … … 477 502 # If we have it, then we can read multiple times safely to improve 478 503 # performance. 504 # The last-activity clock ASSUMES that we only come here when we 505 # have selected readable. 479 506 # TODO, coalesce multiple reads into a single event. 480 507 # TODO, do the function check somewhere else and cache it. 481 508 def eventable_read 509 @last_activity = Reactor.instance.current_loop_time 482 510 begin 483 511 if io.respond_to?(:read_nonblock) … … 509 537 # connections. Also we should coalesce small writes. 510 538 # URGENT TODO: Coalesce small writes. They are a performance killer. 539 # The last-activity recorder ASSUMES we'll only come here if we've 540 # selected writable. 511 541 def eventable_write 512 542 # coalesce the outbound array here, perhaps 543 @last_activity = Reactor.instance.current_loop_time 513 544 while data = @outbound_q.shift do 514 545 begin … … 565 596 end 566 597 598 def heartbeat 599 if @inactivity_timeout and (@last_activity + @inactivity_timeout) < Reactor.instance.current_loop_time 600 schedule_close true 601 end 602 end 567 603 end 568 604 version_0/tests/test_pure.rb
r607 r611 113 113 end 114 114 115 116 117 def test_reactor_running 118 a = false 119 EM.run { 120 a = EM.reactor_running? 121 EM.next_tick {EM.stop} 122 } 123 assert a 124 end 125 126 115 127 end
