001 package org.LiveGraph.dataCache;
002
003 import java.io.IOException;
004 import java.util.List;
005
006 import org.LiveGraph.LiveGraph;
007 import org.LiveGraph.dataFile.common.DataFormatException;
008 import org.LiveGraph.dataFile.read.DataStreamObserver;
009 import org.LiveGraph.dataFile.read.DataStreamReader;
010 import org.LiveGraph.events.Event;
011 import org.LiveGraph.events.EventListener;
012 import org.LiveGraph.events.EventManager;
013 import org.LiveGraph.events.EventProcessingException;
014 import org.LiveGraph.events.EventProducer;
015 import org.LiveGraph.events.EventType;
016
017 import com.softnetConsult.utils.exceptions.ThrowableTools;
018
019 import static org.LiveGraph.dataCache.DataUpdateEvent.*;
020
021
022 /**
023 * This reader will parse a data stream using {@link DataStreamReader} and store all
024 * information in a data cache for further processing by the application.<br />
025 * <br />
026 * See {@link org.LiveGraph.dataFile.write.DataStreamWriter} for the details of the data file format.
027 *
028 * <p style="font-size:smaller;">This product includes software developed by the
029 * <strong>LiveGraph</strong> project and its contributors.<br />
030 * (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>)<br />
031 * Copyright (c) 2007-2008 G. Paperin.<br />
032 * All rights reserved.
033 * </p>
034 * <p style="font-size:smaller;">File: DataStreamToCacheReader.java</p>
035 * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or
036 * without modification, are permitted provided that the following terms and conditions are met:
037 * </p>
038 * <p style="font-size:smaller;">1. Redistributions of source code must retain the above
039 * acknowledgement of the LiveGraph project and its web-site, the above copyright notice,
040 * this list of conditions and the following disclaimer.<br />
041 * 2. Redistributions in binary form must reproduce the above acknowledgement of the
042 * LiveGraph project and its web-site, the above copyright notice, this list of conditions
043 * and the following disclaimer in the documentation and/or other materials provided with
044 * the distribution.<br />
045 * 3. All advertising materials mentioning features or use of this software or any derived
046 * software must display the following acknowledgement:<br />
047 * <em>This product includes software developed by the LiveGraph project and its
048 * contributors.<br />(http://www.live-graph.org)</em><br />
049 * 4. All advertising materials distributed in form of HTML pages or any other technology
050 * permitting active hyper-links that mention features or use of this software or any
051 * derived software must display the acknowledgment specified in condition 3 of this
052 * agreement, and in addition, include a visible and working hyper-link to the LiveGraph
053 * homepage (http://www.live-graph.org).
054 * </p>
055 * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY
056 * OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
057 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
058 * THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
059 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
060 * IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
061 * </p>
062 *
063 * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>)
064 * @version {@value org.LiveGraph.LiveGraph#version}
065 */
066 public class DataStreamToCacheReader implements DataStreamObserver, EventListener, EventProducer {
067
068 /**
069 * The maximum period of time the reader will wait to be able to do an update before canceling the update.
070 */
071 public static final long maxWaitForUpdate = 500L;
072
073 /**
074 * Cache for storage of extracted data.
075 */
076 private DataCache cache = null;
077
078 /**
079 * Whether an update is currently running.
080 */
081 private boolean updateInProgress = false;
082
083 /**
084 * Data reader used for the previous update.
085 */
086 private DataStreamReader previousReader = null;
087
088 /**
089 * Creates a data reader on the specified stream.
090 *
091 * @param cache The data cache into which to store the data.
092 */
093 public DataStreamToCacheReader(DataCache cache) {
094
095 if (null == cache)
096 throw new NullPointerException("Cannot use a null cache.");
097
098 this.cache = cache;
099 this.previousReader = null;
100 this.updateInProgress = false;
101 }
102
103
104 /**
105 * Reads as many data lines from the underlying stream as there are available, parses the lines and
106 * stores the extracted information (if any) in this reader's data cache.
107 *
108 * @param reader Data source.
109 * @param closeAfterRead Whether to close reader after reading.
110 * @throws IOException If an I/O error occurs.
111 * @throws DataFormatException If the data stream contents do not conform with the expected data
112 * stream format.
113 * @see org.LiveGraph.dataFile.write.DataStreamWriter
114 * @see org.LiveGraph.dataFile.read.DataStreamReader
115 */
116 private void readFromStream(DataStreamReader reader, boolean closeAfterRead)
117 throws IOException, DataFormatException {
118 synchronized (reader) {
119
120 synchronized (cache) {
121 cache.bulkOperationStart();
122 try {
123 if (LiveGraph.application().getDataFileSettings().getDoNotCacheData())
124 cache.resetCache();
125 while(checkReaderOpen(reader) && reader.ready()) {
126 reader.readFromStream(DataCache.MAX_DELAYED_EVENTS + 1);
127 try { cache.wait(1); }
128 catch(InterruptedException e) { }
129 }
130 } finally {
131 cache.bulkOperationCompleted();
132 }
133 }
134
135 if (closeAfterRead)
136 reader.close();
137 }
138 }
139
140 /**
141 * Whether an update is currently running.
142 * @return Whether an update is currently running.
143 */
144 public synchronized boolean isUpdateInProgress() {
145 return updateInProgress;
146 }
147
148 /**
149 * Sets the internal {@code updateInProgress} state.
150 * @param state The new state.
151 */
152 private synchronized void setUpdateInProgress(boolean state) {
153 updateInProgress = state;
154 }
155
156 /**
157 * Checks whether the specified reader is still open by trying to execute an action on it.
158 *
159 * @param reader A data stream reader.
160 * @return {@code true} if the reader is not closed, {@code false} otherwise.
161 */
162 private boolean checkReaderOpen(DataStreamReader reader) {
163 synchronized(reader) {
164 try {
165 reader.ready();
166 return true;
167 } catch(IOException e) {
168 return false;
169 }
170 }
171 }
172
173 /**
174 * Used by {@code checkUpdateCanStart}: Verifies that an update may begin by first checking
175 * whether another update is not in progress and then requesting all event listeners to verify a
176 * {@code UPDIN_UpdateStart}-event.
177 *
178 * @return A readily validated event of type {@code DataUpdateEvent.UPDIN_UpdateStart} if an update may begin,
179 * {@code null} if the verification was not successfull.
180 */
181 private Event<DataUpdateEvent> doCheckUpdateCanStart() {
182
183 // Check that another update is not in progress:
184 if (isUpdateInProgress())
185 return null;
186
187 // Validate update:
188 EventManager eventManager = LiveGraph.application().eventManager();
189 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, UPDIN_UpdateStart);
190
191 try {
192 if (eventManager.validateEvent(event))
193 return event;
194 } catch (EventProcessingException e) { }
195 return null;
196 }
197
198 /**
199 * Verifies that an update may begin by first checking whether another update is
200 * not in progress and then requesting all event listeners to verify a
201 * {@code UPDIN_UpdateStart}-event. If the verification is not successful, the method
202 * pauses the current thread and then reattempts the verification. This may be repeated
203 * several times. If the verification was not successfull after {@code maxWaitForUpdate}
204 * milliseconds, it will be regarded as failed.
205 * This private method assumes that the calling method has synchronised on {@code this}.
206 *
207 * @param reader Data source for the update.
208 *
209 * @return A readily validated event of type {@code DataUpdateEvent.UPDIN_UpdateStart} if an update may begin,
210 * {@code null} otherwise.
211 */
212 private Event<DataUpdateEvent> checkUpdateCanStart(DataStreamReader reader) {
213
214 Event<DataUpdateEvent> startEvent = doCheckUpdateCanStart();
215 if (null != startEvent)
216 return startEvent;
217
218 long startedWaiting = System.currentTimeMillis();
219 do {
220
221 // If the reader is not open any more, we do not need to wait and can fail-fast:
222 if (!checkReaderOpen(reader))
223 return null;
224
225 try { this.wait(5); }
226 catch(InterruptedException e) { }
227
228 startEvent = doCheckUpdateCanStart();
229 if (null != startEvent)
230 return startEvent;
231 } while(System.currentTimeMillis() - startedWaiting < maxWaitForUpdate);
232
233 return null;
234 }
235
236 /**
237 * Attempts to initiate a data update in a new thread.
238 * First this method verifies that no other update is already in progress, then it
239 * validates a {@code UPDIN_UpdateStart}-event against all listeners. If both
240 * succeeds, a new thread is started that will read the stream into the cache.
241 * The verification may be attempted several times, but no longer than
242 * {@code maxWaitForUpdate} milliseconds.
243 *
244 * @param reader Data source.
245 * @param closeAfterRead Whether to close reader after reading.
246 */
247 private synchronized void startDataUpdate(final DataStreamReader reader, final boolean closeAfterRead) {
248
249 if (null == reader)
250 return;
251
252 Event<DataUpdateEvent> startEvent = checkUpdateCanStart(reader);
253 if (null == startEvent)
254 return;
255
256 setUpdateInProgress(true);
257
258 // Even if reader is closed during this operation it should not fail, no no need for synch:
259 if (previousReader != reader) {
260 reader.addObserver(this);
261 previousReader = reader;
262 }
263
264 try {
265
266 LiveGraph.application().eventManager().raiseEvent(startEvent);
267
268 Runnable updateWorker = new Runnable() {
269 public void run() {
270 try {
271 try {
272 readFromStream(reader, closeAfterRead);
273 } catch(Exception e) {
274 raiseUpdateFinishedError(e);
275 }
276 raiseUpdateFinishedSusccess();
277 } finally {
278 setUpdateInProgress(false);
279 }
280 }
281 };
282
283 Thread readerThread = new Thread(updateWorker, "LiveGraph Data File Reader");
284 readerThread.start();
285
286 // Make the best effort to reset the in-progress flag if an error occurs:
287 } catch(RuntimeException e) {
288 setUpdateInProgress(false);
289 throw e;
290 } catch(Error e) {
291 setUpdateInProgress(false);
292 throw e;
293 }
294 }
295
296
297 /**
298 * Raises an event to notify listeners that an update has finished without any errors.
299 */
300 private void raiseUpdateFinishedSusccess() {
301 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
302 DataUpdateEvent.UPDIN_UpdateFinishSuccess);
303 LiveGraph.application().eventManager().raiseEvent(event);
304 }
305
306 /**
307 * Raises an event to notify listeners that an update has finished with errors.
308 *
309 * @param err The {@code Throwable} describing the error.
310 */
311 private void raiseUpdateFinishedError(Throwable err) {
312
313 String shortMsg = err.getMessage() + "(" + err.getClass().getSimpleName() + ")";
314 String longMsg = ThrowableTools.stackTraceToString(err);
315
316 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
317 DataUpdateEvent.UPDIN_UpdateFinishError,
318 shortMsg);
319 LiveGraph.application().eventManager().raiseEvent(event);
320 LiveGraph.application().guiManager().logErrorLn(longMsg);
321 }
322
323
324 /**
325 * Used for callback by the {@link DataStreamReader}; does nothing.
326 */
327 public void eventCommentLine(String line, DataStreamReader reader) {
328 ; // No action required.
329 }
330
331 /**
332 * Used for callback by the {@link DataStreamReader}; adds a dataset to the cache.
333 */
334 public void eventDataLineRead(List<String> dataTokens, int datasetIndex, DataStreamReader reader) {
335
336 List<Double> vals = DataStreamReader.convertTokensToDoubles(dataTokens);
337 DataSet ds = new DataSet(vals, datasetIndex);
338 cache.addDataSet(ds);
339 }
340
341 /**
342 * Used for callback by the {@link DataStreamReader}; adds a file info line to the cache.
343 */
344 public void eventFileInfoLine(String info, DataStreamReader reader) {
345 cache.addDataFileInfo(info);
346 }
347
348 /**
349 * Used for callback by the {@link DataStreamReader}; setts the data column labels in the cache.
350 */
351 public void eventLabelsSet(List<String> labels, DataStreamReader reader) {
352 List<String> uniqueLabels = DataStreamReader.createUniqueLabels(labels, true);
353 cache.resetLabels(uniqueLabels);
354 }
355
356 /**
357 * Used for callback by the {@link DataStreamReader}; does nothing.
358 */
359 public void eventSeparatorSet(String separator, DataStreamReader reader) {
360 ; // No action required.
361 }
362
363
364 /**
365 * Permits to register as listener with the main LiveGraph event manager and
366 * only with the main LiveGraph event manager.
367 *
368 * @param manager The {@code EventManager} for the registering attempt.
369 * @return {@code (LiveGraph.application().eventManager() == manager)}.
370 * @see EventListener#permissionRegisterWithEventManager(EventManager)
371 */
372 public boolean permissionRegisterWithEventManager(EventManager manager) {
373 return LiveGraph.application().eventManager() == manager;
374 }
375
376 /**
377 * Does not permit any unregistering.
378 *
379 * @param manager The {@code EventManager} for the registering attempt.
380 * @return {@code false}.
381 * @see EventListener#permissionUnregisterWithEventManager(EventManager)
382 */
383 public boolean permissionUnregisterWithEventManager(EventManager manager) {
384 return false;
385 }
386
387 /**
388 * Does nothing.
389 *
390 * @param manager The {@code EventManager} with which this {@code EventListener} is now registered.
391 * @see EventListener#completedRegisterWithEventManager(EventManager)
392 */
393 public void completedRegisterWithEventManager(EventManager manager) { }
394
395 /**
396 * Does nothing.
397 *
398 * @param manager The {@code EventManager} with which this {@code EventListener} is now unregistered.
399 * @see EventListener#completedUnregisterWithEventManager(EventManager)
400 */
401 public void completedUnregisterWithEventManager(EventManager manager) { }
402
403 /**
404 * Does nothing.
405 *
406 * @param event An event in which this {@code EventListener} may be interested.
407 * @return {@code false}.
408 * @see EventListener#checkEventInterest(Event)
409 */
410 public boolean checkEventInterest(Event<? extends EventType> event) {
411 return false;
412 }
413
414 /**
415 * Does nothing.
416 *
417 * @param event The event to be validated.
418 * @param soFar Whether {@code event} has been successfuly validated by whichever {@code EventListener}s
419 * (if any) were invoked to validate {@code event} before this {@code EventListener}.
420 * @return {@code true}.
421 * @see EventListener#checkEventValid(Event, boolean)
422 */
423 public boolean checkEventValid(Event<? extends EventType> event, boolean soFar) {
424 return true;
425 }
426
427 /**
428 * Processes LiveGraph events.
429 *
430 * @param event The event to process.
431 */
432 public void eventRaised(Event<? extends EventType> event) throws Exception {
433
434 if (event.getDomain() == DataUpdateEvent.class) {
435 processDataUpdateEvent(event.cast(DataUpdateEvent.class));
436 return;
437 }
438 }
439
440 /**
441 * Attempts to initiate an update when a {@code UPDIN_InitiateUpdate}-event is received.
442 *
443 * @param event The event to process.
444 */
445 private void processDataUpdateEvent(Event<DataUpdateEvent> event) {
446
447 if (UPDIN_InitiateUpdate == event.getType()) {
448 startDataUpdate((DataStreamReader) event.getInfoObject(), event.getInfoBoolean());
449 return;
450 }
451
452 }
453
454 /**
455 * Does nothing.
456 * @param event Event that cause an exception.
457 * @param exception The exception.
458 * @return {@code false}.
459 */
460 public boolean eventProcessingException(Event<? extends EventType> event, EventProcessingException exception) {
461 return false;
462 }
463
464 /**
465 * Does nothing.
466 * @param event Event that was processed.
467 */
468 public void eventProcessingFinished(Event<? extends EventType> event) { }
469
470 } // public class DataStreamToCacheReader