001 package org.LiveGraph.dataCache;
002
003 import java.io.File;
004 import java.io.FileInputStream;
005 import java.io.FileNotFoundException;
006 import java.io.IOException;
007 import java.io.InputStream;
008
009 import org.LiveGraph.LiveGraph;
010 import org.LiveGraph.dataCache.DataCache.CacheMode;
011 import org.LiveGraph.dataFile.read.DataStreamReader;
012 import org.LiveGraph.dataFile.read.PipedInputStream;
013 import org.LiveGraph.dataFile.write.DataStreamWriter;
014 import org.LiveGraph.dataFile.write.DataStreamWriterFactory;
015 import org.LiveGraph.events.Event;
016 import org.LiveGraph.events.EventListener;
017 import org.LiveGraph.events.EventManager;
018 import org.LiveGraph.events.EventProcessingException;
019 import org.LiveGraph.events.EventProducer;
020 import org.LiveGraph.events.EventType;
021 import org.LiveGraph.settings.DataFileSettings;
022 import org.LiveGraph.settings.SettingsEvent;
023
024 import com.softnetConsult.utils.exceptions.ThrowableTools;
025
026
027 /**
028 * An object of this class is used to trigger updates from a data input stream
029 * into a {@link DataCache} at regular intervals.
030 *
031 * <p style="font-size:smaller;">This product includes software developed by the
032 * <strong>LiveGraph</strong> project and its contributors.<br />
033 * (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>)<br />
034 * Copyright (c) 2007-2008 G. Paperin.<br />
035 * All rights reserved.
036 * </p>
037 * <p style="font-size:smaller;">File: UpdateInvoker.java</p>
038 * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or
039 * without modification, are permitted provided that the following terms and conditions are met:
040 * </p>
041 * <p style="font-size:smaller;">1. Redistributions of source code must retain the above
042 * acknowledgement of the LiveGraph project and its web-site, the above copyright notice,
043 * this list of conditions and the following disclaimer.<br />
044 * 2. Redistributions in binary form must reproduce the above acknowledgement of the
045 * LiveGraph project and its web-site, the above copyright notice, this list of conditions
046 * and the following disclaimer in the documentation and/or other materials provided with
047 * the distribution.<br />
048 * 3. All advertising materials mentioning features or use of this software or any derived
049 * software must display the following acknowledgement:<br />
050 * <em>This product includes software developed by the LiveGraph project and its
051 * contributors.<br />(http://www.live-graph.org)</em><br />
052 * 4. All advertising materials distributed in form of HTML pages or any other technology
053 * permitting active hyper-links that mention features or use of this software or any
054 * derived software must display the acknowledgment specified in condition 3 of this
055 * agreement, and in addition, include a visible and working hyper-link to the LiveGraph
056 * homepage (http://www.live-graph.org).
057 * </p>
058 * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY
059 * OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
060 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
061 * THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
062 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
063 * IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
064 * </p>
065 *
066 * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>)
067 * @version {@value org.LiveGraph.LiveGraph#version}
068 */
069 public class UpdateInvoker implements Runnable, EventListener, EventProducer {
070
071 /**
072 * How long to sleep for when updates are to be invoked automatically.
073 */
074 private static final long timeTickLength = 50; // milliseconds
075
076 /**
077 * Determines wether the invoker is in memory-stream mode.
078 * @see #startMemoryStreamMode(InputStream)
079 */
080 private boolean memoryStreamMode = false;
081
082 /**
083 * The data reader for the input stream.
084 */
085 private DataStreamReader dataReader = null;
086
087 /**
088 * Cache to hold the data.
089 */
090 private DataCache dataCache = null;
091
092 /**
093 * Data file from which to update.
094 */
095 private File dataFile = null;
096
097 /**
098 * Interval between updates in milliseconds.
099 */
100 private long interval = -1;
101
102 /**
103 * Whether the invoker thread should wind up at the next possibility.
104 */
105 private boolean mustQuit = false;
106
107 /**
108 * Remaining milliseconds till the next update.
109 */
110 private long remainingMillis = -1;
111
112 /**
113 * System milliseconds at last update.
114 */
115 private long lastUpdateTime = 0;
116
117 /**
118 * Milliseconds since last update.
119 */
120 private long sinceUpdateTime = 0;
121
122
123 /**
124 * Constructs a new invoker.
125 * @param cache The application's data cache.
126 */
127 public UpdateInvoker(DataCache cache) {
128
129 if (null == cache)
130 throw new NullPointerException("Cannot read data into a null cache");
131
132 this.dataCache = cache;
133
134 this.dataFile = null;
135 this.dataReader = null;
136
137 this.memoryStreamMode = false;
138
139 this.mustQuit = false;
140
141 this.interval = -1;
142 this.remainingMillis = -1;
143 this.lastUpdateTime = 0;
144 this.sinceUpdateTime = 0;
145 }
146
147 /**
148 * Sets the length of the interval between automatic data updates in milliseconds.
149 * If {@code interval <= 0} the update will not be triggered automatically.
150 *
151 * @param interval The length of the interval between automatic data updates in milliseconds
152 * (if {@code interval <= 0} the update will not be triggered automatically).
153 */
154 private synchronized void setInterval(long interval) {
155
156 if (interval == this.interval)
157 return;
158
159 if (this.interval <= 0 && interval > 0)
160 this.lastUpdateTime = 0;
161
162 this.interval = interval;
163 this.notifyAll();
164 }
165
166 /**
167 * The length of the interval between data updates.
168 * If {@code interval <= 0} the update will not be triggered automatically.
169 * @return The length of the interval between automatic data updates in milliseconds;
170 * a value {@code interval <= 0} indicated that no updates will be triggered automatically.
171 */
172 public synchronized long getInterval() {
173 return interval;
174 }
175
176 /**
177 * Used to notify this invoker that is must stop running at the next possibility.
178 *
179 * @param val Whether this invoker should stop running at the next possibility.
180 */
181 public synchronized void setMustQuit(boolean val) {
182 this.mustQuit = val;
183 this.notifyAll();
184 }
185
186 /**
187 * Time to next update.
188 *
189 * @return Number of milliseconds left until the next update.
190 */
191 public synchronized long getRemainingMillis() {
192 return remainingMillis;
193 }
194
195 private void resetCache(CacheMode mode) {
196 synchronized (dataCache) {
197 if (null == mode)
198 dataCache.resetCache();
199 else
200 dataCache.resetCache(mode);
201 }
202 }
203
204 /**
205 * Sets the file from which the next update will be read and closes the previously used reader.
206 *
207 * @param fileName File from which to read the data from now on.
208 * @throws IllegalStateException If no valid data cache is set.
209 */
210 private void setDataFile(String fileName) {
211
212 if (null == fileName)
213 throw new NullPointerException("Cannot read data from a null file name");
214
215 File file = new File(fileName);
216
217 closeDataReader();
218 dataFile = file;
219 }
220
221
222 private synchronized void closeDataReader() {
223
224 if (null == dataReader)
225 return;
226
227 synchronized(dataReader) {
228 try {
229 dataReader.close();
230 } catch(IOException e) {
231 LiveGraph.application().guiManager().logErrorLn(ThrowableTools.stackTraceToString(e));
232 } finally {
233 dataReader = null;
234 }
235 }
236 }
237
238 /**
239 * Tells whether this invoker's reader's underlying data stream is ready to be read.
240 *
241 * @return {@code true} if the next {@code readFromStream()} is guaranteed not to block for input,
242 * {@code false} otherwise. Note that returning {@code false} does not guarantee that the next read will block.
243 * @throws IOException If an I/O error occurs.
244 */
245 public synchronized boolean ready() throws IOException {
246
247 if (null == dataReader)
248 return false;
249
250 try {
251 synchronized (dataReader) {
252 return dataReader.ready();
253 }
254 } catch(NullPointerException e) {
255 // This will happen when dataReader became null after the check. It's ok, just return.
256 return false;
257 }
258 }
259
260 private void raiseCannotInitiateUpdate(String message) {
261 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
262 DataUpdateEvent.UPDIN_CannotInitiateUpdate,
263 null == message ? "" : message);
264 LiveGraph.application().eventManager().raiseEvent(event);
265 }
266
267 /**
268 * Creates a new {@link DataStreamToCacheReader} for a stream on the currently set data file.
269 * Called by {@link #requestUpdate()} either when no data reader is available
270 * ({@code dataReader} is {@code null}), or if a data reader is available, but "do not cacha data"
271 * is activated.
272 *
273 * @return {@code true} if a data reader was opened, {@code false} if a data reader could not be
274 * successfully opened (also raises an {@code UPDIN_CannotInitiateUpdate}-event).
275 */
276 private boolean openDataFileReader() {
277
278 if (null == dataFile || 0 == dataFile.getPath().length()) {
279 raiseCannotInitiateUpdate("No data input specified");
280 return false;
281 }
282
283 closeDataReader();
284
285 try {
286 dataReader = new DataStreamReader(new FileInputStream(dataFile));
287 } catch(FileNotFoundException e) {
288 raiseCannotInitiateUpdate("File not found: " + dataFile.getName());
289 LiveGraph.application().guiManager().logErrorLn(e.getMessage());
290 return false;
291 } catch (Exception e) {
292 raiseCannotInitiateUpdate(e.getMessage());
293 LiveGraph.application().guiManager().logErrorLn(e.getMessage());
294 return false;
295 }
296
297 return true;
298 }
299
300 /**
301 * Raises a {@code UPDIN_InitiateUpdate}-event to notify the {@code DataStreamToCacheReader} that
302 * it is time for a data update.
303 */
304 public synchronized void requestUpdate() {
305
306 lastUpdateTime = System.currentTimeMillis();
307
308 // Reopen data reader:
309 if (null == dataReader || LiveGraph.application().getDataFileSettings().getDoNotCacheData()) {
310
311 if (!openDataFileReader())
312 return;
313 }
314
315 try {
316 synchronized (dataReader) {
317
318 if (null == dataReader)
319 return;
320
321 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(
322 this, DataUpdateEvent.class,
323 DataUpdateEvent.UPDIN_InitiateUpdate,
324 LiveGraph.application().getDataFileSettings().getDoNotCacheData(),
325 0L, Double.NaN,
326 dataReader);
327 LiveGraph.application().eventManager().raiseEvent(event);
328 }
329 } catch(NullPointerException e) {
330 // This will happen when dataReader became null after the check or when it could not be created.
331 // It's ok, just return.
332 }
333 }
334
335 /**
336 * Send the this invoker to sleep for {@code timeTickLength} milliseconds.
337 * When it wakes it, internal time state is updated an the observers notified.
338 */
339 private void timeTick() {
340
341 synchronized(this) {
342
343 try {
344 if (interval < 0)
345 this.wait();
346 else
347 this.wait(timeTickLength);
348 } catch(InterruptedException e) { }
349
350 sinceUpdateTime = System.currentTimeMillis() - lastUpdateTime;
351 remainingMillis = interval <= 0 ? -1 : Math.max(0, interval - sinceUpdateTime);
352 }
353
354 raiseTimerTick();
355 }
356
357 /**
358 * Raises an event to notify listeners that this invoker has waken up to process events.
359 * This gives listeners displaying various information about this invoker a chance to
360 * update their state.
361 */
362 private void raiseTimerTick() {
363 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
364 DataUpdateEvent.UPDIN_TimerTick);
365 LiveGraph.application().eventManager().raiseEvent(event);
366 }
367
368
369 /**
370 * Winds up the operations by closing the current data reader.
371 */
372 private void tidyUp() {
373 closeDataReader();
374 }
375
376 /**
377 * Main invoker loop:
378 * call {@link #timeTick()};
379 * if it is time for the next update, call {@link #requestUpdate()};
380 * call {@link #timeTick()} again and continue the loop until {@link #mustQuit} is set to true;
381 * call {@link #tidyUp()} before quitting.
382 */
383 public void run() {
384
385 while (true) {
386
387 try {
388 synchronized (this) {
389 if (mustQuit) {
390 tidyUp();
391 return;
392 }
393 }
394
395 timeTick();
396
397 if (sinceUpdateTime >= interval && interval > 0)
398 requestUpdate();
399 } catch(Throwable e) {
400 LiveGraph.application().guiManager().logErrorLn(ThrowableTools.stackTraceToString(e));
401 }
402 }
403 }
404
405 /**
406 * Uses a pipe buffer of 5 MB. To customise the buffer size, create your own streams and use
407 * {@link #startMemoryStreamMode(InputStream)} to initiate the memory stream mode.
408 * @return xxx
409 */
410 public DataStreamWriter startMemoryStreamMode() {
411
412 try {
413 PipedInputStream ins = new PipedInputStream(5 * 1024 * 1024); // 5 MB
414 DataStreamWriter outw = DataStreamWriterFactory.createDataWriter(ins);
415 if (!startMemoryStreamMode(ins))
416 return null;
417 return outw;
418 } catch(IOException e) {
419 return null;
420 }
421
422 }
423
424 public boolean startMemoryStreamMode(InputStream in) throws IOException {
425
426 // Check not null
427 if (null == in)
428 throw new NullPointerException("Cannot use a null stream for memory stream mode");
429
430 // Check stream class:
431 if (in instanceof java.io.PipedInputStream) {
432 final String correctClassName = org.LiveGraph.dataFile.read.PipedInputStream.class.getCanonicalName();
433 throw new IllegalArgumentException(
434 String.format("Streams of type java.io.PipedInputStream are incompatible with the memory"
435 + " stream mode. %nUse %1$s instead. %n"
436 + "For further info see the API reference for %1$s.",
437 correctClassName));
438 }
439
440 // Check stream open:
441 in.available();
442
443 // Validate switching to memory stream mode:
444 EventManager eventManager = LiveGraph.application().eventManager();
445 Event<DataUpdateEvent> startEvent = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
446 DataUpdateEvent.UPDIN_StartMemoryStreamMode);
447 try {
448 if (!eventManager.validateEvent(startEvent))
449 return false;
450 } catch(EventProcessingException e) {
451 return false;
452 }
453
454 // Memory stream is only possible in caching mode:
455 DataFileSettings dfs = LiveGraph.application().getDataFileSettings();
456 dfs.setDoNotCacheData(false);
457 if (dfs.getDoNotCacheData())
458 return false;
459
460 // Make sure all previous events are processed:
461 LiveGraph.application().eventManager().waitForEvents();
462
463 synchronized (this) {
464
465 // Close old reader and open memory stream reader:
466 closeDataReader();
467 resetCache(null);
468 dataReader = new DataStreamReader(in);
469 memoryStreamMode = true;
470 eventManager.raiseEvent(startEvent);
471 return true;
472 }
473 }
474
475 public synchronized boolean endMemoryStreamMode() {
476
477 // Validate switching:
478 Event<DataUpdateEvent> endEvent = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
479 DataUpdateEvent.UPDIN_EndMemoryStreamMode);
480 try {
481 if (!LiveGraph.application().eventManager().validateEvent(endEvent))
482 return false;
483 } catch(EventProcessingException e) {
484 return false;
485 }
486
487 // Close the memory stream reader:
488 closeDataReader();
489 memoryStreamMode = false;
490
491 // Set the data file and cache mode according to the current settings:
492 setDataFile(LiveGraph.application().getDataFileSettings().getDataFile());
493
494 LiveGraph.application().eventManager().raiseEvent(endEvent);
495 return true;
496 }
497
498 /**
499 * Permits to register as listener with the main LiveGraph event manager and
500 * only with the main LiveGraph event manager.
501 *
502 * @param manager The {@code EventManager} for the registering attempt.
503 * @return {@code (LiveGraph.application().eventManager() == manager)}.
504 * @see EventListener#permissionRegisterWithEventManager(EventManager)
505 */
506 public boolean permissionRegisterWithEventManager(EventManager manager) {
507 return LiveGraph.application().eventManager() == manager;
508 }
509
510 /**
511 * Does not permit any unregistering.
512 *
513 * @param manager The {@code EventManager} for the registering attempt.
514 * @return {@code false}.
515 * @see EventListener#permissionUnregisterWithEventManager(EventManager)
516 */
517 public boolean permissionUnregisterWithEventManager(EventManager manager) {
518 return false;
519 }
520
521 /**
522 * Does nothing.
523 *
524 * @param manager The {@code EventManager} with which this {@code EventListener} is now registered.
525 * @see EventListener#completedRegisterWithEventManager(EventManager)
526 */
527 public void completedRegisterWithEventManager(EventManager manager) { }
528
529 /**
530 * Does nothing.
531 *
532 * @param manager The {@code EventManager} with which this {@code EventListener} is now unregistered.
533 * @see EventListener#completedUnregisterWithEventManager(EventManager)
534 */
535 public void completedUnregisterWithEventManager(EventManager manager) { }
536
537 /**
538 * Does nothing.
539 *
540 * @param event An event in which this {@code EventListener} may be interested.
541 * @return {@code false}.
542 * @see EventListener#checkEventInterest(Event)
543 */
544 public boolean checkEventInterest(Event<? extends EventType> event) {
545 return false;
546 }
547
548 /**
549 * Validates (or not) settings event. When in "memory stream mode" this invoker will
550 * not valudate changing of cache mode (all-data/tail-only or cache/dont-cache) or changing
551 * the data file.
552 *
553 * @param event The event to be validated.
554 * @param soFar Whether {@code event} has been successfuly validated by whichever {@code EventListener}s
555 * (if any) were invoked to validate {@code event} before this {@code EventListener}.
556 *
557 * @return {@code false} if this invoker is in {@code memoryStreamMode} and if the event is of the types
558 * {@code [DFS_DataFile, DFS_DoNotCacheData, DFS_ShowOnlyTailData, DFS_Load]}.
559 *
560 * @see EventListener#checkEventValid(Event, boolean)
561 */
562 public boolean checkEventValid(Event<? extends EventType> event, boolean soFar) {
563
564 if (!memoryStreamMode)
565 return true;
566
567 EventType eventType = event.getType();
568 if (SettingsEvent.DFS_DataFile == eventType
569 || SettingsEvent.DFS_DoNotCacheData == eventType
570 || SettingsEvent.DFS_ShowOnlyTailData == eventType
571 || SettingsEvent.DFS_Load == eventType) {
572
573 synchronized(this) {
574 if (memoryStreamMode)
575 return false;
576 }
577 }
578
579 return true;
580 }
581
582 /**
583 * Processes events.
584 *
585 * @param event Event to process.
586 */
587 public void eventRaised(Event<? extends EventType> event) {
588
589 if (null == event)
590 return;
591
592 if (event.getDomain() == SettingsEvent.class) {
593 processSettingsEvent(event.cast(SettingsEvent.class));
594 }
595 }
596
597 /**
598 * When the application's settings change, this method is called in order
599 * to update the internal state accordingly.
600 *
601 * @param event Describes the change event.
602 */
603 private void processSettingsEvent(Event<SettingsEvent> event) {
604
605 switch(event.getType()) {
606
607 case DFS_DataFile:
608 synchronized(this) {
609 if (memoryStreamMode) break;
610 setDataFile((String) event.getInfoObject());
611 resetCache(null);
612 requestUpdate();
613 }
614 break;
615
616 case DFS_UpdateFrequency:
617 setInterval(event.getInfoLong());
618 break;
619
620 case DFS_DoNotCacheData:
621 synchronized(this) {
622 if (memoryStreamMode) break;
623 closeDataReader();
624 if (!event.getInfoBoolean())
625 resetCache(null);
626 }
627 break;
628
629 case DFS_ShowOnlyTailData:
630 synchronized(this) {
631 if (memoryStreamMode) break;
632 DataCache.CacheMode newMode = (event.getInfoBoolean()
633 ? DataCache.CacheMode.CacheTailData
634 : DataCache.CacheMode.CacheAllData);
635 DataCache.CacheMode oldMode = (dataCache.getCacheMode());
636
637 if (newMode != oldMode) {
638
639 closeDataReader();
640 resetCache(newMode);
641 requestUpdate();
642 }
643 }
644 break;
645
646 case DFS_Load:
647 synchronized(this) {
648 if (memoryStreamMode) break;
649 DataFileSettings dfs = LiveGraph.application().getDataFileSettings();
650 setInterval(dfs.getUpdateFrequency());
651 setDataFile(dfs.getDataFile());
652 resetCache(dfs.getShowOnlyTailData()
653 ? DataCache.CacheMode.CacheTailData
654 : DataCache.CacheMode.CacheAllData);
655 requestUpdate();
656 }
657 break;
658
659 default:
660 break;
661 }
662 }
663
664 /**
665 * Objects of this class do not handle {@code eventProcessingFinished} notifications.
666 *
667 * @param event Ignored.
668 */
669 public void eventProcessingFinished(Event<? extends EventType> event) { }
670
671 /**
672 * Objects of this class do not handle {@code eventProcessingException} notofications.
673 *
674 * @param event Ignored.
675 * @param exception Never actually thrown.
676 * @return {@code false}.
677 */
678 public boolean eventProcessingException(Event<? extends EventType> event, EventProcessingException exception) {
679 return false;
680 }
681
682 } // public class UpdateInvoker