diff --git a/exercise-session-2.ipynb b/exercise-session-2.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..ba86b6c54e4b68e36af7f97d104f9c9581a9cb3b --- /dev/null +++ b/exercise-session-2.ipynb @@ -0,0 +1,278 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "7b4f552e-2590-44c2-ad81-13866058fb1e", + "metadata": {}, + "source": [ + "## <center> Target Tracking -- Exercise 2 </center>\n", + "### <center> Anton Kullberg </center>\n", + "\n", + "This report contains solutions for the second set of exercises in the Target Tracking course given at Linköping University during fall 2021." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9e44486f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "import seaborn as sns\n", + "sns.set()\n", + "rng = np.random.default_rng(13)\n", + "import jax\n", + "import jax.numpy as jnp\n", + "import tqdm\n", + "import scipy.stats as stats\n", + "import scipy\n", + "from src.sim import generate_data\n", + "from src.trajectories import get_ex2_trajectories\n", + "trajectories = get_ex2_trajectories()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e5ead4d2-2d44-4ef4-8961-ab8af73f2ba0", + "metadata": {}, + "outputs": [], + "source": [ + "def generate_data(trajectories, sensor_model, clutter_model, rng=None):\n", + " \"\"\"Simulates measurements along a state trajectory according to a sensor and clutter model.\n", + "\n", + " The function assumes Gaussian white noise affecting the measurements.\n", + "\n", + " Parameters\n", + " ----------\n", + " trajectories : dict of numpy.ndarrays\n", + " A dict with entries with nx by N arrays where nx is the state dimension and N is the number of time steps. Sample time T=1 is assumed.\n", + " sensor_model : dict\n", + " A dictionary with the following entries:\n", + " h : callable\n", + " measurement function\n", + " R : numpy.ndarray\n", + " measurement covariance\n", + " PD : float\n", + " probability of detection\n", + " clutter_model : dict\n", + " A dictionary with the following entries:\n", + " volume : dict\n", + " A dictionary with xmin, xmax, ymin, ymax (completely rectangular tracking volume)\n", + " lam : float\n", + " Clutter rate of Poisson distributed clutter. (lambda to coincide with numpy/wiki notation)\n", + " rng : Generator\n", + " A numpy random number generator. Can be constructed by e.g. np.random.default_rng()\n", + "\n", + " Returns\n", + " -------\n", + " list of numpy.ndarray\n", + " Each list item is a numpy.ndarray with zero or more measurements (ny by x)\n", + "\n", + " \"\"\"\n", + " N = max([T.shape[1] for key, T in trajectories.items()]) # Maximum length of a trajectory interesting for this purpose\n", + " if rng is None:\n", + " rng = np.random.default_rng()\n", + " measurements = []\n", + " \n", + " ny = sensor_model['h'](trajectories[next(iter(trajectories))][:, 0]).size # Get the dimensionality of the measurements\n", + " \n", + " for n in range(N):\n", + " # Determine amount of clutter this time\n", + " nclutter = rng.poisson(lam=clutter_model['lam'])\n", + " \n", + " trajs = [T for key, T in trajectories.items() if n<T.shape[1]] # Figure out what trajectories are active right now \n", + " Ntrajs = len(trajs) # Calc. number of trajectories present in the current time step\n", + " # Initialize an array w/ the number of measurements this time step\n", + " cur_measurements = np.empty((ny, nclutter+Ntrajs))\n", + " cur_measurements[:, :] = np.NaN\n", + " \n", + " if nclutter != 0:\n", + " # Calc. clutter states\n", + " clutter_states = np.concatenate([\n", + " rng.uniform(low=clutter_model['volume']['xmin'],\n", + " high=clutter_model['volume']['xmax'], size=(nclutter,)),\n", + " rng.uniform(low=clutter_model['volume']['ymin'],\n", + " high=clutter_model['volume']['ymax'], size=(nclutter,))\n", + " ]).reshape(-1, nclutter)\n", + "\n", + " cur_measurements[:, :nclutter] = (sensor_model['h'](clutter_states)+\\\n", + " rng.multivariate_normal(mean=np.zeros((ny,)), cov=sensor_model['R'], size=(nclutter)).squeeze().T).reshape(-1, nclutter)\n", + "\n", + " # Generate measurement of target(s) (possibly)\n", + " for nt, traj in enumerate(trajs):\n", + " if rng.uniform() <= sensor_model['PD']:\n", + " y = sensor_model['h'](traj[:, n])+\\\n", + " rng.multivariate_normal(mean=np.zeros((ny,)), cov=sensor_model['R'])\n", + " cur_measurements[:, nclutter+nt] = y.flatten() # Add actual observation to array\n", + " \n", + " cur_measurements = cur_measurements[~np.isnan(cur_measurements)].reshape(ny, -1) # Remove nan measurements (i.e. targets that did not generate a measurement)\n", + " measurements.append(cur_measurements)\n", + " return measurements\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ad8aeebb", + "metadata": {}, + "outputs": [], + "source": [ + "import src.gaters as gaters\n", + "import src.filters as filters\n", + "import src.associators as associators\n", + "import src.trackers as trackers\n", + "import src.plotters as plotters\n", + "import src.models as models\n", + "import src.logic as logic\n", + "\n", + "#R = np.diag([10, 0.001])**2\n", + "R = np.diag([0, 0]) # No noise\n", + "PD = 0.99 # Perfect detections\n", + "lam = 0 # No clutter\n", + "volume = dict(xmin=0, xmax=2500, ymin=0, ymax=2000)\n", + "V = (volume['xmax']-volume['xmin'])*(volume['ymax']-volume['ymin'])\n", + "Bfa = lam/V\n", + "Bfa = 4e-7\n", + "sensor_model = models.radar_model(R=R, PD=PD)\n", + "clutter_model = dict(volume=volume, lam=lam, Bfa=Bfa)\n", + "Bnt = Bfa\n", + "\n", + "q = 1\n", + "motion_model = models.cv_model(Q=q*np.identity(2), D=2, T=1)\n", + "\n", + "# Setup Gater\n", + "gamma = 4.7\n", + "gater = gaters.MahalanobisGater(sensor_model, gamma)\n", + "\n", + "P0 = np.diag([10, 10, 1000, 1000])\n", + "Ptm = 0.01\n", + "Pfc = 0.001\n", + "lam = 0.9\n", + "logic_params = dict(PD=sensor_model['PD'], PG=1, lam=lam, Ptm=Ptm, Pfc=Pfc, Bfa=0.05, Ldel=np.log(Ptm/(1-Pfc)))\n", + "\n", + "def init_track(y, k, identity, filt):\n", + " track = dict(stage='tentative', score=0, x=[], P=[], t=[k], identity=identity, associations=[k], filt=filt)\n", + " x0 = np.concatenate(plotters.radar_to_pos(y)).flatten()\n", + " x0 = np.hstack([x0, np.zeros((2,))])\n", + " track['x'] = [x0]\n", + " track['P'] = [P0]\n", + " return track\n", + "\n", + "filt = filters.EKF(motion_model, sensor_model)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "746fda2d-1e68-4715-9deb-b2afdd324537", + "metadata": {}, + "outputs": [], + "source": [ + "T = []\n", + "trajs = ['T1', 'T3', 'T5', 'T6']\n", + "filtered_trajs = {key: T for key, T in trajectories.items() if key in trajs}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c6ab09ff-ae92-4df2-bda4-2f81535ef319", + "metadata": {}, + "outputs": [], + "source": [ + "import pdb\n", + "confirmed_tracks = [] # Store the confirmed tracks (for plotting purposes only)\n", + "tracks = [] # Store all tracks\n", + "Y = generate_data(filtered_trajs, sensor_model, clutter_model, rng)\n", + "confirmed_tracks.append(init_track(Y[0][:,[0]], 0, 0, filt))\n", + "confirmed_tracks[-1]['stage']='confirmed'\n", + "ids = 0\n", + "\n", + "for k, meas_k in tqdm.tqdm(enumerate(Y), desc=\"Evaluating observations: \"):\n", + " ny = meas_k.shape[1]\n", + " unused_meas = np.ones((1, ny), dtype=bool)\n", + " \n", + " \n", + " live_tracks = [track for track in confirmed_tracks if track['stage']=='confirmed']\n", + " Nc = len(live_tracks) # Number of confirmed tracks still alive\n", + " validation_matrix = np.zeros((ny, Nc), dtype=bool)\n", + " \n", + " association_matrix = -np.inf*np.ones((ny, Nc+2*ny))\n", + " # Entry for false alarms\n", + " np.fill_diagonal(association_matrix[:, Nc:Nc+ny], np.log(clutter_model['Bfa']))\n", + " # Entry for new targets\n", + " np.fill_diagonal(association_matrix[:, Nc+ny:], np.log(Bnt))\n", + " \n", + " for ti, track in enumerate(live_tracks): # Iterate over confirmed tracks\n", + " validation_matrix[:, ti] = gater.gate(track['x'][-1], track['P'][-1], meas_k)\n", + " # Entry for validated tracks\n", + " val_meas = meas_k[:, validation_matrix[:, ti]] # Get the validated measurements for this track\n", + " yhat = track['filt'].sensor_model['h'](track['x'][-1]) # Calculate the predicted measurement for this track\n", + " H = track['filt'].sensor_model['dhdx'](track['x'][-1])\n", + " py = stats.multivariate_normal.pdf(val_meas.squeeze().T, mean=yhat.flatten(), cov=H@track['P'][-1]@H.T+track['filt'].sensor_model['R'])\n", + " association_matrix[validation_matrix[:, ti], ti] = np.log(track['filt'].sensor_model['PD']*py/(1-track['filt'].sensor_model['PD'])) # PG assumed = 1\n", + " # Solve association problem\n", + " row_ind, col_ind = scipy.optimize.linear_sum_assignment(-association_matrix)\n", + " \n", + " pdb.set_trace()\n", + " \n", + " \n", + " # Update confirmed tracks\n", + " update_track(meas_k, unused_meas, filt, track, associator, gater, logic, logic_params)\n", + " if tmp.sum() != unused_meas.sum(): # If we've associated something, add the time here (for plotting purposes)\n", + " track['associations'].append(k)\n", + "\n", + " tentative_tracks = [track for track in tracks if track['stage'] == 'tentative']\n", + " for track in tentative_tracks:\n", + " if track['stage'] == 'deleted':\n", + " continue\n", + " tmp = unused_meas.copy()\n", + " # Update tentative tracks\n", + " update_track(meas_k, unused_meas, filt, track, associator, gater, logic, logic_params)\n", + " if tmp.sum() != unused_meas.sum(): # If we've associated something, add the time here (for plotting purposes)\n", + " track['associations'].append(k)\n", + " if track['stage'] == 'confirmed':\n", + " confirmed_tracks.append(track) # If a track has been confirmed, add it to confirmed tracks\n", + "\n", + " # Used the unused measurements to initiate new tracks\n", + " for meas in meas_k[unused_meas]:\n", + " tracks.append(init_track(meas, k, ids))\n", + " ids += 1\n", + "\n", + " for track in tracks:\n", + " if track['stage'] != 'deleted':\n", + " x, P = filt.propagate(track['x'][-1], track['P'][-1])\n", + " track['x'].append(x)\n", + " track['P'].append(P)\n", + " track['t'].append(k+1)\n", + "\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}