diff --git a/experiments/towards_array4.py b/experiments/towards_array4.py index 799720cd922911353f73a0c48846c8c7d0ca695d..1fd197f153da1d689e9e27fb6d3d452dae6941cc 100644 --- a/experiments/towards_array4.py +++ b/experiments/towards_array4.py @@ -3,7 +3,7 @@ import math from collections.abc import Generator, Iterable from dataclasses import dataclass from itertools import cycle -from typing import Any, Protocol +from typing import Any, Protocol, NewType import matplotlib.pyplot as plt import numpy as np @@ -24,6 +24,11 @@ from sensor import get_beamformer, get_sensor from simulation.movement import line_movement, radius_to_snr from simulation.signal_generation import color_signal + +from trackers.dbtk.setup import ( + into_stonesoup_detector, +) + from .filters import DetectionPFTracker, FilterSolution, Gaussian relabel = { @@ -38,6 +43,57 @@ strategy. This is because every set of sampling strategy requires a new set of parameters. """ +# A hack to solve this question quickly: +# Use preprocessing from some other tracker, and use the tracking +# from the detection tracker. Needs a custom preprocessing step. +Time = float +Bearing = float +DetectionRaw = tuple[Time, Bearing] +Detections = list[DetectionRaw] +Prob = float +MapBearing = NewType("MapBearing", float) +MeanBearing = NewType("MeanBearing", float) +CovBearing = NewType("CovBearing", float) +ProposedTrack = list[tuple[Time, Prob, MapBearing, MeanBearing, CovBearing]] +@dataclass +class WhiteningDetectionPFTracker(FilterSolution): + + detection_tracker: DetectionPFTracker + whitening_tracker: Gaussian + label: str = "CFARVAR14" + + def preprocess(self, samples: Iterable[np.ndarray], sampling_frequency: float, demodulation_frequency: float) -> Iterable[tuple[float, list[float]]]: + + energy_and_pva = self.whitening_tracker.preprocess(samples, sampling_frequency, demodulation_frequency) + + btr = np.array([p for (_, p) in energy_and_pva]) + + detector = into_stonesoup_detector( + self.detection_tracker.detector, self.detection_tracker.beamformer.tsamp, self.detection_tracker.measurement_model + ) + + detections = detector(np.flip(btr, axis=1)) + detections = [ + ( + k * self.beamformer.tsamp, + [np.array(vv.state_vector).flatten() for vv in detections_at_time], + ) + for (detections_at_time, k) in zip(detections, itertools.count()) + ] + # time_axis = [k * self.beamformer.tsamp for (k, _) in enumerate(detections) ] + return detections + + def track(self, input) -> list[ProposedTrack]: + + return self.detection_tracker.track(input) + + def collect_data_mult(self, track: list[ProposedTrack]): + return self.detection_tracker.collect_data_mult(track) + + @property + def beamformer(self): + return self.whitening_tracker.beamformer + class FiltersAndSamples(Protocol): def filters(self, setup: "SetupData") -> list[FilterSolution]: ... @@ -98,6 +154,12 @@ class TargetSigColBackgroundIsTDist(FiltersAndSamples): lam = 0.0022 # 0.75* # 0.73 * 1.5 # 0.73 * 1.7 # 0.73 * 1.5 # 0.73 * 2.0* self.DBTK_P = DetectionPFTracker.from_lambda(lam) + self.DBTK_P_whitened = WhiteningDetectionPFTracker( + detection_tracker = DetectionPFTracker.from_lambda(lam), # TODO: retune, + whitening_tracker = self.GTkBD # Only used for whitening. + ) + + # return [self.DBTK_P, self.DBTK_P_whitened] return [self.GTkBD, self.DBTK_P, self.TkBDAr0, self.TkBDAr] def samples(self, experiment: "Experiment"): @@ -924,6 +986,10 @@ def main(block: bool = False): p = Plot(exp) + exp.run() + p.plot_results() + + exit() exp.run_mc(100) p.plot_mc_results(block=block)