diff --git a/src/trackers.py b/src/trackers.py index 84bfc61af844f93fb88b7c4dcb1361225a3d1ccd..555e14b4ce30920a7d6803a09a579071a2a92057 100644 --- a/src/trackers.py +++ b/src/trackers.py @@ -1,19 +1,58 @@ -import pdb +"""Implements a number of different trackers.""" +import copy import numpy as np import scipy.stats as stats import scipy import tqdm from pyehm.core import EHM2 +from .utility import murty class BasicTracker(): def __init__(self, filt, clutter_model, associator, gater): + """A basic single target tracker. + + Parameters + ---------- + filt : filter + See src.filter. Some sort of filter to use for the tracks. + associator : associators + See src.associators. Some sort of measurement associator (NN). + gater : gater + See src.gater. A gating function. + clutter_model : dict + A dict containing the clutter model. + + """ self.filt = filt self.clutter_model = clutter_model self.associator = associator self.gater = gater def evaluate(self, Y, x, P): - for k, meas_k in enumerate(Y): + """ Evaluates the measurements in Y. + + nx -- state dimension + K -- number of measurements + + Parameters + ---------- + Y : list + List of detections at time k=0 to K where K is the length of Y. + Each entry of Y is ny by N_k where N_k is time-varying as the number + of detections vary. + x : numpy.ndarray (nx x K) + Array to save the state estimates in. + P : numpy.ndarray (nx x nx x K) + Array to save the state error covariances in. + + Returns + ------- + numpy.ndarray (nx x K) + State estimates per time. + numpy.ndarray (nx x nx x K) + State error covariance per time. + """ + for k, meas_k in tqdm.tqdm(enumerate(Y), desc="Evaluating measurements: ", total=len(Y)): # Calculate prediction error of each measurement yhat = self.filt.sensor_model['h'](x[:2, k]) eps = meas_k-yhat[:, None] @@ -32,15 +71,52 @@ class BasicTracker(): class IMMTracker(): def __init__(self, filt, clutter_model, associator, gater): + """A single target tracker using an IMM as filter. + + Parameters + ---------- + filt : filter + See src.filter. Some sort of filter to use for the tracks. + associator : associators + See src.associators. Some sort of measurement associator (NN). + gater : gater + See src.gater. A gating function. + clutter_model : dict + A dict containing the clutter model. + + """ self.filt = filt self.clutter_model = clutter_model self.associator = associator self.gater = gater def evaluate(self, Y, x, P): + """ Evaluates the measurements in Y. + + nx -- state dimension + K -- number of measurements + + Parameters + ---------- + Y : list + List of detections at time k=0 to K where K is the length of Y. + Each entry of Y is ny by N_k where N_k is time-varying as the number + of detections vary. + x : numpy.ndarray (nx x K) + Array to save the state estimates in. + P : numpy.ndarray (nx x nx x K) + Array to save the state error covariances in. + + Returns + ------- + numpy.ndarray (nx x K x nmodes) + State estimates per time per mode. + numpy.ndarray (nx x nx x K x nmodes) + State error covariance per time per mode. + """ xm = np.repeat(np.expand_dims(x, -1), self.filt.nmodes, axis=-1) Pm = np.repeat(np.expand_dims(P, -1), self.filt.nmodes, axis=-1) - for k, meas_k in enumerate(Y): + for k, meas_k in tqdm.tqdm(enumerate(Y), desc="IMM evaluating measurements: ", total=len(Y)): # Get the mixed estimate xhat, Phat = self.filt.mix(xm[:, k, :], Pm[:, :, k, :]) # Calculate prediction error of each measurement @@ -181,7 +257,7 @@ class GNN(): tracks = [] # Store all tracks confirmed_tracks = [] # Store the confirmed tracks (for plotting purposes only) ids = 0 - for k, meas_k in tqdm.tqdm(enumerate(Y), desc="GNN evaluating detections: "): + for k, meas_k in tqdm.tqdm(enumerate(Y), desc="GNN evaluating detections: ", total=len(Y)): ny = meas_k.shape[1] unused_meas = np.ones((ny), dtype=bool) @@ -359,7 +435,7 @@ class JPDA(): tracks = [] # Store all tracks confirmed_tracks = [] # Store the confirmed tracks (for plotting purposes only) ids = 0 - for k, meas_k in tqdm.tqdm(enumerate(Y), desc="JPDA evaluating detections: "): + for k, meas_k in tqdm.tqdm(enumerate(Y), desc="JPDA evaluating detections: ", total=len(Y)): ny = meas_k.shape[1] unused_meas = np.ones((ny), dtype=bool) @@ -393,6 +469,256 @@ class JPDA(): track['t'].append(k+1) return tracks, confirmed_tracks +class MHT(): + def __init__(self, logic, logic_params, init_track, filt, gater, clutter_model, pthresh=0.95, Nh=30): + """An implementation of a Hypothesis Oriented Multiple Hypothesis Tracker. + + Parameters + ---------- + logic : logic + See src.logic. Some sort of track logic. + logic_params : dict + Contains parameters to the track logic. + init_track : callable + A function that initiates a track. Should take a measurement, the + time, an id and the filter to use for the track as input. + filt : filter + See src.filter. Some sort of filter to use for the tracks. + gater : gater + See src.gater. A gating function. + clutter_model : dict + A dict containing the clutter model. + pthresh : float [0, 1 + Threshold for the amount of probability "mass" to keep each timestep. Prunes unlikely hypothesis until this threshold is reached + Nh : int + Maximum allowed number of hypothesis after each time step + + """ + self.logic = logic + self.logic_params = logic_params + self.init_track = init_track + self.filt = filt + self.gater = gater + self.clutter_model = clutter_model + self.pthresh = pthresh + self.Nh = Nh + + def get_association_cost(self, meas, tracks, validation_matrix): + """Calculates an association and likelihood matrix. + + The association matrix (ny x ny+Nc) contains the cost of assigning a measurement to a specific track (or to false/new target). + The likelihood matrix (ny x ny+Nc) contains the likelihood of a specific association. + Hence, they are very related but both are provided for convenience later on. + + ny -- number of measurements + Nc -- number of tracks + + Parameters + ---------- + meas : numpy.ndarray (: x ny) + Contains measurement(s) to calculate the cost/likelihood for + tracks : list + A list containing the tracks to calculate the cost/likelihood for + validation_matrix : numpy.ndarray (ny x Nc) + + Returns + ---------- + numpy.ndarray (ny x ny+Nc) + An array of the cost of associating a measurement to a specific track + numpy.ndarray (ny x ny+Nc) + An array of the likelihood of each association + + """ + ny = meas.shape[1] + Nc = len(tracks) # Number of tracks to associate + association_matrix = -np.inf*np.ones((ny, Nc+ny)) + likelihood_matrix = np.zeros((ny, Nc+ny)) + # Entry for false alarms + np.fill_diagonal(association_matrix[:, Nc:Nc+ny], np.log(self.logic_params['Bfa'])) + np.fill_diagonal(likelihood_matrix[:, Nc:Nc+ny], (1-self.logic_params['PD']*self.logic_params['PG'])*self.logic_params['Bfa']) + + if tracks: + # All of the tracks are assumed to use the same sensor model! + x = np.vstack([track['x'][-1] for track in tracks]).T + yhat_t = tracks[0]['filt'].sensor_model['h'](x) # Returns a (ny x nx) matrix + H_t = tracks[0]['filt'].sensor_model['dhdx'](x) # Returns a (ny x nC x nx x nC) tensor + for ti, track in enumerate(tracks): # Iterate over confirmed tracks + # Entry for validated tracks + if validation_matrix[:, ti].any(): # If any measurements are validated + val_meas = meas[:, validation_matrix[:, ti]] # Get the validated measurements for this track + yhat = yhat_t[:, ti] + H = H_t[:, ti, :, ti] + py = stats.multivariate_normal.pdf(val_meas.squeeze().T, mean=yhat.flatten(), cov=H@track['P'][-1]@H.T+track['filt'].sensor_model['R']) + association_matrix[validation_matrix[:, ti], ti] = np.log(track['filt'].sensor_model['PD']*py/(1-track['filt'].sensor_model['PD'])) # PG assumed = 1 + likelihood_matrix[validation_matrix[:, ti], ti] = track['filt'].sensor_model['PD']*py + return association_matrix, likelihood_matrix + + def get_validation_matrix(self, meas, tracks): + """Calculates the validation matrix + + ny -- number of measurements + Nc -- number of tracks + + Parameters + ---------- + meas : numpy.ndarray (: by ny) + Contains measurement(s) to validate against the tracks + tracks : list + A list containing the tracks to validate (Nc many) + + Returns + ---------- + numpy.ndarray (ny by Nc) + A logical array describing what measurements are validated to what tracks. + + """ + ny = meas.shape[1] + Nc = len(tracks) # Number of tracks to associate + validation_matrix = np.zeros((ny, Nc), dtype=bool) + for ti, track in enumerate(tracks): # Iterate over confirmed tracks + validation_matrix[:, ti] = self.gater.gate(track['x'][-1], track['P'][-1], meas) + return validation_matrix + + def _update_track(self, meas, track): + """Handles the update of a certain track with the given measurement(s). + + Modifies the track in-place! + + ny -- number of measurements + + Parameters + ---------- + meas : numpy.ndarray (: by ny) + Contains measurement(s) to update a specific track with. + track : dict + A dict containing everything relevant to the track. + + """ + if meas.size == 0: + track = self.logic(np.array([]), track['filt'], track, self.logic_params) # If no meas associated, still update logic of track + return + # Calculate prediction error of each measurement + yhat = track['filt'].sensor_model['h'](track['x'][-1]) + + eps = meas-yhat + track = self.logic(meas, track['filt'], track, self.logic_params) + + # Update + track['x'][-1], track['P'][-1] = track['filt'].update(track['x'][-1], track['P'][-1], eps) + + def evaluate(self, Y): + """ Evaluates the detections in Y. + + Parameters + ---------- + Y : list + List of detections at time k=0 to K where K is the length of Y. + Each entry of Y is ny by N_k where N_k is time-varying as the number + of detections vary. + + Returns + ------- + list, list + First list contains all initiated tracks, both tentative, deleted + and confirmed. The second list contains only the confirmed list, + even if they have died. Hence, the lists contain duplicates (but + point to the same object!). + + """ + rng = np.random.default_rng() + hypothesis = dict() # This will contain hypothesis over time + init_hypothesis = lambda probability: dict(tracks=[], probability=probability) # For more readable code in the end + hypothesis[-1] = [init_hypothesis(1)] + + ids = 0 + for k, meas_k in tqdm.tqdm(enumerate(Y), desc="HOMHT evaluating detections: ", total=len(Y)): + hypothesis[k] = [] + # For each hypothesis from the last time step + for hyp in hypothesis[k-1]: + # Propagate each track to this time step + for track in hyp['tracks']: + if track['stage'] != 'deleted': + x, P = track['filt'].propagate(track['x'][-1], track['P'][-1]) + track['x'].append(x) + track['P'].append(P) + track['t'].append(k) + + unused_meas = np.ones((meas_k.shape[1],), dtype=bool) + # For all "live" tracks in hypothesis + live_tracks = [track for track in hyp['tracks'] \ + if track['stage'] in ['confirmed', 'tentative']] + validation_matrix = self.get_validation_matrix(meas_k[:, unused_meas], + live_tracks) + association_matrix, likelihood_matrix = \ + self.get_association_cost(meas_k, live_tracks, validation_matrix) + ny = meas_k.shape[1] + nt = len(live_tracks) + # The Murty alg. returns the cost of the association and the indices of the association + for (cost, associations) in murty(-association_matrix): + unused_meas = np.ones((ny,), dtype=bool) + + # Initiate a new hypothesis and copy over all the tracks in the current hypothesis. + hypothesis[k].append(init_hypothesis(hyp['probability'])) + for track in hyp['tracks']: + if track['stage'] != 'deleted': + tmp_track = self.init_track(np.ones((2, 1)), + k, + track['identity'], + track['filt']) + tmp_track['x'][-1] = track['x'][-1] + tmp_track['P'][-1] = track['P'][-1] + tmp_track['t'] = track['t'] + tmp_track['associations'] = copy.copy(track['associations']) + tmp_track['stage'] = track['stage'] + tmp_track['Lt'] = track['Lt'] + hypothesis[k][-1]['tracks'].append(tmp_track) + + # Update the tracks with associated measurements + for j, association in enumerate(associations): + if association < nt: # i.e. associated to a track + self._update_track(meas_k[:, j], hypothesis[k][-1]['tracks'][association]) + hypothesis[k][-1]['tracks'][association]['associations'].append(k) + unused_meas[j] = 0 + # Update the proability of the hypothesis for this particular association + hypothesis[k][-1]['probability'] *= likelihood_matrix[j, association] + # Update tracks without an association in this particular hypothesis + for j in range(nt): + if j not in associations: + self._update_track(np.array([]), hypothesis[k][-1]['tracks'][j]) + + # For any still unused measurements, possibly initiate a new track + while unused_meas.any(): + # Select an unused measurement at random and initiate a track + ind = rng.choice(np.arange(unused_meas.size), p=unused_meas/unused_meas.sum()) + track = self.init_track(meas_k[:, ind], k, ids, self.filt) # Initialize track + hypothesis[k][-1]['tracks'].append(track) + unused_meas[ind] = 0 # Remove measurement from association hypothesis + validation_matrix = self.get_validation_matrix(meas_k[:, unused_meas], + [hypothesis[k][-1]['tracks'][-1]]) + ids += 1 + # Remove any gated measurements from further consideration + if validation_matrix.any(): + used_inds = (meas_k[:, unused_meas][:, validation_matrix.flatten()]==meas_k).all(axis=0) + unused_meas[used_inds] = 0 + + # Normalize the hypothesis probabilities + total_score = np.sum([hyp['probability'] for hyp in hypothesis[k]]) + for hyp in hypothesis[k]: + hyp['probability'] /= total_score + # Only keep the hypothesis which amount to pthresh probability "mass" + hypothesis[k].sort(key=lambda x: x['probability'], reverse=True) + prob = [hyp['probability'] for hyp in hypothesis[k]] + ind = np.argmax(np.cumsum(prob)>self.pthresh)+1 # Find the index to keep + hypothesis[k] = hypothesis[k][:ind] + # Only allow a certain number of hypothesis + if len(hypothesis[k]) > self.Nh: + hypothesis[k] = hypothesis[k][:self.Nh] + # Re-normalize the hypothesis probabilities + total_score = np.sum([hyp['probability'] for hyp in hypothesis[k]]) + for hyp in hypothesis[k]: + hyp['probability'] /= total_score + return hypothesis + def get_likelihood_matrix(meas, tracks, logic_params, gater): """ Computes the likelihood and validation matrix (specifically for the JPDA implementation)