001 package org.LiveGraph.dataFile.read;
002
003 import java.io.IOException;
004 import java.io.InputStream;
005
006 import org.LiveGraph.dataFile.common.PipeClosedByReaderException;
007 import org.LiveGraph.dataFile.common.PipeFullException;
008 import org.LiveGraph.dataFile.common.PipeNotConnectedException;
009 import org.LiveGraph.dataFile.write.PipedOutputStream;
010
011
012 /**
013 * This class makes Java's own {@code PipedInputStream} fit for reading by multiple Threads as
014 * required for LiveGraph.<br />
015 * <p>The thread handling built into Java's {@code java.io.PipedInputStream} gives, at the very best,
016 * reasons to hope for improvement. Sun seems to be aware of the problem: for instance, a Java API
017 * developer writes directly in the source comments of the officially distributed JDK 1.6.0 source
018 * package: "<em>[...] identification of the read and write sides needs to be more
019 * sophisticated [...]</em>;" (see the non-Javadoc comments in the source for
020 * {@code java.io.PipedInputStream}, JDK 1.6.0, lines 38-41). However, to date the problem remains.</p>
021 * <p>For LiveGraph specifically, the problem is that {@code PipedInputStream} remembers
022 * the {@code Thread} that performed the latest read operation and checks before the following
023 * receive operation from the {@code PipedOutputStream}, whether that {@code Thread} is still alive.
024 * However, LiveGraph creates a new {@code Thread} for each update in order to make sure that the
025 * application remains responsive even if the amount of new data is large; the old threads are
026 * discarded, which causes {@code PipedInputStream} to throw an exception.<br />
027 * As a second problem, {@code PipedInputStream} causes the {@code write}-call of the
028 * {@code PipedOutputStream} to block indefinitely if the memory buffer is full. If the LiveGraph update
029 * frequency is set too low, the buffer may fill up, which would cause the data-producing part of the
030 * application to block. This is highly undesirable - while a short, time-limited block may be ok,
031 * an exception should be thrown if the buffer remains full for a long time to indicate to the developer
032 * that the chosen buffer size is not sufficiently large for the particular application.</p>
033 * <p>Unfortunately, the choice of scope classifiers for several methods in
034 * {@code java.io.PipedInputStream} is less than perfect. For instance, the method
035 * {@code receive(byte b[], int off, int len)} has package scope and cannot be overridden to
036 * resolve the above issues. In addition, the inappropriate use of package-visible variables such
037 * as {@code connected} instead of getter and setter methods makes overriding attempts useless. This
038 * forces LiveGraph to subclass {@code InputStream} directly to create a better version of a piped input
039 * stream and reimplement <em>all</em> of {@code java.io.PipedInputStream}'s methods, thus unnecessarily
040 * replicating a lot of code. This may become unnecessary in future if the above problems are resolved
041 * or if LiveGraph is adapted to use the {@code java.nio} channel-based I/O instead of the
042 * traditional {@code java.io} stream-based approach (this would be a good idea anyway, if time permits
043 * to make these changes at some point in the future). For now, the source code of this class is copied
044 * from {@code java.io.PipedInputStream} that is distributed in the source package for JDK 1.6.0 and
045 * changed where necessary.</p>
046 *
047 * <p>
048 * <strong>LiveGraph</strong>
049 * (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>).
050 * </p>
051 * <p>Copyright (c) 2007-2008 by G. Paperin.</p>
052 * <p>File: PipedInputStream.java</p>
053 * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or
054 * without modification, are permitted provided that the following terms and conditions are met:
055 * </p>
056 * <p style="font-size:smaller;">1. Redistributions of source code must retain the above
057 * acknowledgement of the LiveGraph project and its web-site, the above copyright notice,
058 * this list of conditions and the following disclaimer.<br />
059 * 2. Redistributions in binary form must reproduce the above acknowledgement of the
060 * LiveGraph project and its web-site, the above copyright notice, this list of conditions
061 * and the following disclaimer in the documentation and/or other materials provided with
062 * the distribution.<br />
063 * 3. All advertising materials mentioning features or use of this software or any derived
064 * software must display the following acknowledgement:<br />
065 * <em>This product includes software developed by the LiveGraph project and its
066 * contributors.<br />(http://www.live-graph.org)</em><br />
067 * 4. All advertising materials distributed in form of HTML pages or any other technology
068 * permitting active hyper-links that mention features or use of this software or any
069 * derived software must display the acknowledgment specified in condition 3 of this
070 * agreement, and in addition, include a visible and working hyper-link to the LiveGraph
071 * homepage (http://www.live-graph.org).
072 * </p>
073 * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY
074 * OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
075 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
076 * THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
077 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
078 * IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
079 * </p>
080 *
081 * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>)
082 * @version {@value org.LiveGraph.LiveGraph#version}
083 * @see java.io.PipedInputStream
084 */
085 public class PipedInputStream extends InputStream {
086
087
088 private static final int DEFAULT_PIPE_SIZE = 1024; // bytes
089 private static final long DEFAULT_MAX_BLOCK_DURATION = 2000; // millisseconds
090
091
092 private boolean closedByWriter = false;
093 private boolean closedByReader = false;
094 private boolean connected = false;
095
096 private long maxBlockDuration = DEFAULT_MAX_BLOCK_DURATION;
097 private long poolingPeriod = Math.min(1000L, Math.max(1L, DEFAULT_MAX_BLOCK_DURATION / 5L));
098
099
100 /**
101 * The circular buffer into which incoming data is placed.
102 */
103 private byte buffer[];
104
105 /**
106 * The index of the position in the circular buffer at which the next byte of data will be
107 * stored when received from the connected piped output stream. <code>in<0</code> implies
108 * the buffer is empty, <code>in==out</code> implies the buffer is full
109 */
110 private int in = -1;
111
112 /**
113 * The index of the position in the circular buffer at which the next byte of data will be
114 * read by this piped input stream.
115 */
116 private int out = 0;
117
118 /**
119 * Creates a <code>PipedInputStream</code> so that it is not yet
120 * {@linkplain #connect(org.LiveGraph.dataFile.write.PipedOutputStream) connected}.
121 * It must be connected to a <code>PipedOutputStream</code> before being used.
122 */
123 public PipedInputStream() {
124 initPipe(DEFAULT_PIPE_SIZE);
125 }
126
127 /**
128 * Creates a <code>PipedInputStream</code> so that it is not yet
129 * {@linkplain #connect(org.LiveGraph.dataFile.write.PipedOutputStream) connected} and uses the
130 * specified pipe size for the pipe's buffer.
131 * It must be connected to a <code>PipedOutputStream</code> before being used.
132 *
133 * @param pipeSize the size of the pipe's buffer.
134 * @exception IllegalArgumentException if <code>pipeSize <= 0</code>.
135 */
136 public PipedInputStream(int pipeSize) {
137 initPipe(pipeSize);
138 }
139
140 /**
141 * Creates a <code>PipedInputStream</code> so that it is connected to the piped output
142 * stream <code>src</code>. Data bytes written to <code>src</code> will then be available
143 * as input from this stream.
144 *
145 * @param src the stream to connect to.
146 * @exception IOException if an I/O error occurs.
147 */
148 public PipedInputStream(PipedOutputStream src) throws IOException {
149 this(src, DEFAULT_PIPE_SIZE);
150 }
151
152 /**
153 * Creates a <code>PipedInputStream</code> so that it is connected to the piped output stream
154 * <code>src</code> and uses the specified pipe size for the pipe's buffer.
155 * Data bytes written to <code>src</code> will then be available as input from this stream.
156 *
157 * @param src the stream to connect to.
158 * @param pipeSize the size of the pipe's buffer.
159 * @exception IOException if an I/O error occurs.
160 * @exception IllegalArgumentException if <code>pipeSize <= 0</code>.
161 */
162 public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
163 initPipe(pipeSize);
164 connect(src);
165 }
166
167 private void initPipe(int pipeSize) {
168 if (pipeSize <= 0)
169 throw new IllegalArgumentException("Pipe Size <= 0");
170 buffer = new byte[pipeSize];
171 }
172
173 /**
174 * Causes this piped input stream to be connected to the piped output stream <code>src</code>.
175 * If this object is already connected to some other piped output stream, an <code>IOException</code>
176 * is thrown.<br />
177 * If <code>src</code> is an unconnected piped output stream and <code>snk</code>
178 * is an unconnected piped input stream, they may be connected by either the call:<br />
179 * <pre><code>snk.connect(src)</code></pre><br />
180 * or the call:<br />
181 * <pre><code>src.connect(snk)</code></pre><br />
182 * The two calls have the same effect.
183 *
184 * @param src The piped output stream to connect to.
185 * @exception IOException if an I/O error occurs.
186 */
187 public void connect(PipedOutputStream src) throws IOException {
188
189 if (null == src)
190 throw new NullPointerException("Cannot connect to a null source");
191
192 src.connect(this);
193
194 this.in = -1;
195 this.out = 0;
196 this.setConnected(true);
197 }
198
199 public synchronized void setMaxBlockDuration(long v) {
200 maxBlockDuration = Math.max(0L, v);
201 poolingPeriod = Math.min(1000L, Math.max(1L, maxBlockDuration / 5L));
202 }
203
204 public synchronized long getMaxBlockDuration() {
205 return maxBlockDuration;
206 }
207
208 protected synchronized boolean getClosedByWriter() {
209 return closedByWriter;
210 }
211
212 protected synchronized void setClosedByWriter(boolean v) {
213 closedByWriter = v;
214 }
215
216 protected synchronized boolean getClosedByReader() {
217 return closedByReader;
218 }
219
220 protected synchronized void setClosedByReader(boolean v) {
221 closedByReader = v;
222 }
223
224 public synchronized boolean getConnected() {
225 return connected;
226 }
227
228 protected synchronized void setConnected(boolean v) {
229 connected = v;
230 }
231
232 /**
233 * Receives a byte of data. This method will block if no input is available.
234 * @param b the byte being received
235 * @exception IOException If the pipe is broken,
236 * {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
237 * I/O error occurs; specifically, a {@link PipeFullException}, if the receiving buffer is full
238 * and waiting times out.
239 */
240 public synchronized void receive(int b) throws IOException {
241
242 awaitSpace();
243
244 if (in < 0) {
245 in = 0;
246 out = 0;
247 }
248
249 buffer[in++] = (byte)(b & 0xFF);
250 if (in >= buffer.length)
251 in = 0;
252 }
253
254 /**
255 * Receives data into an array of bytes. This method will block until some input is available.
256 * @param b the buffer into which the data is received
257 * @param off the start offset of the data
258 * @param len the maximum number of bytes received
259 * @exception IOException If the pipe is broken,
260 * {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
261 * I/O error occurs; specifically, a {@link PipeFullException}, if the receiving buffer is full
262 * and waiting times out.
263 */
264 public synchronized void receive(byte b[], int off, int len) throws IOException {
265
266 int bytesToTransfer = len;
267
268 while (bytesToTransfer > 0) {
269
270 awaitSpace();
271
272 int nextTransferAmount = 0;
273 if (out < in) {
274 nextTransferAmount = buffer.length - in;
275 } else if (out > in) {
276 if (in == -1) {
277 in = out = 0;
278 nextTransferAmount = buffer.length - in;
279 } else {
280 nextTransferAmount = out - in;
281 }
282 }
283
284 if (nextTransferAmount > bytesToTransfer)
285 nextTransferAmount = bytesToTransfer;
286
287 assert(nextTransferAmount > 0);
288
289 System.arraycopy(b, off, buffer, in, nextTransferAmount);
290
291 bytesToTransfer -= nextTransferAmount;
292 off += nextTransferAmount;
293 in += nextTransferAmount;
294 if (in >= buffer.length)
295 in = 0;
296 }
297 }
298
299 private void checkStateForReceive() throws IOException {
300
301 if (!getConnected())
302 throw new PipeNotConnectedException("Pipe not connected");
303
304 if (getClosedByWriter())
305 throw new PipeClosedByReaderException("Pipe closed by writer");
306
307 if (getClosedByReader())
308 throw new PipeClosedByReaderException("Pipe closed by reader");
309
310 }
311
312
313 private void awaitSpace() throws IOException {
314
315 checkStateForReceive();
316
317 if (in != out)
318 return;
319
320 long startedWaiting = System.currentTimeMillis();
321
322 while (in == out) {
323 checkStateForReceive();
324
325 if (System.currentTimeMillis() - startedWaiting > maxBlockDuration)
326 throw new PipeFullException("Cannot receive data: buffer full?");
327
328 // kick any waiting readers and wait:
329 notifyAll();
330 try { wait(poolingPeriod); }
331 catch (InterruptedException ex) { }
332 }
333 }
334
335 /**
336 * Notifies all waiting threads that the last byte of data has been
337 * received.
338 */
339 public synchronized void receivedLast() {
340 setClosedByWriter(true);
341 notifyAll();
342 }
343
344 /**
345 * Reads the next byte of data from this piped input stream. The value byte is returned as
346 * an <code>int</code> in the range <code>0</code> to <code>255</code>. This method blocks
347 * until input data is available, the end of the stream is detected, or an exception is thrown.
348 *
349 * @return the next byte of data, or <code>-1</code> if the end of the
350 * stream is reached.
351 * @exception IOException if the pipe is
352 * {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, broken, closed, or if an
353 * I/O error occurs.
354 */
355 @Override
356 public synchronized int read() throws IOException {
357
358 if (!getConnected()) {
359 throw new PipeNotConnectedException("Pipe not connected");
360 } else if (getClosedByReader()) {
361 throw new PipeClosedByReaderException("Pipe closed by reader");
362 }
363
364 // Wait for data:
365 if (in < 0) {
366 while (in < 0) {
367
368 // If closed by writer, return EOF
369 if (getClosedByWriter())
370 return -1;
371
372 // Might be a writer waiting:
373 notifyAll();
374 try { wait(poolingPeriod); }
375 catch (InterruptedException ex) { }
376 }
377 }
378
379 int ret = buffer[out++] & 0xFF;
380 if (out >= buffer.length)
381 out = 0;
382
383 if (in == out)
384 in = -1;
385
386 return ret;
387 }
388
389 /**
390 * Reads up to <code>len</code> bytes of data from this piped input stream into an array of bytes.
391 * Less than <code>len</code> bytes will be read if the end of the data stream is reached or if
392 * <code>len</code> exceeds the pipe's buffer size. If <code>len </code> is zero, then no bytes
393 * are read and 0 is returned; otherwise, the method blocks until at least 1 byte of input is
394 * available, end of the stream has been detected, or an exception is thrown.
395 *
396 * @param b the buffer into which the data is read.
397 * @param off the start offset in the destination array <code>b</code>
398 * @param len the maximum number of bytes read.
399 * @return the total number of bytes read into the buffer, or <code>-1</code> if there is no
400 * more data because the end of the stream has been reached.
401 * @exception NullPointerException If <code>b</code> is <code>null</code>.
402 * @exception IndexOutOfBoundsException If <code>off</code> is negative,
403 * <code>len</code> is negative, or <code>len</code> is greater than
404 * <code>b.length - off</code>
405 * @exception IOException if the pipe is broken,
406 * {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
407 * I/O error occurs.
408 */
409 @Override
410 public synchronized int read(byte b[], int off, int len) throws IOException {
411
412 if (null == b)
413 throw new NullPointerException("Cannot read into a null buffer");
414
415 if (off < 0 || len < 0 || len > b.length - off)
416 throw new IndexOutOfBoundsException();
417
418 if (len == 0)
419 return 0;
420
421 // Possibly wait on the first character:
422 int c = read();
423 if (c < 0)
424 return -1;
425
426 b[off] = (byte) c;
427 int rlen = 1;
428 while (in >= 0 && len > 1) {
429
430 int available;
431
432 if (in > out)
433 available = Math.min((buffer.length - out), (in - out));
434 else
435 available = buffer.length - out;
436
437 // A byte is read beforehand outside the loop
438 if (available > (len - 1))
439 available = len - 1;
440
441 System.arraycopy(buffer, out, b, off + rlen, available);
442 out += available;
443 rlen += available;
444 len -= available;
445
446 if (out >= buffer.length)
447 out = 0;
448
449 if (in == out)
450 in = -1;
451 }
452
453 return rlen;
454 }
455
456 /**
457 * Returns the number of bytes that can be read from this input stream without blocking.
458 *
459 * @return the number of bytes that can be read from this input stream without blocking,
460 * or {@code 0} if this input stream has been closed by invoking its {@link #close()} method,
461 * or if the pipe is {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected},
462 * or broken.
463 * @exception IOException if an I/O error occurs.
464 */
465 @Override
466 public synchronized int available() throws IOException {
467
468 if (in < 0)
469 return 0;
470
471 if (in == out)
472 return buffer.length;
473
474 if (in > out)
475 return in - out;
476
477 return in + buffer.length - out;
478 }
479
480 /**
481 * Closes this piped input stream and releases any system resources associated with the stream.
482 * @exception IOException if an I/O error occurs.
483 */
484 @Override
485 public synchronized void close() throws IOException {
486 setClosedByReader(true);
487 in = -1;
488 }
489
490 } // public class PipedInputStream