| | 599 | |
|---|
| | 600 | |
|---|
| | 601 | |
|---|
| | 602 | |
|---|
| | 603 | module DeferredTrigger # :nodoc: |
|---|
| | 604 | def receive_data str |
|---|
| | 605 | EventMachine.run_deferred_callbacks |
|---|
| | 606 | end |
|---|
| | 607 | end |
|---|
| | 608 | |
|---|
| | 609 | |
|---|
| | 610 | def self::run_deferred_callbacks # :nodoc: |
|---|
| | 611 | until @resultqueue.empty? |
|---|
| | 612 | result,cback = @resultqueue.pop |
|---|
| | 613 | cback.call result if cback |
|---|
| | 614 | end |
|---|
| | 615 | end |
|---|
| | 616 | |
|---|
| | 617 | # #defer is for integrating blocking operations into EventMachine's control flow. |
|---|
| | 618 | # Call #defer with one or two blocks, as shown below (the second block is <i>optional</i>): |
|---|
| | 619 | # |
|---|
| | 620 | # operation = proc { |
|---|
| | 621 | # # perform a long-running operation here, such as a database query. |
|---|
| | 622 | # "result" # as usual, the last expression evaluated in the block will be the return value. |
|---|
| | 623 | # } |
|---|
| | 624 | # callback = proc {|result| |
|---|
| | 625 | # # do something with result here, such as send it back to a network client. |
|---|
| | 626 | # } |
|---|
| | 627 | # |
|---|
| | 628 | # EventMachine.defer( operation, callback ) |
|---|
| | 629 | # |
|---|
| | 630 | # The action of #defer is to take the block specified in the first parameter (the "operation") |
|---|
| | 631 | # and schedule it for asynchronous execution on an internal thread pool maintained by EventMachine. |
|---|
| | 632 | # When the operation completes, it will pass the result computed by the block (if any) |
|---|
| | 633 | # back to the EventMachine reactor. Then, EventMachine calls the block specified in the |
|---|
| | 634 | # second parameter to #defer (the "callback"), as part of its normal, synchronous |
|---|
| | 635 | # event handling loop. The result computed by the operation block is passed as a parameter |
|---|
| | 636 | # to the callback. You may omit the callback parameter if you don't need to execute any code |
|---|
| | 637 | # after the operation completes. |
|---|
| | 638 | # |
|---|
| | 639 | # <i>Caveats:</i> |
|---|
| | 640 | # This is a <b>provisional</b> implementation and is subject to change. |
|---|
| | 641 | # Note carefully that the code in your deferred operation will be executed on a separate |
|---|
| | 642 | # thread from the main EventMachine processing and all other Ruby threads that may exist in |
|---|
| | 643 | # your program. Also, multiple deferred operations may be running at once! Therefore, you |
|---|
| | 644 | # are responsible for ensuring that your operation code is threadsafe. [Need more explanation |
|---|
| | 645 | # and examples.] |
|---|
| | 646 | # Don't write a deferred operation that will block forever. If so, the current implementation will |
|---|
| | 647 | # not detect the problem, and the thread will never be returned to the pool. EventMachine limits |
|---|
| | 648 | # the number of threads in its pool, so if you do this enough times, your subsequent deferred |
|---|
| | 649 | # operations won't get a chance to run. [We might put in a timer to detect this problem.] |
|---|
| | 650 | # |
|---|
| | 651 | def self::defer op, callback = nil |
|---|
| | 652 | unless @threadqueue |
|---|
| | 653 | |
|---|
| | 654 | start_server "127.0.0.1", 29999, DeferredTrigger |
|---|
| | 655 | @deferred_trigger = connect "127.0.0.1", 29999 |
|---|
| | 656 | |
|---|
| | 657 | require 'thread' |
|---|
| | 658 | @threadqueue = Queue.new |
|---|
| | 659 | @resultqueue = Queue.new |
|---|
| | 660 | 20.times {|ix| |
|---|
| | 661 | Thread.new { |
|---|
| | 662 | my_ix = ix |
|---|
| | 663 | loop { |
|---|
| | 664 | op,cback = @threadqueue.pop |
|---|
| | 665 | result = op.call |
|---|
| | 666 | @resultqueue << [result, cback] |
|---|
| | 667 | @deferred_trigger.send_data "." |
|---|
| | 668 | } |
|---|
| | 669 | } |
|---|
| | 670 | } |
|---|
| | 671 | end |
|---|
| | 672 | |
|---|
| | 673 | @threadqueue << [op,callback] |
|---|
| | 674 | end |
|---|
| | 675 | |
|---|