| 28 | | EventableDescriptor (fileno (fp)) |
|---|
| 29 | | { |
|---|
| 30 | | } |
|---|
| | 28 | EventableDescriptor (fileno (fp)), |
|---|
| | 29 | bReadAttemptedAfterClose (false), |
|---|
| | 30 | LastIo (gCurrentLoopTime), |
|---|
| | 31 | InactivityTimeout (0), |
|---|
| | 32 | MyStream (fp), |
|---|
| | 33 | OutboundDataSize (0) |
|---|
| | 34 | { |
|---|
| | 35 | } |
|---|
| | 36 | |
|---|
| | 37 | /******************************* |
|---|
| | 38 | PipeDescriptor::~PipeDescriptor |
|---|
| | 39 | *******************************/ |
|---|
| | 40 | |
|---|
| | 41 | PipeDescriptor::~PipeDescriptor() |
|---|
| | 42 | { |
|---|
| | 43 | // Run down any stranded outbound data. |
|---|
| | 44 | for (size_t i=0; i < OutboundPages.size(); i++) |
|---|
| | 45 | OutboundPages[i].Free(); |
|---|
| | 46 | |
|---|
| | 47 | /* As a virtual destructor, we come here before the base-class |
|---|
| | 48 | * destructor that closes our file-descriptor. Calling pclose |
|---|
| | 49 | * doesn't seem to bother the base-class destructor any, and it's |
|---|
| | 50 | * required for cleaning up the subprocess zombie. |
|---|
| | 51 | * Eventually we may need to refactor some of this stuff if there |
|---|
| | 52 | * are undesirable interactions. |
|---|
| | 53 | * Note that calling pclose on a still-running subprocess called |
|---|
| | 54 | * with mode "r" will often cause the subprocess to catch SIGPIPE. |
|---|
| | 55 | * This is part of the behavior of popen and not something EM is doing. |
|---|
| | 56 | * |
|---|
| | 57 | * Something weirder and worse happens with mode "w" - |
|---|
| | 58 | * pclose WILL HANG irretrievably if the subprocess doesn't |
|---|
| | 59 | * close when we tell it to. It will see a close on its end |
|---|
| | 60 | * of the pipe (the read end), but that doesn't mean it will |
|---|
| | 61 | * be written so as to exit when that happens. |
|---|
| | 62 | * pclose waits for the subprocess to terminate. |
|---|
| | 63 | * (Is there a flavor of popen that has a timeout or a no-hang |
|---|
| | 64 | * option?) |
|---|
| | 65 | * |
|---|
| | 66 | * pclose returns the termination status of the subprocess. |
|---|
| | 67 | * Someday we may need to make that available to the caller, |
|---|
| | 68 | * possibly as an argument to the UNBOUND event. |
|---|
| | 69 | */ |
|---|
| | 70 | pclose (MyStream); |
|---|
| | 71 | } |
|---|
| | 72 | |
|---|
| | 73 | |
|---|
| | 74 | |
|---|
| | 75 | /******************** |
|---|
| | 76 | PipeDescriptor::Read |
|---|
| | 77 | ********************/ |
|---|
| | 78 | |
|---|
| | 79 | void PipeDescriptor::Read() |
|---|
| | 80 | { |
|---|
| | 81 | int sd = GetSocket(); |
|---|
| | 82 | if (sd == INVALID_SOCKET) { |
|---|
| | 83 | assert (!bReadAttemptedAfterClose); |
|---|
| | 84 | bReadAttemptedAfterClose = true; |
|---|
| | 85 | return; |
|---|
| | 86 | } |
|---|
| | 87 | |
|---|
| | 88 | LastIo = gCurrentLoopTime; |
|---|
| | 89 | |
|---|
| | 90 | int total_bytes_read = 0; |
|---|
| | 91 | char readbuffer [16 * 1024]; |
|---|
| | 92 | |
|---|
| | 93 | for (int i=0; i < 10; i++) { |
|---|
| | 94 | // Don't read just one buffer and then move on. This is faster |
|---|
| | 95 | // if there is a lot of incoming. |
|---|
| | 96 | // But don't read indefinitely. Give other sockets a chance to run. |
|---|
| | 97 | // NOTICE, we're reading one less than the buffer size. |
|---|
| | 98 | // That's so we can put a guard byte at the end of what we send |
|---|
| | 99 | // to user code. |
|---|
| | 100 | // Use read instead of recv, which on Linux gives a "socket operation |
|---|
| | 101 | // on nonsocket" error. |
|---|
| | 102 | |
|---|
| | 103 | |
|---|
| | 104 | int r = read (sd, readbuffer, sizeof(readbuffer) - 1); |
|---|
| | 105 | //cerr << "<R:" << r << ">"; |
|---|
| | 106 | |
|---|
| | 107 | if (r > 0) { |
|---|
| | 108 | total_bytes_read += r; |
|---|
| | 109 | LastRead = gCurrentLoopTime; |
|---|
| | 110 | |
|---|
| | 111 | // Add a null-terminator at the the end of the buffer |
|---|
| | 112 | // that we will send to the callback. |
|---|
| | 113 | // DO NOT EVER CHANGE THIS. We want to explicitly allow users |
|---|
| | 114 | // to be able to depend on this behavior, so they will have |
|---|
| | 115 | // the option to do some things faster. Additionally it's |
|---|
| | 116 | // a security guard against buffer overflows. |
|---|
| | 117 | readbuffer [r] = 0; |
|---|
| | 118 | if (EventCallback) |
|---|
| | 119 | (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); |
|---|
| | 120 | } |
|---|
| | 121 | else if (r == 0) { |
|---|
| | 122 | break; |
|---|
| | 123 | } |
|---|
| | 124 | else { |
|---|
| | 125 | // Basically a would-block, meaning we've read everything there is to read. |
|---|
| | 126 | break; |
|---|
| | 127 | } |
|---|
| | 128 | |
|---|
| | 129 | } |
|---|
| | 130 | |
|---|
| | 131 | |
|---|
| | 132 | if (total_bytes_read == 0) { |
|---|
| | 133 | // If we read no data on a socket that selected readable, |
|---|
| | 134 | // it generally means the other end closed the connection gracefully. |
|---|
| | 135 | bCloseNow = true; |
|---|
| | 136 | } |
|---|
| | 137 | |
|---|
| | 138 | } |
|---|
| | 139 | |
|---|
| | 140 | /********************* |
|---|
| | 141 | PipeDescriptor::Write |
|---|
| | 142 | *********************/ |
|---|
| | 143 | |
|---|
| | 144 | void PipeDescriptor::Write() |
|---|
| | 145 | { |
|---|
| | 146 | int sd = GetSocket(); |
|---|
| | 147 | assert (sd != INVALID_SOCKET); |
|---|
| | 148 | |
|---|
| | 149 | LastIo = gCurrentLoopTime; |
|---|
| | 150 | char output_buffer [16 * 1024]; |
|---|
| | 151 | size_t nbytes = 0; |
|---|
| | 152 | |
|---|
| | 153 | while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { |
|---|
| | 154 | OutboundPage *op = &(OutboundPages[0]); |
|---|
| | 155 | if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { |
|---|
| | 156 | memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); |
|---|
| | 157 | nbytes += (op->Length - op->Offset); |
|---|
| | 158 | op->Free(); |
|---|
| | 159 | OutboundPages.pop_front(); |
|---|
| | 160 | } |
|---|
| | 161 | else { |
|---|
| | 162 | int len = sizeof(output_buffer) - nbytes; |
|---|
| | 163 | memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); |
|---|
| | 164 | op->Offset += len; |
|---|
| | 165 | nbytes += len; |
|---|
| | 166 | } |
|---|
| | 167 | } |
|---|
| | 168 | |
|---|
| | 169 | // We should never have gotten here if there were no data to write, |
|---|
| | 170 | // so assert that as a sanity check. |
|---|
| | 171 | // Don't bother to make sure nbytes is less than output_buffer because |
|---|
| | 172 | // if it were we probably would have crashed already. |
|---|
| | 173 | assert (nbytes > 0); |
|---|
| | 174 | |
|---|
| | 175 | assert (GetSocket() != INVALID_SOCKET); |
|---|
| | 176 | int bytes_written = write (GetSocket(), output_buffer, nbytes); |
|---|
| | 177 | |
|---|
| | 178 | if (bytes_written > 0) { |
|---|
| | 179 | OutboundDataSize -= bytes_written; |
|---|
| | 180 | if ((size_t)bytes_written < nbytes) { |
|---|
| | 181 | int len = nbytes - bytes_written; |
|---|
| | 182 | char *buffer = (char*) malloc (len + 1); |
|---|
| | 183 | if (!buffer) |
|---|
| | 184 | throw std::runtime_error ("bad alloc throwing back data"); |
|---|
| | 185 | memcpy (buffer, output_buffer + bytes_written, len); |
|---|
| | 186 | buffer [len] = 0; |
|---|
| | 187 | OutboundPages.push_front (OutboundPage (buffer, len)); |
|---|
| | 188 | } |
|---|
| | 189 | } |
|---|
| | 190 | else { |
|---|
| | 191 | #ifdef OS_UNIX |
|---|
| | 192 | if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) |
|---|
| | 193 | #endif |
|---|
| | 194 | #ifdef OS_WIN32 |
|---|
| | 195 | if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) |
|---|
| | 196 | #endif |
|---|
| | 197 | Close(); |
|---|
| | 198 | } |
|---|
| | 199 | } |
|---|
| | 200 | |
|---|
| | 201 | |
|---|
| | 202 | /************************* |
|---|
| | 203 | PipeDescriptor::Heartbeat |
|---|
| | 204 | *************************/ |
|---|
| | 205 | |
|---|
| | 206 | void PipeDescriptor::Heartbeat() |
|---|
| | 207 | { |
|---|
| | 208 | // If an inactivity timeout is defined, then check for it. |
|---|
| | 209 | if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) |
|---|
| | 210 | bCloseNow = true; |
|---|
| | 211 | } |
|---|
| | 212 | |
|---|
| | 213 | |
|---|
| | 214 | /***************************** |
|---|
| | 215 | PipeDescriptor::SelectForRead |
|---|
| | 216 | *****************************/ |
|---|
| | 217 | |
|---|
| | 218 | bool PipeDescriptor::SelectForRead() |
|---|
| | 219 | { |
|---|
| | 220 | /* Pipe descriptors, being local by definition, don't have |
|---|
| | 221 | * a pending state, so this is simpler than for the |
|---|
| | 222 | * ConnectionDescriptor object. |
|---|
| | 223 | */ |
|---|
| | 224 | return true; |
|---|
| | 225 | } |
|---|
| | 226 | |
|---|
| | 227 | /****************************** |
|---|
| | 228 | PipeDescriptor::SelectForWrite |
|---|
| | 229 | ******************************/ |
|---|
| | 230 | |
|---|
| | 231 | bool PipeDescriptor::SelectForWrite() |
|---|
| | 232 | { |
|---|
| | 233 | /* Pipe descriptors, being local by definition, don't have |
|---|
| | 234 | * a pending state, so this is simpler than for the |
|---|
| | 235 | * ConnectionDescriptor object. |
|---|
| | 236 | */ |
|---|
| | 237 | return (GetOutboundDataSize() > 0); |
|---|
| | 238 | } |
|---|
| | 239 | |
|---|
| | 240 | |
|---|
| | 241 | |
|---|
| | 242 | |
|---|
| | 243 | /********************************* |
|---|
| | 244 | PipeDescriptor::SendOutboundData |
|---|
| | 245 | ********************************/ |
|---|
| | 246 | |
|---|
| | 247 | int PipeDescriptor::SendOutboundData (const char *data, int length) |
|---|
| | 248 | { |
|---|
| | 249 | if (bCloseNow || bCloseAfterWriting) |
|---|
| | 250 | return 0; |
|---|
| | 251 | |
|---|
| | 252 | if (!data && (length > 0)) |
|---|
| | 253 | throw std::runtime_error ("bad outbound data"); |
|---|
| | 254 | char *buffer = (char *) malloc (length + 1); |
|---|
| | 255 | if (!buffer) |
|---|
| | 256 | throw std::runtime_error ("no allocation for outbound data"); |
|---|
| | 257 | memcpy (buffer, data, length); |
|---|
| | 258 | buffer [length] = 0; |
|---|
| | 259 | OutboundPages.push_back (OutboundPage (buffer, length)); |
|---|
| | 260 | OutboundDataSize += length; |
|---|
| | 261 | return length; |
|---|
| | 262 | } |
|---|
| | 263 | |
|---|