Changeset 460
- Timestamp:
- 07/21/07 09:36:27 (1 year ago)
- Files:
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
version_0/java/src/com/rubyeventmachine/EmReactor.java
r450 r460 123 123 sn.configureBlocking(false); 124 124 String b = createBinding(); 125 Eventable Channel ec = new EventableChannel (sn, b, mySelector);125 EventableSocketChannel ec = new EventableSocketChannel (sn, b, mySelector); 126 126 Connections.put(b, ec); 127 127 eventCallback ((String)k.attachment(), EM_CONNECTION_ACCEPTED, ByteBuffer.wrap(b.getBytes())); … … 130 130 131 131 if (k.isReadable()) { 132 EventableChannel ec = (EventableChannel)k.attachment(); 133 myReadBuffer.clear(); 134 ec.readInboundData (myReadBuffer); 135 myReadBuffer.flip(); 136 String b = ec.getBinding(); 137 if (myReadBuffer.limit() > 0) { 138 eventCallback (b, EM_CONNECTION_READ, myReadBuffer); 139 } 140 else { 141 eventCallback (b, EM_CONNECTION_UNBOUND, EmptyByteBuffer); 142 Connections.remove(b); 143 k.channel().close(); 144 } 145 /* 146 System.out.println ("READABLE"); 132 147 SocketChannel sn = (SocketChannel) k.channel(); 133 148 //ByteBuffer bb = ByteBuffer.allocate(16 * 1024); … … 152 167 sn.close(); 153 168 } 169 */ 154 170 } 155 171 … … 165 181 166 182 if (k.isConnectable()) { 167 Eventable Channel ec = (EventableChannel)k.attachment();183 EventableSocketChannel ec = (EventableSocketChannel)k.attachment(); 168 184 if (ec.finishConnecting()) { 169 185 eventCallback (ec.getBinding(), EM_CONNECTION_COMPLETED, EmptyByteBuffer); … … 190 206 mySelector.close(); 191 207 mySelector = null; 208 209 // run down open connections and sockets. 210 Iterator<ServerSocketChannel> i = Acceptors.values().iterator(); 211 while (i.hasNext()) { 212 i.next().close(); 213 } 214 215 Iterator<EventableChannel> i2 = Connections.values().iterator(); 216 while (i2.hasNext()) 217 i2.next().close(); 192 218 } 193 219 … … 218 244 BindingIndex++; 219 245 String s = createBinding(); 220 //System.out.println (new Date().getTime()+ milliseconds);221 246 Timers.put(new Date().getTime() + milliseconds, s); 222 247 return s; … … 241 266 } 242 267 268 269 /** 270 * 271 * @param address 272 * @param port 273 * @return 274 * @throws IOException 275 */ 276 public String openUdpSocket (String address, int port) throws IOException { 277 DatagramChannel dg = DatagramChannel.open(); 278 dg.configureBlocking(false); 279 dg.socket().bind( new InetSocketAddress(address,port)); 280 String b = createBinding(); 281 EventableChannel ec = new EventableDatagramChannel (dg, b, mySelector); 282 dg.register(mySelector, SelectionKey.OP_READ, ec); 283 Connections.put(b, ec); 284 return b; 285 } 286 243 287 public void sendData (String sig, ByteBuffer bb) throws IOException { 244 288 (Connections.get(sig)).scheduleOutboundData( bb ); … … 249 293 } 250 294 295 /** 296 * 297 * @param sig 298 * @param data 299 * @param length 300 * @param recipAddress 301 * @param recipPort 302 */ 303 public void sendDatagram (String sig, String data, int length, String recipAddress, int recipPort) { 304 sendDatagram (sig, ByteBuffer.wrap(data.getBytes()), recipAddress, recipPort); 305 } 306 307 /** 308 * 309 * @param sig 310 * @param bb 311 * @param recipAddress 312 * @param recipPort 313 */ 314 public void sendDatagram (String sig, ByteBuffer bb, String recipAddress, int recipPort) { 315 (Connections.get(sig)).scheduleOutboundDatagram( bb, recipAddress, recipPort); 316 } 317 318 319 /** 320 * 321 * @param address 322 * @param port 323 * @return 324 * @throws ClosedChannelException 325 */ 251 326 public String connectTcpServer (String address, int port) throws ClosedChannelException { 252 327 String b = createBinding(); … … 255 330 SocketChannel sc = SocketChannel.open(); 256 331 sc.configureBlocking(false); 257 Eventable Channel ec = new EventableChannel (sc, b, mySelector);332 EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector); 258 333 259 334 if (sc.connect (new InetSocketAddress (address, port))) { … … 285 360 286 361 public void closeConnection (String sig, boolean afterWriting) throws ClosedChannelException { 287 //System.out.println ("???"+Connections.get(sig));288 362 Connections.get(sig).scheduleClose (afterWriting); 289 363 }
