001 package org.LiveGraph.dataFile.write;
002
003 import java.io.IOException;
004 import java.io.OutputStream;
005
006 import org.LiveGraph.dataFile.common.PipeFullException;
007 import org.LiveGraph.dataFile.read.PipedInputStream;
008
009
010 public class PipedOutputStream extends OutputStream {
011
012
013 private PipedInputStream sink;
014
015 /**
016 * Creates a piped output stream that is not yet connected to a piped input stream.
017 * It must be connected to a piped input stream, either by the receiver or the sender, before being used.
018 */
019 public PipedOutputStream() {
020 super();
021 }
022
023 /**
024 * Creates a piped output stream connected to the specified piped input stream.
025 * Data bytes written to this stream will then be available as input from <code>snk</code>.
026 * @param snk The piped input stream to connect to.
027 * @exception IOException if an I/O error occurs.
028 */
029 public PipedOutputStream(PipedInputStream snk) throws IOException {
030 super();
031 connect(snk);
032 }
033
034
035 /**
036 * Connects this piped output stream to a receiver. If this object is already connected to some other
037 * piped input stream, an <code>IOException</code> is thrown.<br />
038 * If <code>snk</code> is an unconnected piped input stream and <code>src</code> is an unconnected piped
039 * output stream, they may be connected by either the call:<br />
040 * <pre>src.connect(snk)</pre><br />
041 * or the call:<br />
042 * <pre>snk.connect(src)</pre><br />
043 * The two calls have the same effect.
044 *
045 * @param sink the piped input stream to connect to.
046 * @exception IOException if an I/O error occurs.
047 */
048 public synchronized void connect(PipedInputStream sink) throws IOException {
049
050 if (null == sink)
051 throw new NullPointerException("Cannot connect to a null sink");
052
053 // Prevent recursive calls:
054 if (sink == this.sink)
055 return;
056
057 if (sink.getConnected())
058 throw new IOException("Sink is already connected");
059
060 this.sink = sink;
061 sink.connect(this);
062 }
063
064 /**
065 * Writes the specified <code>byte</code> to the piped output stream. <br />
066 * Implements the <code>write</code> method of <code>OutputStream</code>.
067 * This method blocks for a while to wait until the byte is written to
068 * the output stream, but quits the block and throws a {@code PipedInputStream.PipeFullException} if
069 * the data cannot be written within a certain time period.
070 *
071 * @param b the <code>byte</code> to be written.
072 * @exception IOException if the pipe is broken,
073 * {@link #connect(org.LiveGraph.dataFile.read.PipedInputStream) unconnected}, closed,
074 * or if an I/O error occurs; specifically, a {link {@link PipeFullException}, if the
075 * receiving buffer is full and waiting times out.
076 */
077 @Override
078 public void write(int b) throws IOException {
079
080 if (null == sink)
081 throw new IOException("Pipe not connected");
082
083 sink.receive(b);
084 }
085
086 /**
087 * Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> to
088 * this piped output stream. This method blocks for a while to wait until all the bytes are written to
089 * the output stream, but quits the block and throws a {@code PipedInputStream.PipeFullException} if
090 * the data cannot be written within a certain time period.
091 *
092 * @param b the data.
093 * @param off the start offset in the data.
094 * @param len the number of bytes to write.
095 * @exception IOException if the pipe is broken,
096 * {@link #connect(org.LiveGraph.dataFile.read.PipedInputStream) unconnected}, closed,
097 * or if an I/O error occurs; specifically, a {link {@link PipeFullException}, if the
098 * receiving buffer is full and waiting times out.
099 */
100 @Override
101 public void write(byte b[], int off, int len) throws IOException {
102
103 if (null == sink)
104 throw new IOException("Pipe not connected");
105
106 if (null == b )
107 throw new NullPointerException("Cannot read into a new buffer");
108
109 if ( (off < 0) || (off > b.length) || (len < 0) || (off + len > b.length) || (off + len < 0) )
110 throw new IndexOutOfBoundsException();
111
112 if (len == 0)
113 return;
114
115 sink.receive(b, off, len);
116 }
117
118 /**
119 * Flushes this output stream and forces any buffered output bytes to be written out.
120 * This will notify any readers that bytes are waiting in the pipe.
121 *
122 * @exception IOException if an I/O error occurs.
123 */
124 @Override
125 public synchronized void flush() throws IOException {
126 if (null != sink) {
127 synchronized (sink) {
128 sink.notifyAll();
129 }
130 }
131 }
132
133 /**
134 * Closes this piped output stream and releases any system resources associated with this stream.
135 * This stream may no longer be used for writing bytes.
136 *
137 * @exception IOException if an I/O error occurs.
138 */
139 @Override
140 public void close() throws IOException {
141 if (null == sink)
142 return;
143 sink.receivedLast();
144 }
145
146
147 } // public class PipedOutputStream