diff --git a/Core/ivy.xml b/Core/ivy.xml index 15392cc30e43c063e936128cfeffb8fd7184506f..810bb5d411f47fb78deec0b24652e193cfabc48e 100644 --- a/Core/ivy.xml +++ b/Core/ivy.xml @@ -59,6 +59,7 @@ <!-- for MainUI event updates --> <dependency org="io.projectreactor" name="reactor-core" rev="3.4.11"/> + <dependency org="org.reactivestreams" name="reactive-streams" rev="1.0.3"/> <!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api --> <dependency conf="core->default" org="javax.ws.rs" name="javax.ws.rs-api" rev="2.0"/> diff --git a/Core/nbproject/project.properties b/Core/nbproject/project.properties index 5377cc3d82558d20a4dba7c13564fd9023cb5ee5..eb3431bb0cc3acf4143676e860cf80fe9f54dd41 100644 --- a/Core/nbproject/project.properties +++ b/Core/nbproject/project.properties @@ -25,6 +25,8 @@ file.reference.commons-lang3-3.5.jar=release\\modules\\ext\\commons-lang3-3.5.ja file.reference.commons-logging-1.2.jar=release\\modules\\ext\\commons-logging-1.2.jar file.reference.commons-pool2-2.4.2.jar=release\\modules\\ext\\commons-pool2-2.4.2.jar file.reference.java-diff-utils-4.8.jar=release\\modules\\ext\\java-diff-utils-4.8.jar +file.reference.reactor-core-3.4.11.jar=release\\modules\\ext\\reactor-core-3.4.11.jar +file.reference.reactive-streams-1.0.3.jar=release\\modules\\ext\\reactive-streams-1.0.3.jar file.reference.commons-validator-1.6.jar=release\\modules\\ext\\commons-validator-1.6.jar file.reference.curator-client-2.8.0.jar=release\\modules\\ext\\curator-client-2.8.0.jar file.reference.curator-framework-2.8.0.jar=release\\modules\\ext\\curator-framework-2.8.0.jar diff --git a/Core/nbproject/project.xml b/Core/nbproject/project.xml index 24425552b5fd628ff191ce4d332377ce96929b57..00c1eab9b3deaad9fd47d0ba09e39d20401883d7 100644 --- a/Core/nbproject/project.xml +++ b/Core/nbproject/project.xml @@ -661,6 +661,10 @@ <runtime-relative-path>ext/reactor-core-3.4.11.jar</runtime-relative-path> <binary-origin>release\modules\ext\reactor-core-3.4.11.jar</binary-origin> </class-path-extension> + <class-path-extension> + <runtime-relative-path>ext/reactive-streams-1.0.3.jar</runtime-relative-path> + <binary-origin>release\modules\ext\reactive-streams-1.0.3.jar</binary-origin> + </class-path-extension> <class-path-extension> <runtime-relative-path>ext/SparseBitSet-1.1.jar</runtime-relative-path> <binary-origin>release\modules\ext\SparseBitSet-1.1.jar</binary-origin> diff --git a/Core/src/org/sleuthkit/autopsy/mainui/datamodel/DataArtifactDAO.java b/Core/src/org/sleuthkit/autopsy/mainui/datamodel/DataArtifactDAO.java index 83a04460928ca37334ef44ef1f3a3625457a3e78..e4597d8d6fdfba609cf97c1f5d1cec416c2c0c87 100644 --- a/Core/src/org/sleuthkit/autopsy/mainui/datamodel/DataArtifactDAO.java +++ b/Core/src/org/sleuthkit/autopsy/mainui/datamodel/DataArtifactDAO.java @@ -18,8 +18,6 @@ */ package org.sleuthkit.autopsy.mainui.datamodel; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; @@ -40,6 +38,7 @@ import org.sleuthkit.datamodel.DataArtifact; import org.sleuthkit.datamodel.SleuthkitCase; import org.sleuthkit.datamodel.TskCoreException; +import reactor.core.publisher.Flux; /** * DAO for providing data about data artifacts to populate the results viewer. @@ -56,7 +55,33 @@ synchronized static DataArtifactDAO getInstance() { return instance; } - private final Cache<DataArtifactSearchParam, DataArtifactTableSearchResultsDTO> dataArtifactCache = CacheBuilder.newBuilder().maximumSize(1000).build(); + private final ListenableCache<ModuleDataEvent, DataArtifactSearchParam, DataArtifactTableSearchResultsDTO> dataArtifactCache + = new ListenableCache<ModuleDataEvent, DataArtifactSearchParam, DataArtifactTableSearchResultsDTO>() { + + @Override + protected DataArtifactTableSearchResultsDTO fetch(DataArtifactSearchParam key) throws Exception { + return DataArtifactDAO.this.fetchDataArtifactsForTable(key); + } + + @Override + protected boolean matches(ModuleDataEvent eventData, DataArtifactSearchParam key) { + // GVDTODO handle + return key.getArtifactType().equals(eventData.getBlackboardArtifactType()); + } + + @Override + protected void validateCacheKey(DataArtifactSearchParam key) throws IllegalArgumentException { + BlackboardArtifact.Type artType = key.getArtifactType(); + + if (artType == null || artType.getCategory() != BlackboardArtifact.Category.DATA_ARTIFACT + || (key.getDataSourceId() != null && key.getDataSourceId() < 0)) { + throw new IllegalArgumentException(MessageFormat.format("Illegal data. " + + "Artifact type must be non-null and data artifact. Data source id must be null or > 0. " + + "Received artifact type: {0}; data source id: {1}", artType, key.getDataSourceId() == null ? "<null>" : key.getDataSourceId())); + } + } + + }; private DataArtifactTableSearchResultsDTO fetchDataArtifactsForTable(DataArtifactSearchParam cacheKey) throws NoCurrentCaseException, TskCoreException { SleuthkitCase skCase = getCase(); @@ -69,15 +94,15 @@ private DataArtifactTableSearchResultsDTO fetchDataArtifactsForTable(DataArtifac List<DataArtifact> arts = (dataSourceId != null) ? blackboard.getDataArtifacts(artType.getTypeID(), dataSourceId) : blackboard.getDataArtifacts(artType.getTypeID()); - + Stream<DataArtifact> pagedStream = arts.stream() .sorted(Comparator.comparing(art -> art.getId())) .skip(cacheKey.getStartItem()); - + if (cacheKey.getMaxResultsCount() != null) { pagedStream = pagedStream.limit(cacheKey.getMaxResultsCount()); } - + List<DataArtifact> pagedArtifacts = pagedStream.collect(Collectors.toList()); Map<Long, Map<BlackboardAttribute.Type, Object>> artifactAttributes = new HashMap<>(); @@ -146,27 +171,18 @@ private DataArtifactTableSearchResultsDTO fetchDataArtifactsForTable(DataArtifac //return new DataArtifactTableSearchResultsDTO(artType, columnKeys, rows, cacheKey.getStartItem(), arts.size()); } - - public DataArtifactTableSearchResultsDTO getDataArtifactsForTable(DataArtifactSearchParam artifactKey) throws ExecutionException, IllegalArgumentException { - BlackboardArtifact.Type artType = artifactKey.getArtifactType(); - - if (artType == null || artType.getCategory() != BlackboardArtifact.Category.DATA_ARTIFACT - || (artifactKey.getDataSourceId() != null && artifactKey.getDataSourceId() < 0)) { - throw new IllegalArgumentException(MessageFormat.format("Illegal data. " - + "Artifact type must be non-null and data artifact. Data source id must be null or > 0. " - + "Received artifact type: {0}; data source id: {1}", artType, artifactKey.getDataSourceId() == null ? "<null>" : artifactKey.getDataSourceId())); - } - - return dataArtifactCache.get(artifactKey, () -> fetchDataArtifactsForTable(artifactKey)); + return this.dataArtifactCache.getValue(artifactKey); + } + + public Flux<DataArtifactTableSearchResultsDTO> getDataArtifactUpdates(DataArtifactSearchParam artifactKey) throws ExecutionException, IllegalArgumentException { + return this.dataArtifactCache.getInitialAndUpdates(artifactKey); } public void dropDataArtifactCache() { - dataArtifactCache.invalidateAll(); + this.dataArtifactCache.invalidateAll(); } - - @Override public void onDropCache() { dropDataArtifactCache(); @@ -174,17 +190,6 @@ public void onDropCache() { @Override public void onModuleData(ModuleDataEvent evt) { - if (evt == null || evt.getBlackboardArtifactType() == null) { - return; - } - - // GVDTODO data source filtering? - - final int artifactTypeId = evt.getBlackboardArtifactType().getTypeID(); - this.dataArtifactCache.asMap().replaceAll((k,v) -> { - return (k == null || k.getArtifactType() == null || artifactTypeId != k.getArtifactType().getTypeID()) - ? null - : v; - }); + this.dataArtifactCache.invalidate(evt); } } diff --git a/Core/src/org/sleuthkit/autopsy/mainui/datamodel/ListenableCache.java b/Core/src/org/sleuthkit/autopsy/mainui/datamodel/ListenableCache.java index 91ba5b10864fe12eed1c491959cd63279e9ee07f..3bd50266f842009fa95bd57b10958b387fcbb016 100644 --- a/Core/src/org/sleuthkit/autopsy/mainui/datamodel/ListenableCache.java +++ b/Core/src/org/sleuthkit/autopsy/mainui/datamodel/ListenableCache.java @@ -18,9 +18,91 @@ */ package org.sleuthkit.autopsy.mainui.datamodel; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.logging.Level; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.sleuthkit.autopsy.coreutils.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitResult; +import reactor.util.concurrent.Queues; + /** * */ -public class ListenableCache<K,V> { - private final Cache<K,V> cache +public abstract class ListenableCache<D, K, V> { + + private static final Logger logger = Logger.getLogger(ListenableCache.class.getName()); + + private final Cache<K, V> cache = CacheBuilder.newBuilder().maximumSize(1000).build(); + + // taken from https://stackoverflow.com/questions/66671636/why-is-sinks-many-multicast-onbackpressurebuffer-completing-after-one-of-t + private final Sinks.Many<K> invalidatedKeyMulticast = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); + + public V getValue(K key) throws IllegalArgumentException, ExecutionException { + validateCacheKey(key); + return cache.get(key, () -> fetch(key)); + } + + private V getValueLoggedError(K key) { + try { + return getValue(key); + } catch (IllegalArgumentException | ExecutionException ex) { + logger.log(Level.WARNING, "An error occurred while fetching results for key: " + key, ex); + return null; + } + } + + public Flux<V> getInitialAndUpdates(K key) throws IllegalArgumentException { + validateCacheKey(key); + + // GVDTODO handle in one transaction + Flux<V> initial = Flux.fromStream(Stream.of(getValueLoggedError(key))); + + Flux<V> updates = this.invalidatedKeyMulticast.asFlux() + .filter(invalidatedKey -> key.equals(invalidatedKey)) + .map((matchingInvalidatedKey) -> getValueLoggedError(matchingInvalidatedKey)); + + return Flux.concat(initial, updates) + .filter((data) -> data != null); + } + + public void invalidateAll() { + List<K> keys = new ArrayList<>(cache.asMap().keySet()); + invalidateAndBroadcast(keys); + } + + public void invalidate(D eventData) { + List<K> keys = cache.asMap().keySet().stream() + .filter((key) -> matches(eventData, key)) + .collect(Collectors.toList()); + invalidateAndBroadcast(keys); + } + + private void invalidateAndBroadcast(Iterable<K> keys) { + cache.invalidateAll(keys); + keys.forEach((k) -> { + EmitResult emitResult = invalidatedKeyMulticast.tryEmitNext(k); + if (emitResult.isFailure()) { + logger.log(Level.WARNING, MessageFormat.format("There was an error broadcasting invalidated key {0}: {1}", k, emitResult.name())); + } + }); + } + + protected void validateCacheKey(K key) throws IllegalArgumentException { + // to be overridden + if (key == null) { + throw new IllegalArgumentException("Expected non-null key"); + } + } + + protected abstract V fetch(K key) throws Exception; + + protected abstract boolean matches(D eventData, K key); }