Changeset 472
- Timestamp:
- 07/22/07 10:14:20 (1 year ago)
- Files:
-
- version_0/lib/em/streamer.rb (modified) (1 diff)
- version_0/lib/eventmachine.rb (modified) (2 diffs)
- version_0/tests/test_send_file.rb (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
version_0/lib/em/streamer.rb
r470 r472 24 24 # 25 25 26 27 module EventMachine 28 class FileStreamer 29 MappingThreshold = 8192 30 BackpressureLevel = 50000 31 ChunkSize = 4096 32 33 include Deferrable 34 def initialize connection, filename, args 35 @connection = connection 36 @http_chunks = args[:http_chunks] 37 38 if File.exist?(filename) 39 @size = File.size?(filename) 40 if @size <= MappingThreshold 41 stream_without_mapping filename 42 else 43 stream_with_mapping filename 44 end 45 else 46 fail "file not found" 47 end 48 end 49 50 def stream_without_mapping filename 51 if @http_chunks 52 @connection.send_data "#{format("%x",@size)}\r\n" 53 @connection.send_file_data filename 54 @connection.send_data "\r\n0\r\n\r\n" 55 else 56 @connection.send_file_data filename 57 end 58 succeed 59 end 60 private :stream_without_mapping 61 62 def stream_with_mapping filename 63 ensure_mapping_extension_is_present 64 65 @position = 0 66 @mapping = EventMachine::FastFileReader::Mapper.new filename 67 stream_one_chunk 68 end 69 private :stream_with_mapping 70 71 def stream_one_chunk 72 loop { 73 if @position < @size 74 if @connection.get_outbound_data_size > BackpressureLevel 75 EventMachine::next_tick {stream_one_chunk} 76 else 77 len = @size - @position 78 len = ChunkSize if (len > ChunkSize) 79 80 @connection.send_data( "#{format("%x",len)}\r\n" ) if @http_chunks 81 @connection.send_data( @mapping.get_chunk( @position, len )) 82 @connection.send_data("\r\n") if @http_chunks 83 84 @position += len 85 end 86 else 87 @connection.send_data "0\r\n\r\n" if @http_chunks 88 @mapping.close 89 succeed 90 break 91 end 92 } 93 end 94 95 #-- 96 # We use an outboard extension class to get memory-mapped files. 97 # It's outboard to avoid polluting the core distro, but that means 98 # there's a "hidden" dependency on it. The first time we get here in 99 # any run, try to load up the dependency extension. User code will see 100 # a LoadError if it's not available, but code that doesn't require 101 # mapped files will work fine without it. This is a somewhat difficult 102 # compromise between usability and proper modularization. 103 # 104 def ensure_mapping_extension_is_present 105 @@fastfilereader ||= (require 'fastfilereaderext') 106 end 107 private :ensure_mapping_extension_is_present 108 109 end 110 end 111 version_0/lib/eventmachine.rb
r433 r472 66 66 require 'em/eventable' 67 67 require 'em/messages' 68 require 'em/streamer' 68 69 69 70 require 'shellwords' … … 1253 1254 end 1254 1255 1256 # Open a file on the filesystem and send it to the remote peer. This returns an 1257 # object of type EventMachine::Deferrable. The object's callbacks will be executed 1258 # on the reactor main thread when the file has been completely scheduled for 1259 # transmission to the remote peer. Its errbacks will be called in case of an error 1260 # (such as file-not-found). #stream_file_data employs various strategems to achieve 1261 # the fastest possible performance, balanced against minimum consumption of memory. 1262 # 1263 # You can control the behavior of #stream_file_data with the optional arguments parameter. 1264 # Currently-supported arguments are: 1265 # :http_chunks, a boolean flag which defaults false. If true, this flag streams the 1266 # file data in a format compatible with the HTTP chunked-transfer encoding. 1267 # 1268 # 1269 def stream_file_data filename, args={} 1270 EventMachine::FileStreamer.new( self, filename, args ) 1271 end 1255 1272 1256 1273 version_0/tests/test_send_file.rb
r413 r472 45 45 46 46 def teardown 47 File.unlink( TestFilename ) if File.exist?( TestFilename ) 47 48 end 48 49 … … 69 70 end 70 71 72 def test_send_large_file 73 File.open( TestFilename, "w" ) {|f| 74 f << ("A" * 1000000) 75 } 76 77 data = nil 78 79 EM.run { 80 EM.start_server TestHost, TestPort, TestModule 81 EM.add_timer(2) {EM.stop} # avoid hanging in case of error 82 EM.defer proc { 83 t = TCPSocket.new TestHost, TestPort 84 data = t.read 85 }, proc { 86 EM.stop 87 } 88 } 89 90 assert_equal( "A" * 1000000, data ) 91 File.unlink TestFilename 92 end 93 94 95 module StreamTestModule 96 def post_init 97 EM::Deferrable.future( stream_file_data(TestFilename)) { 98 close_connection_after_writing 99 } 100 end 101 end 102 103 module ChunkStreamTestModule 104 def post_init 105 EM::Deferrable.future( stream_file_data(TestFilename, :http_chunks=>true)) { 106 close_connection_after_writing 107 } 108 end 109 end 110 111 def test_stream_file_data 112 File.open( TestFilename, "w" ) {|f| 113 f << ("A" * 1000) 114 } 115 116 data = nil 117 118 EM.run { 119 EM.start_server TestHost, TestPort, StreamTestModule 120 EM.add_timer(2) {EM.stop} # avoid hanging in case of error 121 EM.defer proc { 122 t = TCPSocket.new TestHost, TestPort 123 data = t.read 124 }, proc { 125 EM.stop 126 } 127 } 128 129 assert_equal( "A" * 1000, data ) 130 131 File.unlink TestFilename 132 end 133 134 def test_stream_chunked_file_data 135 File.open( TestFilename, "w" ) {|f| 136 f << ("A" * 1000) 137 } 138 139 data = nil 140 141 EM.run { 142 EM.start_server TestHost, TestPort, ChunkStreamTestModule 143 EM.add_timer(2) {EM.stop} # avoid hanging in case of error 144 EM.defer proc { 145 t = TCPSocket.new TestHost, TestPort 146 data = t.read 147 }, proc { 148 EM.stop 149 } 150 } 151 152 assert_equal( "3e8\r\n#{"A" * 1000}\r\n0\r\n\r\n", data ) 153 154 File.unlink TestFilename 155 end 156 157 module BadFileTestModule 158 def post_init 159 de = stream_file_data( TestFilename+"..." ) 160 de.errback {|msg| 161 send_data msg 162 close_connection_after_writing 163 } 164 end 165 end 166 def test_stream_bad_file 167 data = nil 168 EM.run { 169 EM.start_server TestHost, TestPort, BadFileTestModule 170 EM.add_timer(2) {EM.stop} # avoid hanging in case of error 171 EM.defer proc { 172 t = TCPSocket.new TestHost, TestPort 173 data = t.read 174 }, proc { 175 EM.stop 176 } 177 } 178 179 assert_equal( "file not found", data ) 180 end 181 182 def test_stream_large_file_data 183 File.open( TestFilename, "w" ) {|f| 184 f << ("A" * 10000) 185 } 186 187 data = nil 188 189 EM.run { 190 EM.start_server TestHost, TestPort, StreamTestModule 191 EM.add_timer(2) {EM.stop} # avoid hanging in case of error 192 EM.defer proc { 193 t = TCPSocket.new TestHost, TestPort 194 data = t.read 195 }, proc { 196 EM.stop 197 } 198 } 199 200 assert_equal( "A" * 10000, data ) 201 202 File.unlink TestFilename 203 end 204 205 def test_stream_large_chunked_file_data 206 File.open( TestFilename, "w" ) {|f| 207 f << ("A" * 10000) 208 } 209 210 data = nil 211 212 EM.run { 213 EM.start_server TestHost, TestPort, ChunkStreamTestModule 214 EM.add_timer(2) {EM.stop} # avoid hanging in case of error 215 EM.defer proc { 216 t = TCPSocket.new TestHost, TestPort 217 data = t.read 218 }, proc { 219 EM.stop 220 } 221 } 222 223 expected = [ 224 "1000\r\n", 225 "A" * 4096, 226 "\r\n", 227 "1000\r\n", 228 "A" * 4096, 229 "\r\n", 230 "710\r\n", 231 "A" * 0x710, 232 "\r\n", 233 "0\r\n", 234 "\r\n" 235 ].join 236 assert_equal( expected, data ) 237 238 File.unlink TestFilename 239 end 240 71 241 end 72 242
