package org.LiveGraph.dataFile.read;

import java.io.IOException;
import java.io.InputStream;

import org.LiveGraph.dataFile.common.PipeClosedByReaderException;
import org.LiveGraph.dataFile.common.PipeFullException;
import org.LiveGraph.dataFile.common.PipeNotConnectedException;
import org.LiveGraph.dataFile.write.PipedOutputStream;


/**
 * This class makes Java's own {@code PipedInputStream} fit for reading by multiple Threads as
 * required for LiveGraph.<br />
 * <p>The thread handling built into Java's {@code java.io.PipedInputStream} gives, at the very best,
 * reasons to hope for improvement. Sun seems to be aware of the problem: for instance, a Java API
 * developer writes directly in the source comments of the officially distributed JDK 1.6.0 source
 * package: "<em>[...] identification of the read and write sides needs to be more
 * sophisticated [...]</em>;" (see the non-JavaDoc comments in the source for
 * {@code java.io.PipedInputStream}, JDK 1.6.0, lines 38-41). However, to date the problem remains.</p>
 * <p>For LiveGraph specifically, the problem is that {@code PipedInputStream} remembers
 * the {@code Thread} that performed the latest read operation and checks, before the following
 * receive operation from the {@code PipedOutputStream}, whether that {@code Thread} is still alive.
 * However, LiveGraph creates a new {@code Thread} for each update in order to make sure that the
 * application remains responsive even if the amount of new data is large; the old threads are
 * discarded, which causes {@code PipedInputStream} to throw an exception.<br />
 * As a second problem, {@code PipedInputStream} causes the {@code write}-call of the
 * {@code PipedOutputStream} to block indefinitely if the memory buffer is full.
 * If the LiveGraph update
 * frequency is set too low, the buffer may fill up, which would cause the data producing part of the
 * application to block. This is highly undesirable: while a short, time-limited block may be acceptable,
 * an exception should be thrown if the buffer remains full for a long time to indicate to the developer
 * that the chosen buffer size is not sufficiently large for the particular application.</p>
 * <p>Unfortunately, the choice of access modifiers for several methods in
 * {@code java.io.PipedInputStream} is less than perfect. For instance, the method
 * {@code receive(byte b[], int off, int len)} has package scope and cannot be overridden to
 * resolve the above issues. In addition, the inappropriate use of package-visible variables such
 * as {@code connected} instead of getter and setter methods makes overriding attempts useless. This
 * forces LiveGraph to subclass {@code InputStream} directly to create a better version of a piped input
 * stream and reimplement <em>all</em> of {@code java.io.PipedInputStream}'s methods, thus unnecessarily
 * replicating a lot of code. This may become unnecessary in future if the above problems are resolved
 * or if LiveGraph is adapted to use the {@code java.nio} channel-based I/O instead of the
 * traditional {@code java.io} stream-based approach (this would be a good idea anyway, if time permits
 * to make these changes at some point in the future). For now, the source code of this class is copied
 * from {@code java.io.PipedInputStream} that is distributed in the source package for JDK 1.6.0 and
 * changed where necessary.</p>
 *
 * <p>
 *   <strong>LiveGraph</strong>
 *   (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>).
 * </p>
 * <p>Copyright (c) 2007-2008 by G.
 * Paperin.</p>
 * <p>File: PipedInputStream.java</p>
 * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or
 *    without modification, are permitted provided that the following terms and conditions are met:
 * </p>
 * <p style="font-size:smaller;">1. Redistributions of source code must retain the above
 *    acknowledgement of the LiveGraph project and its web-site, the above copyright notice,
 *    this list of conditions and the following disclaimer.<br />
 *    2. Redistributions in binary form must reproduce the above acknowledgement of the
 *    LiveGraph project and its web-site, the above copyright notice, this list of conditions
 *    and the following disclaimer in the documentation and/or other materials provided with
 *    the distribution.<br />
 *    3. All advertising materials mentioning features or use of this software or any derived
 *    software must display the following acknowledgement:<br />
 *    <em>This product includes software developed by the LiveGraph project and its
 *    contributors.<br />(http://www.live-graph.org)</em><br />
 *    4. All advertising materials distributed in form of HTML pages or any other technology
 *    permitting active hyper-links that mention features or use of this software or any
 *    derived software must display the acknowledgment specified in condition 3 of this
 *    agreement, and in addition, include a visible and working hyper-link to the LiveGraph
 *    homepage (http://www.live-graph.org).
 * </p>
 * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY
 *    OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 *    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 *    IN NO EVENT SHALL
 *    THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
 *    IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 * </p>
 *
 * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>)
 * @version {@value org.LiveGraph.LiveGraph#version}
 * @see java.io.PipedInputStream
 */
public class PipedInputStream extends InputStream {

    private static final int DEFAULT_PIPE_SIZE = 1024;            // bytes
    private static final long DEFAULT_MAX_BLOCK_DURATION = 2000;  // milliseconds

    private boolean closedByWriter = false;
    private boolean closedByReader = false;
    private boolean connected = false;

    private long maxBlockDuration = DEFAULT_MAX_BLOCK_DURATION;
    private long poolingPeriod = Math.min(1000L, Math.max(1L, DEFAULT_MAX_BLOCK_DURATION / 5L));

    /**
     * The circular buffer into which incoming data is placed.
     */
    private byte buffer[];

    /**
     * The index of the position in the circular buffer at which the next byte of data will be
     * stored when received from the connected piped output stream. {@code in < 0} implies
     * the buffer is empty, {@code in == out} implies the buffer is full.
     */
    private int in = -1;

    /**
     * The index of the position in the circular buffer at which the next byte of data will be
     * read by this piped input stream.
     */
    private int out = 0;

    /**
     * Creates a <code>PipedInputStream</code> so that it is not yet
     * {@linkplain #connect(org.LiveGraph.dataFile.write.PipedOutputStream) connected}.
     * It must be connected to a <code>PipedOutputStream</code> before being used.
     */
    public PipedInputStream() {
        initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     * Creates a <code>PipedInputStream</code> so that it is not yet
     * {@linkplain #connect(org.LiveGraph.dataFile.write.PipedOutputStream) connected} and uses the
     * specified pipe size for the pipe's buffer.
     * It must be connected to a <code>PipedOutputStream</code> before being used.
     *
     * @param pipeSize the size of the pipe's buffer.
     * @exception IllegalArgumentException if {@code pipeSize <= 0}.
     */
    public PipedInputStream(int pipeSize) {
        initPipe(pipeSize);
    }

    /**
     * Creates a <code>PipedInputStream</code> so that it is connected to the piped output
     * stream <code>src</code>. Data bytes written to <code>src</code> will then be available
     * as input from this stream.
     *
     * @param src the stream to connect to.
     * @exception IOException if an I/O error occurs.
     */
    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    /**
     * Creates a <code>PipedInputStream</code> so that it is connected to the piped output stream
     * <code>src</code> and uses the specified pipe size for the pipe's buffer.
     * Data bytes written to <code>src</code> will then be available as input from this stream.
     *
     * @param src the stream to connect to.
     * @param pipeSize the size of the pipe's buffer.
     * @exception IOException if an I/O error occurs.
     * @exception IllegalArgumentException if {@code pipeSize <= 0}.
     */
    public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
        initPipe(pipeSize);
        connect(src);
    }

    private void initPipe(int pipeSize) {
        if (pipeSize <= 0)
            throw new IllegalArgumentException("Pipe Size <= 0");
        buffer = new byte[pipeSize];
    }

    /**
     * Causes this piped input stream to be connected to the piped output stream <code>src</code>.
     * If this object is already connected to some other piped output stream, an <code>IOException</code>
     * is thrown.<br />
     * If <code>src</code> is an unconnected piped output stream and <code>snk</code>
     * is an unconnected piped input stream, they may be connected by either the call:<br />
     * <pre><code>snk.connect(src)</code></pre><br />
     * or the call:<br />
     * <pre><code>src.connect(snk)</code></pre><br />
     * The two calls have the same effect.
     *
     * @param src The piped output stream to connect to.
     * @exception IOException if an I/O error occurs.
     */
    public void connect(PipedOutputStream src) throws IOException {

        if (null == src)
            throw new NullPointerException("Cannot connect to a null source");

        src.connect(this);

        this.in = -1;
        this.out = 0;
        this.setConnected(true);
    }

    /**
     * Sets the maximum time in milliseconds that a receive operation may block while the buffer
     * is full before a {@link PipeFullException} is thrown. Negative values are treated as 0.
     * The polling period is set to a fifth of this duration, limited to the range 1 to 1000 ms.
     *
     * @param v the maximum block duration in milliseconds.
     */
    public synchronized void setMaxBlockDuration(long v) {
        maxBlockDuration = Math.max(0L, v);
        poolingPeriod = Math.min(1000L, Math.max(1L, maxBlockDuration / 5L));
    }

    public synchronized long getMaxBlockDuration() {
        return maxBlockDuration;
    }

    protected synchronized boolean getClosedByWriter() {
        return closedByWriter;
    }

    protected synchronized void setClosedByWriter(boolean v) {
        closedByWriter = v;
    }

    protected synchronized boolean getClosedByReader() {
        return closedByReader;
    }

    protected synchronized void setClosedByReader(boolean v) {
        closedByReader = v;
    }

    public synchronized boolean getConnected() {
        return connected;
    }

    protected synchronized void setConnected(boolean v) {
        connected = v;
    }

    /**
     * Receives a byte of data. This method blocks while the buffer is full, until space becomes
     * available or the maximum block duration elapses.
     * @param b the byte being received
     * @exception IOException If the pipe is broken,
     *            {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
     *            I/O error occurs; specifically, a {@link PipeFullException}, if the receiving buffer is full
     *            and waiting times out.
     */
    public synchronized void receive(int b) throws IOException {

        awaitSpace();

        // An empty buffer is marked by in < 0; restart writing at the beginning.
        if (in < 0) {
            in = 0;
            out = 0;
        }

        buffer[in++] = (byte)(b & 0xFF);
        if (in >= buffer.length)
            in = 0;
    }

    /**
     * Receives data from an array of bytes. This method blocks whenever the buffer is full,
     * until space becomes available or the maximum block duration elapses.
     * @param b the array holding the data to be received
     * @param off the start offset of the data in <code>b</code>
     * @param len the number of bytes to receive
     * @exception IOException If the pipe is broken,
     *            {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
     *            I/O error occurs; specifically, a {@link PipeFullException}, if the receiving buffer is full
     *            and waiting times out.
     */
    public synchronized void receive(byte b[], int off, int len) throws IOException {

        int bytesToTransfer = len;

        while (bytesToTransfer > 0) {

            awaitSpace();

            // Work out how many bytes fit into the next contiguous free region:
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (out > in) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }

            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;

            assert(nextTransferAmount > 0);

            System.arraycopy(b, off, buffer, in, nextTransferAmount);

            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length)
                in = 0;
        }
    }

    private void checkStateForReceive() throws IOException {

        if (!getConnected())
            throw new PipeNotConnectedException("Pipe not connected");

        // NOTE: closure by the writer is also reported as a PipeClosedByReaderException;
        // only the message distinguishes the two cases.
        if (getClosedByWriter())
            throw new PipeClosedByReaderException("Pipe closed by writer");

        if (getClosedByReader())
            throw new PipeClosedByReaderException("Pipe closed by reader");

    }

    private void awaitSpace() throws IOException {

        checkStateForReceive();

        if (in != out)
            return;

        long startedWaiting = System.currentTimeMillis();

        // in == out means the buffer is full; wait until a reader frees some space.
        while (in == out) {
            checkStateForReceive();

            if (System.currentTimeMillis() - startedWaiting > maxBlockDuration)
                throw new PipeFullException("Cannot receive data: buffer full?");

            // kick any waiting readers and wait:
            notifyAll();
            try { wait(poolingPeriod); }
            catch (InterruptedException ex) { /* ignored: the loop re-checks the buffer state */ }
        }
    }

    /**
     * Notifies all waiting threads that the last byte of data has been
     * received.
     */
    public synchronized void receivedLast() {
        setClosedByWriter(true);
        notifyAll();
    }

    /**
     * Reads the next byte of data from this piped input stream. The value byte is returned as
     * an <code>int</code> in the range <code>0</code> to <code>255</code>. This method blocks
     * until input data is available, the end of the stream is detected, or an exception is thrown.
     *
     * @return the next byte of data, or <code>-1</code> if the end of the
     *         stream is reached.
     * @exception IOException if the pipe is
     *            {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, broken, closed, or if an
     *            I/O error occurs.
     */
    @Override
    public synchronized int read() throws IOException {

        if (!getConnected()) {
            throw new PipeNotConnectedException("Pipe not connected");
        } else if (getClosedByReader()) {
            throw new PipeClosedByReaderException("Pipe closed by reader");
        }

        // Wait for data:
        while (in < 0) {

            // If closed by writer, return EOF
            if (getClosedByWriter())
                return -1;

            // Might be a writer waiting:
            notifyAll();
            try { wait(poolingPeriod); }
            catch (InterruptedException ex) { /* ignored: the loop re-checks the buffer state */ }
        }

        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length)
            out = 0;

        // The read position caught up with the write position: the buffer is now empty.
        if (in == out)
            in = -1;

        return ret;
    }

    /**
     * Reads up to <code>len</code> bytes of data from this piped input stream into an array of bytes.
     * Fewer than <code>len</code> bytes will be read if the end of the data stream is reached or if
     * <code>len</code> exceeds the pipe's buffer size. If <code>len</code> is zero, then no bytes
     * are read and 0 is returned; otherwise, the method blocks until at least 1 byte of input is
     * available, end of the stream has been detected, or an exception is thrown.
     *
     * @param b the buffer into which the data is read.
     * @param off the start offset in the destination array <code>b</code>
     * @param len the maximum number of bytes read.
     * @return the total number of bytes read into the buffer, or <code>-1</code> if there is no
     *         more data because the end of the stream has been reached.
     * @exception NullPointerException If <code>b</code> is <code>null</code>.
     * @exception IndexOutOfBoundsException If <code>off</code> is negative,
     *            <code>len</code> is negative, or <code>len</code> is greater than
     *            <code>b.length - off</code>
     * @exception IOException if the pipe is broken,
     *            {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
     *            I/O error occurs.
     */
    @Override
    public synchronized int read(byte b[], int off, int len) throws IOException {

        if (null == b)
            throw new NullPointerException("Cannot read into a null buffer");

        if (off < 0 || len < 0 || len > b.length - off)
            throw new IndexOutOfBoundsException();

        if (len == 0)
            return 0;

        // Possibly wait on the first character:
        int c = read();
        if (c < 0)
            return -1;

        b[off] = (byte) c;
        int rlen = 1;
        while (in >= 0 && len > 1) {

            int available;

            if (in > out)
                available = Math.min((buffer.length - out), (in - out));
            else
                available = buffer.length - out;

            // A byte is read beforehand outside the loop
            if (available > (len - 1))
                available = len - 1;

            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length)
                out = 0;

            if (in == out)
                in = -1;
        }

        return rlen;
    }

    /**
     * Returns the number of bytes that can be read from this input stream without blocking.
     *
     * @return the number of bytes that can be read from this input stream without blocking,
     *         or {@code 0} if this input stream has been closed by invoking its {@link #close()} method,
     *         or if the pipe is {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected},
     *         or broken.
     * @exception IOException if an I/O error occurs.
     */
    @Override
    public synchronized int available() throws IOException {

        if (in < 0)
            return 0;

        if (in == out)
            return buffer.length;

        if (in > out)
            return in - out;

        return in + buffer.length - out;
    }

    /**
     * Closes this piped input stream and releases any system resources associated with the stream.
     * @exception IOException if an I/O error occurs.
     */
    @Override
    public synchronized void close() throws IOException {
        setClosedByReader(true);
        in = -1;
    }

} // public class PipedInputStream