Skip to content
Snippets Groups Projects
Commit 70b8d0e6 authored by Anton Kullberg's avatar Anton Kullberg
Browse files

ipynb: started working on exercise session 2

parent 3a195b6e
No related branches found
No related tags found
No related merge requests found
%% Cell type:markdown id:7b4f552e-2590-44c2-ad81-13866058fb1e tags:
## <center> Target Tracking -- Exercise 2 </center>
### <center> Anton Kullberg </center>
This report contains solutions for the second set of exercises in the Target Tracking course given at Linköping University during fall 2021.
%% Cell type:code id:9e44486f tags:
``` python
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
rng = np.random.default_rng(13)
import jax
import jax.numpy as jnp
import tqdm
import scipy.stats as stats
import scipy
from src.sim import generate_data
from src.trajectories import get_ex2_trajectories
trajectories = get_ex2_trajectories()
```
%% Cell type:code id:e5ead4d2-2d44-4ef4-8961-ab8af73f2ba0 tags:
``` python
def generate_data(trajectories, sensor_model, clutter_model, rng=None):
"""Simulates measurements along a state trajectory according to a sensor and clutter model.
The function assumes Gaussian white noise affecting the measurements.
Parameters
----------
trajectories : dict of numpy.ndarrays
A dict with entries with nx by N arrays where nx is the state dimension and N is the number of time steps. Sample time T=1 is assumed.
sensor_model : dict
A dictionary with the following entries:
h : callable
measurement function
R : numpy.ndarray
measurement covariance
PD : float
probability of detection
clutter_model : dict
A dictionary with the following entries:
volume : dict
A dictionary with xmin, xmax, ymin, ymax (completely rectangular tracking volume)
lam : float
Clutter rate of Poisson distributed clutter. (lambda to coincide with numpy/wiki notation)
rng : Generator
A numpy random number generator. Can be constructed by e.g. np.random.default_rng()
Returns
-------
list of numpy.ndarray
Each list item is a numpy.ndarray with zero or more measurements (ny by x)
"""
N = max([T.shape[1] for key, T in trajectories.items()]) # Maximum length of a trajectory interesting for this purpose
if rng is None:
rng = np.random.default_rng()
measurements = []
ny = sensor_model['h'](trajectories[next(iter(trajectories))][:, 0]).size # Get the dimensionality of the measurements
for n in range(N):
# Determine amount of clutter this time
nclutter = rng.poisson(lam=clutter_model['lam'])
trajs = [T for key, T in trajectories.items() if n<T.shape[1]] # Figure out what trajectories are active right now
Ntrajs = len(trajs) # Calc. number of trajectories present in the current time step
# Initialize an array w/ the number of measurements this time step
cur_measurements = np.empty((ny, nclutter+Ntrajs))
cur_measurements[:, :] = np.NaN
if nclutter != 0:
# Calc. clutter states
clutter_states = np.concatenate([
rng.uniform(low=clutter_model['volume']['xmin'],
high=clutter_model['volume']['xmax'], size=(nclutter,)),
rng.uniform(low=clutter_model['volume']['ymin'],
high=clutter_model['volume']['ymax'], size=(nclutter,))
]).reshape(-1, nclutter)
cur_measurements[:, :nclutter] = (sensor_model['h'](clutter_states)+\
rng.multivariate_normal(mean=np.zeros((ny,)), cov=sensor_model['R'], size=(nclutter)).squeeze().T).reshape(-1, nclutter)
# Generate measurement of target(s) (possibly)
for nt, traj in enumerate(trajs):
if rng.uniform() <= sensor_model['PD']:
y = sensor_model['h'](traj[:, n])+\
rng.multivariate_normal(mean=np.zeros((ny,)), cov=sensor_model['R'])
cur_measurements[:, nclutter+nt] = y.flatten() # Add actual observation to array
cur_measurements = cur_measurements[~np.isnan(cur_measurements)].reshape(ny, -1) # Remove nan measurements (i.e. targets that did not generate a measurement)
measurements.append(cur_measurements)
return measurements
```
%% Cell type:code id:ad8aeebb tags:
``` python
import src.gaters as gaters
import src.filters as filters
import src.associators as associators
import src.trackers as trackers
import src.plotters as plotters
import src.models as models
import src.logic as logic
#R = np.diag([10, 0.001])**2
R = np.diag([0, 0]) # No noise
PD = 0.99 # Perfect detections
lam = 0 # No clutter
volume = dict(xmin=0, xmax=2500, ymin=0, ymax=2000)
V = (volume['xmax']-volume['xmin'])*(volume['ymax']-volume['ymin'])
Bfa = lam/V
Bfa = 4e-7
sensor_model = models.radar_model(R=R, PD=PD)
clutter_model = dict(volume=volume, lam=lam, Bfa=Bfa)
Bnt = Bfa
q = 1
motion_model = models.cv_model(Q=q*np.identity(2), D=2, T=1)
# Setup Gater
gamma = 4.7
gater = gaters.MahalanobisGater(sensor_model, gamma)
P0 = np.diag([10, 10, 1000, 1000])
Ptm = 0.01
Pfc = 0.001
lam = 0.9
logic_params = dict(PD=sensor_model['PD'], PG=1, lam=lam, Ptm=Ptm, Pfc=Pfc, Bfa=0.05, Ldel=np.log(Ptm/(1-Pfc)))
def init_track(y, k, identity, filt):
track = dict(stage='tentative', score=0, x=[], P=[], t=[k], identity=identity, associations=[k], filt=filt)
x0 = np.concatenate(plotters.radar_to_pos(y)).flatten()
x0 = np.hstack([x0, np.zeros((2,))])
track['x'] = [x0]
track['P'] = [P0]
return track
filt = filters.EKF(motion_model, sensor_model)
```
%% Cell type:code id:746fda2d-1e68-4715-9deb-b2afdd324537 tags:
``` python
T = []
trajs = ['T1', 'T3', 'T5', 'T6']
filtered_trajs = {key: T for key, T in trajectories.items() if key in trajs}
```
%% Cell type:code id:c6ab09ff-ae92-4df2-bda4-2f81535ef319 tags:
``` python
import pdb
confirmed_tracks = [] # Store the confirmed tracks (for plotting purposes only)
tracks = [] # Store all tracks
Y = generate_data(filtered_trajs, sensor_model, clutter_model, rng)
confirmed_tracks.append(init_track(Y[0][:,[0]], 0, 0, filt))
confirmed_tracks[-1]['stage']='confirmed'
ids = 0
for k, meas_k in tqdm.tqdm(enumerate(Y), desc="Evaluating observations: "):
ny = meas_k.shape[1]
unused_meas = np.ones((1, ny), dtype=bool)
live_tracks = [track for track in confirmed_tracks if track['stage']=='confirmed']
Nc = len(live_tracks) # Number of confirmed tracks still alive
validation_matrix = np.zeros((ny, Nc), dtype=bool)
association_matrix = -np.inf*np.ones((ny, Nc+2*ny))
# Entry for false alarms
np.fill_diagonal(association_matrix[:, Nc:Nc+ny], np.log(clutter_model['Bfa']))
# Entry for new targets
np.fill_diagonal(association_matrix[:, Nc+ny:], np.log(Bnt))
for ti, track in enumerate(live_tracks): # Iterate over confirmed tracks
validation_matrix[:, ti] = gater.gate(track['x'][-1], track['P'][-1], meas_k)
# Entry for validated tracks
val_meas = meas_k[:, validation_matrix[:, ti]] # Get the validated measurements for this track
yhat = track['filt'].sensor_model['h'](track['x'][-1]) # Calculate the predicted measurement for this track
H = track['filt'].sensor_model['dhdx'](track['x'][-1])
py = stats.multivariate_normal.pdf(val_meas.squeeze().T, mean=yhat.flatten(), cov=H@track['P'][-1]@H.T+track['filt'].sensor_model['R'])
association_matrix[validation_matrix[:, ti], ti] = np.log(track['filt'].sensor_model['PD']*py/(1-track['filt'].sensor_model['PD'])) # PG assumed = 1
# Solve association problem
row_ind, col_ind = scipy.optimize.linear_sum_assignment(-association_matrix)
pdb.set_trace()
# Update confirmed tracks
update_track(meas_k, unused_meas, filt, track, associator, gater, logic, logic_params)
if tmp.sum() != unused_meas.sum(): # If we've associated something, add the time here (for plotting purposes)
track['associations'].append(k)
tentative_tracks = [track for track in tracks if track['stage'] == 'tentative']
for track in tentative_tracks:
if track['stage'] == 'deleted':
continue
tmp = unused_meas.copy()
# Update tentative tracks
update_track(meas_k, unused_meas, filt, track, associator, gater, logic, logic_params)
if tmp.sum() != unused_meas.sum(): # If we've associated something, add the time here (for plotting purposes)
track['associations'].append(k)
if track['stage'] == 'confirmed':
confirmed_tracks.append(track) # If a track has been confirmed, add it to confirmed tracks
# Used the unused measurements to initiate new tracks
for meas in meas_k[unused_meas]:
tracks.append(init_track(meas, k, ids))
ids += 1
for track in tracks:
if track['stage'] != 'deleted':
x, P = filt.propagate(track['x'][-1], track['P'][-1])
track['x'].append(x)
track['P'].append(P)
track['t'].append(k+1)
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment