Skip to content
Snippets Groups Projects
Commit 8149955b authored by Anton Kullberg's avatar Anton Kullberg
Browse files

ipynb: working on JPDA. Enumeration of hypothesis finished.

parent 9e03540c
No related branches found
No related tags found
No related merge requests found
%% Cell type:markdown id:7b4f552e-2590-44c2-ad81-13866058fb1e tags:
## <center> Target Tracking -- Exercise 2 </center>
### <center> Anton Kullberg </center>
This report contains solutions for the second set of exercises in the Target Tracking course given at Linköping University during fall 2021.
%% Cell type:code id:9e44486f tags:
``` python
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
rng = np.random.default_rng(13)
import jax
import jax.numpy as jnp
import tqdm
import scipy.stats as stats
import scipy
from src.sim import generate_data
from src.trajectories import get_ex2_trajectories
trajectories = get_ex2_trajectories()
```
%% Cell type:code id:e5ead4d2-2d44-4ef4-8961-ab8af73f2ba0 tags:
``` python
def generate_data(trajectories, sensor_model, clutter_model, rng=None):
"""Simulates measurements along a state trajectory according to a sensor and clutter model.
The function assumes Gaussian white noise affecting the measurements.
Parameters
----------
trajectories : dict of numpy.ndarrays
A dict with entries with nx by N arrays where nx is the state dimension and N is the number of time steps. Sample time T=1 is assumed.
sensor_model : dict
A dictionary with the following entries:
h : callable
measurement function
R : numpy.ndarray
measurement covariance
PD : float
probability of detection
clutter_model : dict
A dictionary with the following entries:
volume : dict
A dictionary with xmin, xmax, ymin, ymax (completely rectangular tracking volume)
lam : float
Clutter rate of Poisson distributed clutter. (lambda to coincide with numpy/wiki notation)
rng : Generator
A numpy random number generator. Can be constructed by e.g. np.random.default_rng()
Returns
-------
list of numpy.ndarray
Each list item is a numpy.ndarray with zero or more measurements (ny by x)
"""
N = max([T.shape[1] for key, T in trajectories.items()]) # Maximum length of a trajectory interesting for this purpose
if rng is None:
rng = np.random.default_rng()
measurements = []
ny = sensor_model['h'](trajectories[next(iter(trajectories))][:, 0]).size # Get the dimensionality of the measurements
for n in range(N):
# Determine amount of clutter this time
nclutter = rng.poisson(lam=clutter_model['lam'])
trajs = [T for key, T in trajectories.items() if n<T.shape[1]] # Figure out what trajectories are active right now
Ntrajs = len(trajs) # Calc. number of trajectories present in the current time step
# Initialize an array w/ the number of measurements this time step
cur_measurements = np.empty((ny, nclutter+Ntrajs))
cur_measurements[:, :] = np.NaN
if nclutter != 0:
# Calc. clutter states
clutter_states = np.concatenate([
rng.uniform(low=clutter_model['volume']['xmin'],
high=clutter_model['volume']['xmax'], size=(nclutter,)),
rng.uniform(low=clutter_model['volume']['ymin'],
high=clutter_model['volume']['ymax'], size=(nclutter,))
]).reshape(-1, nclutter)
cur_measurements[:, :nclutter] = (sensor_model['h'](clutter_states)+\
rng.multivariate_normal(mean=np.zeros((ny,)), cov=sensor_model['R'], size=(nclutter)).squeeze().T).reshape(-1, nclutter)
# Generate measurement of target(s) (possibly)
for nt, traj in enumerate(trajs):
if rng.uniform() <= sensor_model['PD']:
y = sensor_model['h'](traj[:, n])+\
rng.multivariate_normal(mean=np.zeros((ny,)), cov=sensor_model['R'])
cur_measurements[:, nclutter+nt] = y.flatten() # Add actual observation to array
cur_measurements = cur_measurements[~np.isnan(cur_measurements)].reshape(ny, -1) # Remove nan measurements (i.e. targets that did not generate a measurement)
measurements.append(cur_measurements)
return measurements
```
%% Cell type:code id:746fda2d-1e68-4715-9deb-b2afdd324537 tags:
``` python
T = []
trajs = ['T1', 'T3', 'T5', 'T6']
filtered_trajs = {key: T for key, T in trajectories.items() if key in trajs}
```
%% Cell type:code id:ad8aeebb tags:
``` python
import src.gaters as gaters
import src.filters as filters
import src.associators as associators
import src.trackers as trackers
import src.plotters as plotters
import src.models as models
import src.logic as logic
#R = np.diag([10, 0.001])**2
R = np.diag([0, 0]) # No noise
PD = 0.99 # Perfect detections
lam = 0 # No clutter
R = np.diag([10, 0.001])**2
# R = np.diag([0, 0]) # No noise
PD = 0.9 # Perfect detections
lam = 2 # No clutter
volume = dict(xmin=0, xmax=2500, ymin=0, ymax=2000)
V = (volume['xmax']-volume['xmin'])*(volume['ymax']-volume['ymin'])
Bfa = lam/V
Bfa = 4e-7
# Bfa = 4e-7
sensor_model = models.radar_model(R=R, PD=PD)
clutter_model = dict(volume=volume, lam=lam, Bfa=Bfa)
Bnt = Bfa
q = 1
q = 10
motion_model = models.cv_model(Q=q*np.identity(2), D=2, T=1)
# Setup Gater
gamma = 4.7
gater = gaters.MahalanobisGater(sensor_model, gamma)
P0 = np.diag([10, 10, 1000, 1000])
Ptm = 0.01
Pfc = 0.001
lam = 0.9
logic_params = dict(PD=sensor_model['PD'], PG=1, lam=lam, Ptm=Ptm, Pfc=Pfc, Bfa=0.05, Ldel=np.log(Ptm/(1-Pfc)))
lam = 0.7
logic_params = dict(PD=sensor_model['PD'], PG=1, lam=lam, Ptm=Ptm, Pfc=Pfc, Bfa=Bfa, Ldel=0.1*np.log(Ptm/(1-Pfc)), Bnt=Bnt)
def init_track(y, k, identity, filt):
track = dict(stage='tentative', score=0, x=[], P=[], t=[k], identity=identity, associations=[k], filt=filt)
x0 = np.concatenate(plotters.radar_to_pos(y)).flatten()
track = dict(stage='tentative', Lt=0, x=[], P=[], t=[k], identity=identity, associations=[k], filt=filt)
x0 = np.concatenate(plotters.radar_to_pos(y[:, None])).flatten()
x0 = np.hstack([x0, np.zeros((2,))])
track['x'] = [x0]
track['P'] = [P0]
return track
filt = filters.EKF(motion_model, sensor_model)
```
%% Cell type:code id:746fda2d-1e68-4715-9deb-b2afdd324537 tags:
%% Cell type:code id:1ff81504-d524-4106-ba87-4d0242930ea0 tags:
``` python
T = []
trajs = ['T1', 'T3', 'T5', 'T6']
filtered_trajs = {key: T for key, T in trajectories.items() if key in trajs}
import pdb
class JPDA():
def __init__(self, logic, logic_params, init_track, filt, gater, clutter_model):
self.logic = logic
self.logic_params = logic_params
self.init_track = init_track
self.filt = filt
self.gater = gater
self.clutter_model = clutter_model
def _update_track(self, meas, track):
if meas.size == 0:
track = self.logic(np.array([]), track['filt'], track, self.logic_params) # If no meas associated, still update logic of track
return
# Calculate prediction error of each measurement
yhat = track['filt'].sensor_model['h'](track['x'][-1])
eps = meas-yhat
track = self.logic(meas, track['filt'], track, self.logic_params)
# Update
track['x'][-1], track['P'][-1] = track['filt'].update(track['x'][-1], track['P'][-1], eps)
def evaluate(self, Y):
tracks = [] # Store all tracks
confirmed_tracks = [] # Store the confirmed tracks (for plotting purposes only)
ids = 0
for k, meas_k in tqdm.tqdm(enumerate(Y), desc="Evaluating observations: "):
ny = meas_k.shape[1]
unused_meas = np.ones((ny), dtype=bool)
live_tracks = [track for track in confirmed_tracks if track['stage']=='confirmed']
# if live_tracks:
# association_matrix, _ = get_association_matrix(meas_k, live_tracks, self.logic_params, self.gater)
# # Solve association problem
# row_ind, col_ind = scipy.optimize.linear_sum_assignment(-association_matrix)
# for row, col in zip(row_ind, col_ind):
# if col >= len(live_tracks): # No target to associate the measurement to
# continue
# else:
# unused_meas[row] = 0 # Remove this measurement from further consideration
# # Update confirmed tracks
# self._update_track(meas_k[:, row], live_tracks[col])
# live_tracks[col]['associations'].append(k) # If we've associated something, add the time here (for plotting purposes)
# for i in range(len(live_tracks)):
# if i not in col_ind:
# self._update_track(np.array([]), live_tracks[i])
tentative_tracks = [track for track in tracks if track['stage'] == 'tentative']
if tentative_tracks:
association_matrix, validation_matrix = trackers.get_association_matrix(meas_k[:, unused_meas], tentative_tracks, self.logic_params, self.gater)
Nc = len(tentative_tracks)
rec = compute_prob(association_matrix[:, :ny+Nc], validation_matrix, self.logic_params)
pdb.set_trace()
# Solve association problem
row_ind, col_ind = scipy.optimize.linear_sum_assignment(-association_matrix)
meas = meas_k[:, unused_meas]
for row, col in zip(row_ind, col_ind):
if col >= len(tentative_tracks): # No target to associate the measurement to
continue
else:
unused_meas[(meas_k == meas[:,[row]]).all(axis=0)] = 0 # Remove this measurement from consideration
# Update confirmed tracks
self._update_track(meas[:, row], tentative_tracks[col])
tentative_tracks[col]['associations'].append(k) # If we've associated something, add the time here (for plotting purposes)
if tentative_tracks[col]['stage'] == 'confirmed':
confirmed_tracks.append(tentative_tracks[col]) # If a track has been confirmed, add it to confirmed tracks
for i in range(len(tentative_tracks)):
if i not in col_ind:
self._update_track(np.array([]), tentative_tracks[i])
# Use the unused measurements to initiate new tracks
for meas in meas_k[:, unused_meas].T:
tracks.append(self.init_track(meas, k, ids, self.filt))
ids += 1
for track in tracks:
if track['stage'] != 'deleted':
x, P = track['filt'].propagate(track['x'][-1], track['P'][-1])
track['x'].append(x)
track['P'].append(P)
track['t'].append(k+1)
return tracks, confirmed_tracks
def compute_prob(association_matrix, validation_matrix, logic_params):
# Association matrix is assumed to consist of tracks and FA, no NT.
ny = association_matrix.shape[0]
ntracks = association_matrix.shape[1]-ny
P = np.zeros((ny, ntracks))
def rec_find_associations(association_matrix, assoc_done, logic_params):
inds = np.where(association_matrix[0, :] != -np.inf)[0] # These are the nodes necessary to look at
this_assoc = []
for k, i in enumerate(inds):
if i not in assoc_done:
if association_matrix.shape[0] != 1:
assoc = rec_compute_prob(association_matrix[1:, :], [[i]], logic_params)
this_assoc.extend(assoc)
else:
this_assoc.append([i])
result = []
for assoc in assoc_done:
for th_assoc in this_assoc:
result.append(assoc + th_assoc)
return result
possible_associations = rec_find_associations(association_matrix, [[]], logic_params) # Recursively finds possible measurement hypothesis
pdb.set_trace()
return res
```
%% Cell type:code id:31e8ca55-8bb0-4662-977a-857d8cdf4ea2 tags:
``` python
jpda = JPDA(logic.score_logic, logic_params, init_track, filt, gater, clutter_model)
Y = generate_data(filtered_trajs, sensor_model, clutter_model, rng)
tracks, confirmed_tracks = jpda.evaluate(Y)
```
%% Cell type:code id:0c29b3fb-55ac-4c19-9acb-b493e536bd8d tags:
``` python
from src.utility import match_tracks_to_ground_truth
matches = match_tracks_to_ground_truth(confirmed_tracks, filtered_trajs)
```
%% Cell type:code id:5dc32f5b-53c4-40e8-8c4b-9c50274cf2f0 tags:
``` python
fig, ax = plt.subplots(3, 1, figsize=(16, 15))
Yt = np.hstack(Y)
yx, yy = plotters.radar_to_pos(Yt)
for key, T in filtered_trajs.items():
ax[1].plot(T[0, :], T[1, :], color='k', lw=3)
for track in tracks:
x = np.vstack(track['x'])
t = np.hstack(track['t']).flatten()
assoc = np.hstack(track['associations']).flatten()
if track in confirmed_tracks:
ls = '-'
l = ax[0].plot(t, track['identity']*np.ones(t.shape), ls=ls, markersize=3)[0]
ax[0].plot(assoc, track['identity']*np.ones(assoc.shape), ls='', color=l.get_color(), marker='x', markersize=6)
ax[1].plot(x[:, 0], x[:, 1], ls=ls, color=l.get_color(), lw=2)
else:
ls = '--'
ax[1].plot(x[:, 0], x[:, 1], ls=ls, lw=2)
ax[0].set_ylabel('Track identity')
ax[0].set_title('Confirmed tracks over time')
ax[0].set_xlabel('Time index, k')
ax[1].plot(yx, yy, '.', color='k')
ax[1].set_xlabel(r'$p_x$')
ax[1].set_ylabel(r'$p_y$')
ax[1].set_title('Measurements and measurement predictions + tracks')
# Plot the RMSE for the matched trajectories
for track_id, key in matches.items():
T = filtered_trajs[key]
track = [track for track in confirmed_tracks if track['identity'] == track_id][0]
x = np.vstack(track['x']).T
tf = np.hstack(track['t']).flatten()[-1]
gtf = T.shape[1]
if T.shape[1] > x.shape[1]:
N = x.shape[1]
else:
N = T.shape[1]
xrmse = np.sum((T[:, :N]-x[:2, :N])**2, axis=0)/N
terr = np.abs(gtf-tf)
T = np.linspace(0, x.shape[1]/N, N)
ax[2].plot(T, xrmse, label='{} -- Time error: {} timesteps'.format(key, terr))
ax[2].set_xlabel('Trajectory index')
ax[2].set_ylabel('Positional RMSE')
ax[2].legend()
plt.tight_layout()
plt.show()
```
%% Output
%% Cell type:code id:c6ab09ff-ae92-4df2-bda4-2f81535ef319 tags:
``` python
import pdb
confirmed_tracks = [] # Store the confirmed tracks (for plotting purposes only)
tracks = [] # Store all tracks
Y = generate_data(filtered_trajs, sensor_model, clutter_model, rng)
confirmed_tracks.append(init_track(Y[0][:,[0]], 0, 0, filt))
confirmed_tracks[-1]['stage']='confirmed'
ids = 0
for k, meas_k in tqdm.tqdm(enumerate(Y), desc="Evaluating observations: "):
ny = meas_k.shape[1]
unused_meas = np.ones((1, ny), dtype=bool)
unused_meas = np.ones((ny), dtype=bool)
live_tracks = [track for track in confirmed_tracks if track['stage']=='confirmed']
Nc = len(live_tracks) # Number of confirmed tracks still alive
validation_matrix = np.zeros((ny, Nc), dtype=bool)
association_matrix = -np.inf*np.ones((ny, Nc+2*ny))
# Entry for false alarms
np.fill_diagonal(association_matrix[:, Nc:Nc+ny], np.log(clutter_model['Bfa']))
# Entry for new targets
np.fill_diagonal(association_matrix[:, Nc+ny:], np.log(Bnt))
for ti, track in enumerate(live_tracks): # Iterate over confirmed tracks
validation_matrix[:, ti] = gater.gate(track['x'][-1], track['P'][-1], meas_k)
# Entry for validated tracks
val_meas = meas_k[:, validation_matrix[:, ti]] # Get the validated measurements for this track
yhat = track['filt'].sensor_model['h'](track['x'][-1]) # Calculate the predicted measurement for this track
H = track['filt'].sensor_model['dhdx'](track['x'][-1])
py = stats.multivariate_normal.pdf(val_meas.squeeze().T, mean=yhat.flatten(), cov=H@track['P'][-1]@H.T+track['filt'].sensor_model['R'])
association_matrix[validation_matrix[:, ti], ti] = np.log(track['filt'].sensor_model['PD']*py/(1-track['filt'].sensor_model['PD'])) # PG assumed = 1
# Solve association problem
row_ind, col_ind = scipy.optimize.linear_sum_assignment(-association_matrix)
pdb.set_trace()
if live_tracks:
association_matrix = get_association_matrix(meas_k, live_tracks, gater)
# Solve association problem
row_ind, col_ind = scipy.optimize.linear_sum_assignment(-association_matrix)
for row, col in zip(row_ind, col_ind):
if col >= len(live_tracks): # No target to associate the measurement to
continue
else:
unused_meas[row] = 0 # Remove this measurement from further consideration
# Update confirmed tracks
update_track(meas_k[:, row], live_tracks[col], logic.score_logic, logic_params)
live_tracks[col]['associations'].append(k) # If we've associated something, add the time here (for plotting purposes)
for i in range(len(live_tracks)):
if i not in col_ind:
live_tracks[i] = logic.score_logic(np.array([]), live_tracks[i]['filt'], live_tracks[i], logic_params) # If no meas associated, still update logic of track
# Update confirmed tracks
update_track(meas_k, unused_meas, filt, track, associator, gater, logic, logic_params)
if tmp.sum() != unused_meas.sum(): # If we've associated something, add the time here (for plotting purposes)
track['associations'].append(k)
tentative_tracks = [track for track in tracks if track['stage'] == 'tentative']
for track in tentative_tracks:
if track['stage'] == 'deleted':
continue
tmp = unused_meas.copy()
# Update tentative tracks
update_track(meas_k, unused_meas, filt, track, associator, gater, logic, logic_params)
if tmp.sum() != unused_meas.sum(): # If we've associated something, add the time here (for plotting purposes)
track['associations'].append(k)
if track['stage'] == 'confirmed':
confirmed_tracks.append(track) # If a track has been confirmed, add it to confirmed tracks
# Used the unused measurements to initiate new tracks
for meas in meas_k[unused_meas]:
tracks.append(init_track(meas, k, ids))
if tentative_tracks:
association_matrix = get_association_matrix(meas_k[:, unused_meas], tentative_tracks, gater)
# Solve association problem
row_ind, col_ind = scipy.optimize.linear_sum_assignment(-association_matrix)
meas = meas_k[:, unused_meas]
for row, col in zip(row_ind, col_ind):
if col >= len(tentative_tracks): # No target to associate the measurement to
continue
else:
unused_meas[(meas_k == meas[:,[row]]).all(axis=0)] = 0 # Remove this measurement from consideration
# Update confirmed tracks
update_track(meas[:, row], tentative_tracks[col], logic.score_logic, logic_params)
tentative_tracks[col]['associations'].append(k) # If we've associated something, add the time here (for plotting purposes)
if tentative_tracks[col]['stage'] == 'confirmed':
confirmed_tracks.append(tentative_tracks[col]) # If a track has been confirmed, add it to confirmed tracks
for i in range(len(tentative_tracks)):
if i not in col_ind:
tentative_tracks[i] = logic.score_logic(np.array([]), tentative_tracks[i]['filt'], tentative_tracks[i], logic_params) # If no meas associated, still update logic of track
# Use the unused measurements to initiate new tracks
for meas in meas_k[:, unused_meas].T:
tracks.append(init_track(meas, k, ids, filt))
ids += 1
for track in tracks:
if track['stage'] != 'deleted':
x, P = filt.propagate(track['x'][-1], track['P'][-1])
x, P = track['filt'].propagate(track['x'][-1], track['P'][-1])
track['x'].append(x)
track['P'].append(P)
track['t'].append(k+1)
```
%% Output
Evaluating observations: : 102it [00:33, 3.04it/s]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment