Changeset 464
- Timestamp:
- 07/21/07 20:42:43 (1 year ago)
- Files:
-
- version_0/java/src/com/rubyeventmachine/Application.java (modified) (2 diffs)
- version_0/java/src/com/rubyeventmachine/Connection.java (modified) (5 diffs)
- version_0/java/src/com/rubyeventmachine/EmReactor.java (modified) (2 diffs)
- version_0/java/src/com/rubyeventmachine/EventableSocketChannel.java (modified) (10 diffs)
- version_0/java/src/com/rubyeventmachine/tests/ConnectTest.java (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
version_0/java/src/com/rubyeventmachine/Application.java
r451 r464 7 7 import java.nio.channels.*; 8 8 import java.util.*; 9 import java.io.*; 10 import java.net.SocketAddress; 9 11 10 12 /** … … 12 14 * 13 15 */ 14 public class Application extends EmReactor { 15 16 private TreeMap<String, Timer> timers; 17 private TreeMap<String, Connection> connections; 18 /** 19 * 20 */ 21 public Application() { 22 timers = new TreeMap<String, Timer>(); 23 connections = new TreeMap<String, Connection>(); 24 } 16 public class Application { 25 17 26 18 27 public void eventCallback (String sig, int eventType, ByteBuffer data) { 28 if (eventType == EM_TIMER_FIRED) { 29 String timersig = new String (data.array()); 30 //System.out.println ("EVSIG "+sig + "..."+new String(data.array())); 31 Timer r = timers.remove(timersig); 32 if (r != null) 33 r._fire(); 34 else 35 throw new RuntimeException ("unable to run unknown timer"); 19 public class Reactor extends EmReactor { 20 21 private Application application; 22 private TreeMap<String, Timer> timers; 23 private TreeMap<String, Connection> connections; 24 private TreeMap<String, ConnectionFactory> acceptors; 25 /** 26 * 27 */ 28 public Reactor (Application app) { 29 application = app; 30 timers = new TreeMap<String, Timer>(); 31 connections = new TreeMap<String, Connection>(); 32 acceptors = new TreeMap<String, ConnectionFactory>(); 36 33 } 37 else if (eventType == EM_CONNECTION_COMPLETED) { 38 Connection c = connections.get(sig); 39 if (c != null) { 40 c.connectionCompleted(); 34 35 36 public void eventCallback (String sig, int eventType, ByteBuffer data) { 37 if (eventType == EM_TIMER_FIRED) { 38 String timersig = new String (data.array()); 39 //System.out.println ("EVSIG "+sig + "..."+new String(data.array())); 40 Timer r = timers.remove(timersig); 41 if (r != null) 42 r._fire(); 43 else 44 throw new RuntimeException ("unable to run unknown timer"); 41 45 } 42 else 43 throw new RuntimeException ("connection completed to unknown object"); 46 else if (eventType == EM_CONNECTION_COMPLETED) { 47 Connection c = connections.get(sig); 48 if (c != null) { 49 c.connectionCompleted(); 50 } 51 else 52 throw new RuntimeException ("connection completed to unknown object"); 44 53 45 }46 else if (eventType == EM_CONNECTION_UNBOUND) {47 Connection c = connections.get(sig);48 if (c != null) {49 c.unbind();50 54 } 51 else 52 throw new RuntimeException ("unbind received on unknown object");53 }54 else if (eventType == EM_CONNECTION_READ) {55 Connection c = connections.get(sig);56 if (c != null) {57 c.receiveData(data);55 else if (eventType == EM_CONNECTION_UNBOUND) { 56 Connection c = connections.get(sig); 57 if (c != null) { 58 c.unbind(); 59 } 60 else 61 throw new RuntimeException ("unbind received on unknown object"); 58 62 } 59 else throw new RuntimeException ("received data on unknown object"); 60 } 61 else { 62 System.out.println ("unknown event type: " + eventType); 63 else if (eventType == EM_CONNECTION_ACCEPTED) { 64 ConnectionFactory f = acceptors.get(sig); 65 if (f != null) { 66 Connection c = f.connection(); 67 c.signature = new String (data.array()); 68 c.application = application; 69 connections.put(c.signature, c); 70 c.postInit(); 71 //System.out.println (sig+"..."+new String(data.array())); 72 } 73 else 74 throw new RuntimeException ("received connection on unknown acceptor"); 75 } 76 else if (eventType == EM_CONNECTION_READ) { 77 Connection c = connections.get(sig); 78 if (c != null) { 79 c.receiveData(data); 80 } 81 else throw new RuntimeException ("received data on unknown object"); 82 } 83 else { 84 System.out.println ("unknown event type: " + eventType); 85 } 63 86 } 64 87 } 88 89 90 Reactor reactor; 65 91 66 67 92 public Application() { 93 reactor = new Reactor (this); 94 } 68 95 public void addTimer (double seconds, Timer t) { 69 96 t.application = this; 70 97 t.interval = seconds; 71 String s = installOneshotTimer ((int)(seconds * 1000));72 timers.put(s, t);98 String s = reactor.installOneshotTimer ((int)(seconds * 1000)); 99 reactor.timers.put(s, t); 73 100 74 101 } 75 102 76 public void connect (String host, int port, Connection c) throws ClosedChannelException{ 77 String s = connectTcpServer(host, port); 78 c.application = this; 79 c.signature = s; 80 connections.put(s, c); 103 public void connect (String host, int port, Connection c) { 104 try { 105 String s = reactor.connectTcpServer(host, port); 106 c.application = this; 107 c.signature = s; 108 reactor.connections.put(s, c); 109 c.postInit(); 110 } catch (ClosedChannelException e) {} 111 } 112 113 public void startServer (SocketAddress sa, ConnectionFactory f) throws EmReactorException { 114 String s = reactor.startTcpServer(sa); 115 reactor.acceptors.put(s, f); 116 } 117 118 public void stop() { 119 reactor.stop(); 120 } 121 public void run() { 122 try { 123 reactor.run(); 124 } catch (IOException e) {} 125 } 126 public void run (final Runnable r) { 127 addTimer(0, new Timer() { 128 public void fire() { 129 r.run(); 130 } 131 }); 132 run(); 133 } 134 135 public void sendData (String sig, ByteBuffer bb) { 136 try { 137 reactor.sendData(sig, bb); 138 } catch (IOException e) {} 139 } 140 public void closeConnection (String sig, boolean afterWriting) { 141 try { 142 reactor.closeConnection(sig, afterWriting); 143 } catch (ClosedChannelException e) {} 81 144 } 82 145 } version_0/java/src/com/rubyeventmachine/Connection.java
r451 r464 1 1 package com.rubyeventmachine; 2 2 3 import java.io.*;3 //import java.io.*; 4 4 import java.nio.*; 5 import java.nio.channels.*;5 //import java.nio.channels.*; 6 6 7 7 public class Connection { … … 10 10 public String signature; 11 11 12 public void postInit() {} 12 13 public void connectionCompleted() {} 13 14 public void unbind() {} … … 19 20 * @param bytebuffer 20 21 */ 21 public void sendData (ByteBuffer b) throws IOException{22 public void sendData (ByteBuffer b) { 22 23 application.sendData(signature, b); 23 24 } … … 27 28 * TODO: don't expose the exception here. 28 29 */ 29 public void close() throws ClosedChannelException{30 public void close() { 30 31 application.closeConnection(signature, false); 31 32 } … … 33 34 * This is called by user code/ 34 35 */ 35 public void closeAfterWriting() throws ClosedChannelException{36 public void closeAfterWriting() { 36 37 application.closeConnection(signature, true); 37 38 } version_0/java/src/com/rubyeventmachine/EmReactor.java
r460 r464 248 248 } 249 249 250 public String startTcpServer (String address, int port) throws IOException { 250 public String startTcpServer (SocketAddress sa) throws EmReactorException { 251 try { 252 ServerSocketChannel server = ServerSocketChannel.open(); 253 server.configureBlocking(false); 254 server.socket().bind (sa); 255 String s = createBinding(); 256 Acceptors.put(s, server); 257 server.register(mySelector, SelectionKey.OP_ACCEPT, s); 258 return s; 259 } catch (IOException e) { 260 // TODO, should parameterize this exception better. 261 throw new EmReactorException ("unable to open socket acceptor"); 262 } 263 } 264 265 public String startTcpServer (String address, int port) throws EmReactorException { 266 return startTcpServer (new InetSocketAddress (address, port)); 267 /* 251 268 ServerSocketChannel server = ServerSocketChannel.open(); 252 269 server.configureBlocking(false); … … 256 273 server.register(mySelector, SelectionKey.OP_ACCEPT, s); 257 274 return s; 275 */ 258 276 } 259 277 version_0/java/src/com/rubyeventmachine/EventableSocketChannel.java
r463 r464 21 21 22 22 // TODO, must refactor this to permit channels that aren't sockets. 23 SocketChannel myChannel;24 String myBinding;25 Selector mySelector;26 LinkedList<ByteBuffer> OutboundQ;23 SocketChannel channel; 24 String binding; 25 Selector selector; 26 LinkedList<ByteBuffer> outboundQ; 27 27 boolean bCloseScheduled; 28 28 … … 33 33 34 34 35 public EventableSocketChannel (SocketChannel sc, String binding, Selector sel) throws ClosedChannelException {36 myChannel = sc;37 myBinding =binding;38 mySelector = sel;35 public EventableSocketChannel (SocketChannel sc, String _binding, Selector sel) throws ClosedChannelException { 36 channel = sc; 37 binding = _binding; 38 selector = sel; 39 39 bCloseScheduled = false; 40 OutboundQ = new LinkedList<ByteBuffer>();40 outboundQ = new LinkedList<ByteBuffer>(); 41 41 42 sc.register( mySelector, SelectionKey.OP_READ, this);42 sc.register(selector, SelectionKey.OP_READ, this); 43 43 } 44 44 45 45 public String getBinding() { 46 return myBinding;46 return binding; 47 47 } 48 48 … … 53 53 public void close() { 54 54 try { 55 myChannel.close();55 channel.close(); 56 56 } catch (IOException e) { 57 57 } … … 65 65 sslEngine.wrap(bb, b); 66 66 b.flip(); 67 OutboundQ.addLast(b);67 outboundQ.addLast(b); 68 68 } 69 69 else { 70 OutboundQ.addLast(bb);70 outboundQ.addLast(bb); 71 71 } 72 myChannel.register(mySelector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this);72 channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this); 73 73 } 74 74 } catch (ClosedChannelException e) { … … 88 88 public void readInboundData (ByteBuffer bb) { 89 89 try { 90 myChannel.read(bb);90 channel.read(bb); 91 91 } catch (IOException e) { 92 92 throw new RuntimeException ("i/o error"); … … 107 107 */ 108 108 public boolean writeOutboundData(){ 109 while (! OutboundQ.isEmpty()) {110 ByteBuffer b = OutboundQ.getFirst();109 while (!outboundQ.isEmpty()) { 110 ByteBuffer b = outboundQ.getFirst(); 111 111 try { 112 112 if (b.remaining() > 0) 113 myChannel.write(b);113 channel.write(b); 114 114 } 115 115 catch (IOException e) { … … 121 121 // buffers are full, so break out of here. 122 122 if (b.remaining() == 0) 123 OutboundQ.removeFirst();123 outboundQ.removeFirst(); 124 124 else 125 125 break; 126 126 } 127 127 128 if ( OutboundQ.isEmpty()) {128 if (outboundQ.isEmpty()) { 129 129 try { 130 myChannel.register(mySelector, SelectionKey.OP_READ, this);130 channel.register(selector, SelectionKey.OP_READ, this); 131 131 } catch (ClosedChannelException e) { 132 132 } … … 136 136 // If anyone wants to close immediately, they're responsible for clearing 137 137 // the outbound queue. 138 return (bCloseScheduled && OutboundQ.isEmpty()) ? false : true;138 return (bCloseScheduled && outboundQ.isEmpty()) ? false : true; 139 139 } 140 140 141 141 public void setConnectPending() throws ClosedChannelException { 142 myChannel.register(mySelector, SelectionKey.OP_CONNECT, this);142 channel.register(selector, SelectionKey.OP_CONNECT, this); 143 143 } 144 144 … … 150 150 public boolean finishConnecting() throws ClosedChannelException { 151 151 try { 152 myChannel.finishConnect();152 channel.finishConnect(); 153 153 } 154 154 catch (IOException e) { 155 155 return false; 156 156 } 157 myChannel.register(mySelector, SelectionKey.OP_READ, this);157 channel.register(selector, SelectionKey.OP_READ, this); 158 158 return true; 159 159 } … … 161 161 public void scheduleClose (boolean afterWriting) { 162 162 if (!afterWriting) 163 OutboundQ.clear();164 try { 165 myChannel.register(mySelector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, this);163 outboundQ.clear(); 164 try { 165 channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, this); 166 166 } catch (ClosedChannelException e) { 167 167 throw new RuntimeException ("unable to schedule close"); // TODO, get rid of this. version_0/java/src/com/rubyeventmachine/tests/ConnectTest.java
r452 r464 39 39 a.addTimer(0, new Timer() { 40 40 public void fire() { 41 try { 42 application.connect("www.bayshorenetworks.com", 80, new Connection() { 43 public void connectionCompleted() { 44 try { 45 close(); 46 } catch (ClosedChannelException e) { 47 } 48 } 49 public void unbind() { 50 application.stop(); 51 } 52 }); 53 } catch (ClosedChannelException e) { 54 // TODO, must refactor this exception handler out of here. 55 } 41 application.connect("www.bayshorenetworks.com", 80, new Connection() { 42 public void connectionCompleted() { 43 close(); 44 } 45 public void unbind() { 46 application.stop(); 47 } 48 }); 56 49 } 57 50 }); … … 63 56 class Bays extends Connection { 64 57 public void connectionCompleted() { 65 try { 66 sendData (ByteBuffer.wrap( new String ("GET / HTTP/1.1\r\nHost: _\r\n\r\n").getBytes())); 67 } catch (IOException e) { 68 } 58 sendData (ByteBuffer.wrap( new String ("GET / HTTP/1.1\r\nHost: _\r\n\r\n").getBytes())); 69 59 } 70 60 public void receiveData (ByteBuffer b) { … … 77 67 a.addTimer(0, new Timer() { 78 68 public void fire() { 79 try { 80 application.connect("www.bayshorenetworks.com", 80, new Bays()); 81 } catch (ClosedChannelException e) { 82 } 69 application.connect("www.bayshorenetworks.com", 80, new Bays()); 83 70 } 84 71 }); 85 72 a.run(); 86 73 } 74 75 76 77 class C1 extends Connection { 78 Application application; 79 public C1 (Application a) { 80 application = a; 81 } 82 public void postInit() { 83 application.stop(); 84 } 85 } 86 @Test 87 public final void test3() { 88 final Application a = new Application(); 89 C1 c = new C1 (a); 90 a.run (new Runnable() { 91 public void run() { 92 a.connect("www.bayshorenetworks.com", 80, new C1(a)); 93 } 94 }); 95 } 96 97 98 87 99 }
