diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java index 84da5c53a812e4454fdb9e69edf01d7eed3962ab..44b28c415afb6fc26282ace0d4220b3b2b2da9d4 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java @@ -365,12 +365,12 @@ private void startBatchModeAnalysis() { * thread executing an ingest task, so such a job would run forever, * doing nothing, without a check here. */ - checkForTierCompleted(); + checkForTierCompleted(moduleTierIndex); } } /** - * Estimates the files to be prcessed in the current tier. + * Estimates the files to be processed in the current tier. */ private void estimateFilesToProcess() { estimatedFilesToProcess = 0; @@ -487,7 +487,7 @@ void addStreamedDataSource() { * job, so such a job would run forever, doing nothing, without * a check here. */ - checkForTierCompleted(); + checkForTierCompleted(moduleTierIndex); } } } @@ -497,11 +497,16 @@ void addStreamedDataSource() { * module tier are completed, and does an appropriate state transition if * they are. */ - private void checkForTierCompleted() { + private void checkForTierCompleted(int currentTier) { synchronized (tierTransitionLock) { if (jobState.equals(IngestJobState.ACCEPTING_STREAMED_CONTENT_AND_ANALYZING)) { return; } + if (currentTier < moduleTierIndex) { + // We likely had a leftover task from the previous tier. Since we've already + // advanced to the next tier, ignore it. + return; + } if (taskScheduler.currentTasksAreCompleted(getIngestJobId())) { do { shutDownCurrentTier(); @@ -512,7 +517,7 @@ private void checkForTierCompleted() { shutDown(); break; } - } while (taskScheduler.currentTasksAreCompleted(getIngestJobId())); + } while (taskScheduler.currentTasksAreCompleted(getIngestJobId())); // Loop again immediately in case the new tier is empty } } } @@ -566,8 +571,11 @@ void execute(DataSourceIngestTask task) { } } } finally { + // Save the module tier assocaited with this task since it could change after + // notifyTaskComplete + int currentTier = moduleTierIndex; taskScheduler.notifyTaskCompleted(task); - checkForTierCompleted(); + checkForTierCompleted(currentTier); } } @@ -619,8 +627,11 @@ void execute(FileIngestTask task) { logger.log(Level.SEVERE, String.format("File ingest thread interrupted during execution of file ingest job (file object ID = %d, thread ID = %d)", task.getFileId(), task.getThreadId()), ex); Thread.currentThread().interrupt(); } finally { + // Save the module tier assocaited with this task since it could change after + // notifyTaskComplete + int currentTier = moduleTierIndex; taskScheduler.notifyTaskCompleted(task); - checkForTierCompleted(); + checkForTierCompleted(currentTier); } } @@ -644,13 +655,16 @@ void execute(DataArtifactIngestTask task) { } } } finally { + // Save the module tier assocaited with this task since it could change after + // notifyTaskComplete + int currentTier = moduleTierIndex; taskScheduler.notifyTaskCompleted(task); - checkForTierCompleted(); + checkForTierCompleted(currentTier); } } /** - * Passes an analyisis result from the data source for the ingest job + * Passes an analysis result from the data source for the ingest job * through the analysis result ingest module pipeline. * * @param task An analysis result ingest task encapsulating the analysis @@ -669,8 +683,11 @@ void execute(AnalysisResultIngestTask task) { } } } finally { + // Save the module tier assocaited with this task since it could change after + // notifyTaskComplete + int currentTier = moduleTierIndex; taskScheduler.notifyTaskCompleted(task); - checkForTierCompleted(); + checkForTierCompleted(currentTier); } } @@ -793,6 +810,12 @@ void addAnalysisResults(List<AnalysisResult> results) { * Shuts down the ingest module pipelines in the current module tier. */ private void shutDownCurrentTier() { + // Note that this method is only called while holding the tierTransitionLock, so moduleTierIndex can not change + // during execution. + if (moduleTierIndex >= ingestModuleTiers.size()) { + logErrorMessage(Level.SEVERE, "shutDownCurrentTier called with out-of-bounds moduleTierIndex (" + moduleTierIndex + ")"); + return; + } logInfoMessage(String.format("Finished all ingest tasks for tier %s of ingest job", moduleTierIndex)); //NON-NLS jobState = IngestJobExecutor.IngestJobState.PIPELINES_SHUTTING_DOWN; IngestModuleTier moduleTier = ingestModuleTiers.get(moduleTierIndex); @@ -861,7 +884,7 @@ private void shutDown() { * @return The currently running module, may be null. */ DataSourceIngestPipeline.DataSourcePipelineModule getCurrentDataSourceIngestModule() { - Optional<DataSourceIngestPipeline> pipeline = ingestModuleTiers.get(moduleTierIndex).getDataSourceIngestPipeline(); + Optional<DataSourceIngestPipeline> pipeline = getCurrentDataSourceIngestPipelines(); if (pipeline.isPresent()) { return (DataSourceIngestPipeline.DataSourcePipelineModule) pipeline.get().getCurrentlyRunningModule(); } else { @@ -963,7 +986,7 @@ void cancel(IngestJob.CancellationReason reason) { } pausedIngestThreads.clear(); } - checkForTierCompleted(); + checkForTierCompleted(moduleTierIndex); } /** @@ -1407,6 +1430,35 @@ private void logIngestModuleErrors(List<IngestModuleError> errors, AbstractFile logErrorMessage(Level.SEVERE, String.format("%s experienced an error during analysis while processing file %s (object ID = %d)", error.getModuleDisplayName(), file.getName(), file.getId()), error.getThrowable()); //NON-NLS } } + + /** + * Safely gets the file ingest pipelines for the current tier. + * + * @return The file ingest pipelines or empty if ingest has completed/is shutting down. + */ + Optional<List<FileIngestPipeline>> getCurrentFileIngestPipelines() { + // Make a local copy in case the tier increments + int currentModuleTierIndex = moduleTierIndex; + if (currentModuleTierIndex < ingestModuleTiers.size()) { + return Optional.of(ingestModuleTiers.get(currentModuleTierIndex).getFileIngestPipelines()); + } + return Optional.empty(); + } + + /** + * Safely gets the data source ingest pipeline for the current tier. + * + * @return The data source ingest pipeline or empty if ingest has completed/is shutting down. + */ + Optional<DataSourceIngestPipeline> getCurrentDataSourceIngestPipelines() { + // Make a local copy in case the tier increments + int currentModuleTierIndex = moduleTierIndex; + if (currentModuleTierIndex < ingestModuleTiers.size()) { + return ingestModuleTiers.get(currentModuleTierIndex).getDataSourceIngestPipeline(); + } + return Optional.empty(); + } + /** * Gets a snapshot of some basic diagnostic statistics for the ingest job @@ -1431,7 +1483,12 @@ IngestJobProgressSnapshot getIngestJobProgressSnapshot(boolean includeIngestTask */ boolean fileIngestRunning = false; Date fileIngestStartTime = null; - for (FileIngestPipeline pipeline : ingestModuleTiers.get(moduleTierIndex).getFileIngestPipelines()) { + Optional<List<FileIngestPipeline>> fileIngestPipelines = getCurrentFileIngestPipelines(); + if (!fileIngestPipelines.isPresent()) { + // If there are no currently running pipelines, use the original set. + fileIngestPipelines = Optional.of(ingestModuleTiers.get(0).getFileIngestPipelines()); + } + for (FileIngestPipeline pipeline : fileIngestPipelines.get()) { if (pipeline.isRunning()) { fileIngestRunning = true; } diff --git a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestManager.java b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestManager.java index 7fbc80a2a2acb58358053876b6f43f1bccc3d774..d4f789a8619996480583c6a88d3fcaed3e469659 100644 --- a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestManager.java +++ b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestManager.java @@ -2814,10 +2814,13 @@ private void analyze(AutoIngestDataSource dataSource) throws AnalysisStartupExce /* * Block until notified by the ingest job event listener * or until interrupted because auto ingest is shutting - * down. + * down. For very small jobs, it is possible that ingest has + * completed by the time we get here, so check periodically + * in case the event was missed. */ - ingestLock.wait(); - sysLogger.log(Level.INFO, "Finished ingest modules analysis for {0} ", manifestPath); + while (IngestManager.getInstance().isIngestRunning()) { + ingestLock.wait(300000); // Check every five minutes + } IngestJob.ProgressSnapshot jobSnapshot = ingestJob.getSnapshot(); IngestJob.ProgressSnapshot.DataSourceProcessingSnapshot snapshot = jobSnapshot.getDataSourceProcessingSnapshot(); AutoIngestJobLogger nestedJobLogger = new AutoIngestJobLogger(manifestPath, snapshot.getDataSource(), caseDirectoryPath);