Skip to content

Commit

Permalink
feat: cleaning the code removing dead part (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
nmathieufact authored Mar 29, 2024
1 parent c781a0c commit 4d8a20d
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 212 deletions.
12 changes: 0 additions & 12 deletions bytetracker/basetrack.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
from collections import OrderedDict

import numpy as np


class TrackState(object):
New = 0
Tracked = 1
Expand All @@ -17,16 +12,9 @@ class BaseTrack(object):
is_activated = False
state = TrackState.New

history = OrderedDict()
features = []
curr_feature = None
score = 0
start_frame = 0
frame_id = 0
time_since_update = 0

# multi-camera
location = (np.inf, np.inf)

@property
def end_frame(self):
Expand Down
23 changes: 1 addition & 22 deletions bytetracker/byte_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def __init__(self, tlwh, score, cls):
self.is_activated = False

self.score = score
self.tracklet_len = 0
self.cls = cls

def predict(self):
Expand Down Expand Up @@ -66,7 +65,6 @@ def activate(self, kalman_filter, frame_id):
self.track_id = self.next_id()
self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))

self.tracklet_len = 0
self.state = TrackState.Tracked
if frame_id == 1:
self.is_activated = True
Expand All @@ -78,7 +76,6 @@ def re_activate(self, new_track, frame_id, new_id=False):
self.mean, self.covariance = self.kalman_filter.update(
self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
)
self.tracklet_len = 0
self.state = TrackState.Tracked
self.is_activated = True
self.frame_id = frame_id
Expand All @@ -96,7 +93,6 @@ def update(self, new_track, frame_id):
:return:
"""
self.frame_id = frame_id
self.tracklet_len += 1
self.cls = new_track.cls

new_tlwh = new_track.tlwh
Expand Down Expand Up @@ -142,23 +138,6 @@ def tlwh_to_xyah(tlwh):
ret[2] /= ret[3]
return ret

def to_xyah(self):
return self.tlwh_to_xyah(self.tlwh)

@staticmethod
# @jit(nopython=True)
def tlbr_to_tlwh(tlbr):
ret = np.asarray(tlbr).copy()
ret[2:] -= ret[:2]
return ret

@staticmethod
# @jit(nopython=True)
def tlwh_to_tlbr(tlwh):
ret = np.asarray(tlwh).copy()
ret[2:] += ret[:2]
return ret

def __repr__(self):
return "OT_{}_({}-{})".format(self.track_id, self.start_frame, self.end_frame)

Expand Down Expand Up @@ -262,7 +241,7 @@ def update(self, dets, frame_id):
strack_pool[i] for i in u_track if strack_pool[i].state == TrackState.Tracked
]
dists = matching.iou_distance(r_tracked_stracks, detections_second)
matches, u_track, u_detection_second = matching.linear_assignment(dists, thresh=0.5)
matches, u_track, _ = matching.linear_assignment(dists, thresh=0.5)
for itracked, idet in matches:
track = r_tracked_stracks[itracked]
det = detections_second[idet]
Expand Down
60 changes: 0 additions & 60 deletions bytetracker/kalman_filter.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,6 @@
import numpy as np
import scipy.linalg

"""
Table for the 0.95 quantile of the chi-square distribution with N degrees of
freedom (contains values for N=1, ..., 9). Taken from MATLAB/Octave's chi2inv
function and used as Mahalanobis gating threshold.
"""
chi2inv95 = {
1: 3.8415,
2: 5.9915,
3: 7.8147,
4: 9.4877,
5: 11.070,
6: 12.592,
7: 14.067,
8: 15.507,
9: 16.919,
}


class KalmanFilter(object):
"""
Expand Down Expand Up @@ -229,46 +212,3 @@ def update(self, mean, covariance, measurement):
(kalman_gain, projected_cov, kalman_gain.T)
)
return new_mean, new_covariance

def gating_distance(self, mean, covariance, measurements, only_position=False, metric="maha"):
"""Compute gating distance between state distribution and measurements.
A suitable distance threshold can be obtained from `chi2inv95`. If
`only_position` is False, the chi-square distribution has 4 degrees of
freedom, otherwise 2.
Parameters
----------
mean : ndarray
Mean vector over the state distribution (8 dimensional).
covariance : ndarray
Covariance of the state distribution (8x8 dimensional).
measurements : ndarray
An Nx4 dimensional matrix of N measurements, each in
format (x, y, a, h) where (x, y) is the bounding box center
position, a the aspect ratio, and h the height.
only_position : Optional[bool]
If True, distance computation is done with respect to the bounding
box center position only.
Returns
-------
ndarray
Returns an array of length N, where the i-th element contains the
squared Mahalanobis distance between (mean, covariance) and
`measurements[i]`.
"""
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]

d = measurements - mean
if metric == "gaussian":
return np.sum(d * d, axis=1)
elif metric == "maha":
cholesky_factor = np.linalg.cholesky(covariance)
z = scipy.linalg.solve_triangular(
cholesky_factor, d.T, lower=True, check_finite=False, overwrite_b=True
)
squared_maha = np.sum(z * z, axis=0)
return squared_maha
else:
raise ValueError("invalid distance metric")
119 changes: 1 addition & 118 deletions bytetracker/matching.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,5 @@
import lap
import numpy as np
import scipy
from scipy.spatial.distance import cdist

from bytetracker import kalman_filter


def merge_matches(m1, m2, shape):
O, P, Q = shape
m1 = np.asarray(m1)
m2 = np.asarray(m2)

M1 = scipy.sparse.coo_matrix((np.ones(len(m1)), (m1[:, 0], m1[:, 1])), shape=(O, P))
M2 = scipy.sparse.coo_matrix((np.ones(len(m2)), (m2[:, 0], m2[:, 1])), shape=(P, Q))

mask = M1 * M2
match = mask.nonzero()
match = list(zip(match[0], match[1]))
unmatched_O = tuple(set(range(O)) - set([i for i, j in match]))
unmatched_Q = tuple(set(range(Q)) - set([j for i, j in match]))

return match, unmatched_O, unmatched_Q


def _indices_to_matches(cost_matrix, indices, thresh):
matched_cost = cost_matrix[tuple(zip(*indices))]
matched_mask = matched_cost <= thresh

matches = indices[matched_mask]
unmatched_a = tuple(set(range(cost_matrix.shape[0])) - set(matches[:, 0]))
unmatched_b = tuple(set(range(cost_matrix.shape[1])) - set(matches[:, 1]))

return matches, unmatched_a, unmatched_b


def linear_assignment(cost_matrix, thresh):
Expand All @@ -42,7 +10,7 @@ def linear_assignment(cost_matrix, thresh):
tuple(range(cost_matrix.shape[1])),
)
matches, unmatched_a, unmatched_b = [], [], []
cost, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
_, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
for ix, mx in enumerate(x):
if mx >= 0:
matches.append([ix, mx])
Expand Down Expand Up @@ -95,91 +63,6 @@ def iou_distance(atracks, btracks):
return cost_matrix


def v_iou_distance(atracks, btracks):
"""
Compute cost based on IoU
:type atracks: list[STrack]
:type btracks: list[STrack]
:rtype cost_matrix np.ndarray
"""

if (len(atracks) > 0 and isinstance(atracks[0], np.ndarray)) or (
len(btracks) > 0 and isinstance(btracks[0], np.ndarray)
):
atlbrs = atracks
btlbrs = btracks
else:
atlbrs = [track.tlwh_to_tlbr(track.pred_bbox) for track in atracks]
btlbrs = [track.tlwh_to_tlbr(track.pred_bbox) for track in btracks]
_ious = ious(atlbrs, btlbrs)
cost_matrix = 1 - _ious

return cost_matrix


def embedding_distance(tracks, detections, metric="cosine"):
"""
:param tracks: list[STrack]
:param detections: list[BaseTrack]
:param metric:
:return: cost_matrix np.ndarray
"""

cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float32)
if cost_matrix.size == 0:
return cost_matrix
det_features = np.asarray([track.curr_feat for track in detections], dtype=np.float32)
# for i, track in enumerate(tracks):
# cost_matrix[i, :] = np.maximum(0.0, cdist(track.smooth_feat.reshape(1,-1), det_features, metric))
track_features = np.asarray([track.smooth_feat for track in tracks], dtype=np.float32)
cost_matrix = np.maximum(0.0, cdist(track_features, det_features, metric)) # Nomalized features
return cost_matrix


def gate_cost_matrix(kf, cost_matrix, tracks, detections, only_position=False):
if cost_matrix.size == 0:
return cost_matrix
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim]
measurements = np.asarray([det.to_xyah() for det in detections])
for row, track in enumerate(tracks):
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position
)
cost_matrix[row, gating_distance > gating_threshold] = np.inf
return cost_matrix


def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98):
if cost_matrix.size == 0:
return cost_matrix
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim]
measurements = np.asarray([det.to_xyah() for det in detections])
for row, track in enumerate(tracks):
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position, metric="maha"
)
cost_matrix[row, gating_distance > gating_threshold] = np.inf
cost_matrix[row] = lambda_ * cost_matrix[row] + (1 - lambda_) * gating_distance
return cost_matrix


def fuse_iou(cost_matrix, tracks, detections):
if cost_matrix.size == 0:
return cost_matrix
reid_sim = 1 - cost_matrix
iou_dist = iou_distance(tracks, detections)
iou_sim = 1 - iou_dist
fuse_sim = reid_sim * (1 + iou_sim) / 2
det_scores = np.array([det.score for det in detections])
det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
# fuse_sim = fuse_sim * (1 + det_scores) / 2
fuse_cost = 1 - fuse_sim
return fuse_cost


def fuse_score(cost_matrix, detections):
if cost_matrix.size == 0:
return cost_matrix
Expand Down

0 comments on commit 4d8a20d

Please sign in to comment.