From 531808eff7ac551bc9533d1f11a08333fd76c69e Mon Sep 17 00:00:00 2001
From: Greg DiCristofaro <gregd@basistech.com>
Date: Mon, 18 Oct 2021 21:04:14 -0400
Subject: [PATCH] Replace DataArtifactDAO's Guava cache with a ListenableCache that broadcasts invalidation updates via Reactor

---
 Core/ivy.xml                                  |  1 +
 Core/nbproject/project.properties             |  2 +
 Core/nbproject/project.xml                    |  4 +
 .../mainui/datamodel/DataArtifactDAO.java     | 71 ++++++++-------
 .../mainui/datamodel/ListenableCache.java     | 86 ++++++++++++++++++-
 5 files changed, 129 insertions(+), 35 deletions(-)

diff --git a/Core/ivy.xml b/Core/ivy.xml
index 15392cc30e..810bb5d411 100644
--- a/Core/ivy.xml
+++ b/Core/ivy.xml
@@ -59,6 +59,7 @@
 
         <!-- for MainUI event updates -->
         <dependency org="io.projectreactor" name="reactor-core" rev="3.4.11"/>
+        <dependency org="org.reactivestreams" name="reactive-streams" rev="1.0.3"/>
 
         <!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api -->
         <dependency conf="core->default" org="javax.ws.rs" name="javax.ws.rs-api" rev="2.0"/>
diff --git a/Core/nbproject/project.properties b/Core/nbproject/project.properties
index 5377cc3d82..eb3431bb0c 100644
--- a/Core/nbproject/project.properties
+++ b/Core/nbproject/project.properties
@@ -25,6 +25,8 @@ file.reference.commons-lang3-3.5.jar=release\\modules\\ext\\commons-lang3-3.5.ja
 file.reference.commons-logging-1.2.jar=release\\modules\\ext\\commons-logging-1.2.jar
 file.reference.commons-pool2-2.4.2.jar=release\\modules\\ext\\commons-pool2-2.4.2.jar
 file.reference.java-diff-utils-4.8.jar=release\\modules\\ext\\java-diff-utils-4.8.jar
+file.reference.reactor-core-3.4.11.jar=release\\modules\\ext\\reactor-core-3.4.11.jar
+file.reference.reactive-streams-1.0.3.jar=release\\modules\\ext\\reactive-streams-1.0.3.jar
 file.reference.commons-validator-1.6.jar=release\\modules\\ext\\commons-validator-1.6.jar
 file.reference.curator-client-2.8.0.jar=release\\modules\\ext\\curator-client-2.8.0.jar
 file.reference.curator-framework-2.8.0.jar=release\\modules\\ext\\curator-framework-2.8.0.jar
diff --git a/Core/nbproject/project.xml b/Core/nbproject/project.xml
index 24425552b5..00c1eab9b3 100644
--- a/Core/nbproject/project.xml
+++ b/Core/nbproject/project.xml
@@ -661,6 +661,10 @@
                 <runtime-relative-path>ext/reactor-core-3.4.11.jar</runtime-relative-path>
                 <binary-origin>release\modules\ext\reactor-core-3.4.11.jar</binary-origin>
             </class-path-extension>
+            <class-path-extension>
+                <runtime-relative-path>ext/reactive-streams-1.0.3.jar</runtime-relative-path>
+                <binary-origin>release\modules\ext\reactive-streams-1.0.3.jar</binary-origin>
+            </class-path-extension>
             <class-path-extension>
                 <runtime-relative-path>ext/SparseBitSet-1.1.jar</runtime-relative-path>
                 <binary-origin>release\modules\ext\SparseBitSet-1.1.jar</binary-origin>
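
The three build changes above (ivy.xml, project.properties, project.xml) all register the same new jar, reactive-streams-1.0.3, which provides the org.reactivestreams API that reactor-core's Flux implements. A minimal, hypothetical smoke-test class (not part of this patch; the class name is invented for illustration) shows the relationship the classpath addition is there to satisfy:

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/** Hypothetical smoke test; compiles only if both reactor-core and reactive-streams are on the classpath. */
public class ReactiveStreamsClasspathCheck {

    public static void main(String[] args) {
        // Flux implements org.reactivestreams.Publisher, so any code that exposes a Flux
        // (such as the new getDataArtifactUpdates() below) needs reactive-streams at compile time.
        Publisher<String> publisher = Flux.just("a", "b", "c");
        Flux.from(publisher).subscribe(System.out::println);
    }
}
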
diff --git a/Core/src/org/sleuthkit/autopsy/mainui/datamodel/DataArtifactDAO.java b/Core/src/org/sleuthkit/autopsy/mainui/datamodel/DataArtifactDAO.java
index 83a0446092..e4597d8d6f 100644
--- a/Core/src/org/sleuthkit/autopsy/mainui/datamodel/DataArtifactDAO.java
+++ b/Core/src/org/sleuthkit/autopsy/mainui/datamodel/DataArtifactDAO.java
@@ -18,8 +18,6 @@
  */
 package org.sleuthkit.autopsy.mainui.datamodel;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -40,6 +38,7 @@
 import org.sleuthkit.datamodel.DataArtifact;
 import org.sleuthkit.datamodel.SleuthkitCase;
 import org.sleuthkit.datamodel.TskCoreException;
+import reactor.core.publisher.Flux;
 
 /**
  * DAO for providing data about data artifacts to populate the results viewer.
@@ -56,7 +55,33 @@ synchronized static DataArtifactDAO getInstance() {
         return instance;
     }
 
-    private final Cache<DataArtifactSearchParam, DataArtifactTableSearchResultsDTO> dataArtifactCache = CacheBuilder.newBuilder().maximumSize(1000).build();
+    private final ListenableCache<ModuleDataEvent, DataArtifactSearchParam, DataArtifactTableSearchResultsDTO> dataArtifactCache
+            = new ListenableCache<ModuleDataEvent, DataArtifactSearchParam, DataArtifactTableSearchResultsDTO>() {
+
+        @Override
+        protected DataArtifactTableSearchResultsDTO fetch(DataArtifactSearchParam key) throws Exception {
+            return DataArtifactDAO.this.fetchDataArtifactsForTable(key);
+        }
+
+        @Override
+        protected boolean matches(ModuleDataEvent eventData, DataArtifactSearchParam key) {
+            // GVDTODO also filter on the event's data source
+            return eventData != null && key.getArtifactType().equals(eventData.getBlackboardArtifactType());
+        }
+
+        @Override
+        protected void validateCacheKey(DataArtifactSearchParam key) throws IllegalArgumentException {
+            BlackboardArtifact.Type artType = key.getArtifactType();
+
+            if (artType == null || artType.getCategory() != BlackboardArtifact.Category.DATA_ARTIFACT
+                    || (key.getDataSourceId() != null && key.getDataSourceId() < 0)) {
+                throw new IllegalArgumentException(MessageFormat.format("Illegal data.  "
+                        + "Artifact type must be non-null and data artifact.  Data source id must be null or > 0.  "
+                        + "Received artifact type: {0}; data source id: {1}", artType, key.getDataSourceId() == null ? "<null>" : key.getDataSourceId()));
+            }
+        }
+
+    };
 
     private DataArtifactTableSearchResultsDTO fetchDataArtifactsForTable(DataArtifactSearchParam cacheKey) throws NoCurrentCaseException, TskCoreException {
         SleuthkitCase skCase = getCase();
@@ -69,15 +94,15 @@ private DataArtifactTableSearchResultsDTO fetchDataArtifactsForTable(DataArtifac
         List<DataArtifact> arts = (dataSourceId != null)
                 ? blackboard.getDataArtifacts(artType.getTypeID(), dataSourceId)
                 : blackboard.getDataArtifacts(artType.getTypeID());
-        
+
         Stream<DataArtifact> pagedStream = arts.stream()
                 .sorted(Comparator.comparing(art -> art.getId()))
                 .skip(cacheKey.getStartItem());
-        
+
         if (cacheKey.getMaxResultsCount() != null) {
             pagedStream = pagedStream.limit(cacheKey.getMaxResultsCount());
         }
-        
+
         List<DataArtifact> pagedArtifacts = pagedStream.collect(Collectors.toList());
 
         Map<Long, Map<BlackboardAttribute.Type, Object>> artifactAttributes = new HashMap<>();
@@ -146,27 +171,18 @@ private DataArtifactTableSearchResultsDTO fetchDataArtifactsForTable(DataArtifac
         //return new DataArtifactTableSearchResultsDTO(artType, columnKeys, rows, cacheKey.getStartItem(), arts.size());
     }
 
-
-
     public DataArtifactTableSearchResultsDTO getDataArtifactsForTable(DataArtifactSearchParam artifactKey) throws ExecutionException, IllegalArgumentException {
-        BlackboardArtifact.Type artType = artifactKey.getArtifactType();
-
-        if (artType == null || artType.getCategory() != BlackboardArtifact.Category.DATA_ARTIFACT
-                || (artifactKey.getDataSourceId() != null && artifactKey.getDataSourceId() < 0)) {
-            throw new IllegalArgumentException(MessageFormat.format("Illegal data.  "
-                    + "Artifact type must be non-null and data artifact.  Data source id must be null or > 0.  "
-                    + "Received artifact type: {0}; data source id: {1}", artType, artifactKey.getDataSourceId() == null ? "<null>" : artifactKey.getDataSourceId()));
-        }
-
-        return dataArtifactCache.get(artifactKey, () -> fetchDataArtifactsForTable(artifactKey));
+        return this.dataArtifactCache.getValue(artifactKey);
+    }
+
+    public Flux<DataArtifactTableSearchResultsDTO> getDataArtifactUpdates(DataArtifactSearchParam artifactKey) throws ExecutionException, IllegalArgumentException {
+        return this.dataArtifactCache.getInitialAndUpdates(artifactKey);
     }
 
     public void dropDataArtifactCache() {
-        dataArtifactCache.invalidateAll();
+        this.dataArtifactCache.invalidateAll();
     }
 
-    
-    
     @Override
     public void onDropCache() {
         dropDataArtifactCache();
@@ -174,17 +190,6 @@ public void onDropCache() {
 
     @Override
     public void onModuleData(ModuleDataEvent evt) {
-        if (evt == null || evt.getBlackboardArtifactType() == null) {
-            return;
-        }
-        
-        // GVDTODO data source filtering?
-        
-        final int artifactTypeId = evt.getBlackboardArtifactType().getTypeID();
-        this.dataArtifactCache.asMap().replaceAll((k,v) -> {
-            return (k == null || k.getArtifactType() == null || artifactTypeId != k.getArtifactType().getTypeID())
-                    ? null 
-                    : v;
-        });
+        this.dataArtifactCache.invalidate(evt);
     }
 }
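
For context, a hedged sketch of how a results viewer might consume the new getDataArtifactUpdates() API. The subscriber class, its method names, and the pre-built searchParam are hypothetical and not part of the patch; it lives in the DAO's package because getInstance() is not declared public.

package org.sleuthkit.autopsy.mainui.datamodel;

import java.util.concurrent.ExecutionException;
import javax.swing.SwingUtilities;
import reactor.core.Disposable;

/** Hypothetical consumer sketch of the Flux-based updates API. */
class DataArtifactResultsSubscriber {

    private Disposable subscription;

    void listen(DataArtifactSearchParam searchParam) throws ExecutionException {
        // Emits the cached (or freshly fetched) results immediately, then a new fetch each time
        // a matching ModuleDataEvent invalidates the cache entry for this search param.
        subscription = DataArtifactDAO.getInstance()
                .getDataArtifactUpdates(searchParam)
                .subscribe(results -> SwingUtilities.invokeLater(() -> render(results)));
    }

    void stopListening() {
        // Dispose so the multicast sink stops delivering updates to a closed viewer.
        if (subscription != null) {
            subscription.dispose();
        }
    }

    private void render(DataArtifactTableSearchResultsDTO results) {
        // Update the table model on the EDT here.
    }
}
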
diff --git a/Core/src/org/sleuthkit/autopsy/mainui/datamodel/ListenableCache.java b/Core/src/org/sleuthkit/autopsy/mainui/datamodel/ListenableCache.java
index 91ba5b1086..3bd50266f8 100644
--- a/Core/src/org/sleuthkit/autopsy/mainui/datamodel/ListenableCache.java
+++ b/Core/src/org/sleuthkit/autopsy/mainui/datamodel/ListenableCache.java
@@ -18,9 +18,91 @@
  */
 package org.sleuthkit.autopsy.mainui.datamodel;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.sleuthkit.autopsy.coreutils.Logger;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.EmitResult;
+import reactor.util.concurrent.Queues;
+
 /**
 * A cache of values (V) keyed by search parameters (K) that broadcasts keys invalidated by event data (D) so listeners can refetch.
  */
-public class ListenableCache<K,V> {
-    private final Cache<K,V> cache
+public abstract class ListenableCache<D, K, V> {
+
+    private static final Logger logger = Logger.getLogger(ListenableCache.class.getName());
+
+    private final Cache<K, V> cache = CacheBuilder.newBuilder().maximumSize(1000).build();
+
+    // taken from https://stackoverflow.com/questions/66671636/why-is-sinks-many-multicast-onbackpressurebuffer-completing-after-one-of-t
+    private final Sinks.Many<K> invalidatedKeyMulticast = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
+
+    public V getValue(K key) throws IllegalArgumentException, ExecutionException {
+        validateCacheKey(key);
+        return cache.get(key, () -> fetch(key));
+    }
+
+    private V getValueLoggedError(K key) {
+        try {
+            return getValue(key);
+        } catch (IllegalArgumentException | ExecutionException ex) {
+            logger.log(Level.WARNING, "An error occurred while fetching results for key: " + key, ex);
+            return null;
+        }
+    }
+
+    public Flux<V> getInitialAndUpdates(K key) throws IllegalArgumentException {
+        validateCacheKey(key);
+
+        // GVDTODO handle in one transaction
+        // Evaluate the fetch when the Flux is subscribed and drop a failed (null) fetch
+        // before it reaches the Flux, since Reactor does not permit null elements.
+        Flux<V> initial = Flux.fromStream(Stream.of(key).map(this::getValueLoggedError).filter(v -> v != null));
+
+        Flux<V> updates = this.invalidatedKeyMulticast.asFlux()
+                .filter(invalidatedKey -> key.equals(invalidatedKey))
+                // mapNotNull skips values whose refetch failed instead of erroring out the Flux
+                .mapNotNull((matchingInvalidatedKey) -> getValueLoggedError(matchingInvalidatedKey));
+
+        return Flux.concat(initial, updates)
+                .filter((data) -> data != null);
+    }
+
+    public void invalidateAll() {
+        List<K> keys = new ArrayList<>(cache.asMap().keySet());
+        invalidateAndBroadcast(keys);
+    }
+
+    public void invalidate(D eventData) {
+        List<K> keys = cache.asMap().keySet().stream()
+                .filter((key) -> matches(eventData, key))
+                .collect(Collectors.toList());
+        invalidateAndBroadcast(keys);
+    }
+
+    private void invalidateAndBroadcast(Iterable<K> keys) {
+        cache.invalidateAll(keys);
+        keys.forEach((k) -> {
+            EmitResult emitResult = invalidatedKeyMulticast.tryEmitNext(k);
+            if (emitResult.isFailure()) {
+                logger.log(Level.WARNING, MessageFormat.format("There was an error broadcasting invalidated key {0}: {1}", k, emitResult.name()));
+            }
+        });
+    }
+
+    protected void validateCacheKey(K key) throws IllegalArgumentException {
+        // to be overridden
+        if (key == null) {
+            throw new IllegalArgumentException("Expected non-null key");
+        }
+    }
+
+    protected abstract V fetch(K key) throws Exception;
+
+    protected abstract boolean matches(D eventData, K key);
 }
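
A standalone sketch (invented class, not part of the patch) of the Sinks behavior that the StackOverflow link above discusses: the second argument to onBackpressureBuffer (autoCancel = false) keeps the multicast sink alive after its last subscriber cancels, so viewers opened later still receive invalidation broadcasts.

import reactor.core.Disposable;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

/** Hypothetical demo of the autoCancel=false choice used for invalidatedKeyMulticast. */
public class MulticastAutoCancelDemo {

    public static void main(String[] args) {
        Sinks.Many<String> sink = Sinks.many().multicast()
                .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); // false = do not terminate when subscribers leave

        // First subscriber attaches and then cancels (e.g., a viewer is closed).
        Disposable first = sink.asFlux().subscribe(key -> System.out.println("first: " + key));
        first.dispose();

        // Because autoCancel is false, the sink is still live and a later subscriber gets new emissions.
        // With the default (true), the sink would terminate once its last subscriber cancelled.
        sink.asFlux().subscribe(key -> System.out.println("second: " + key));
        sink.tryEmitNext("invalidated-key"); // printed by the second subscriber
    }
}
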
-- 
GitLab