Skip to content

Commit

Permalink
feat: Compute speed and acceleration
Browse files Browse the repository at this point in the history
This commit adds support for computing the speed and acceleration of the ball and the players from tracking data.
  • Loading branch information
probberechts committed Apr 4, 2024
1 parent c00a461 commit 2983c65
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 76 deletions.
269 changes: 258 additions & 11 deletions kloppy/domain/models/tracking.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,83 @@
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Union, Any
from typing import Any, Callable, Dict, List, Optional, Union

import numpy as np
from scipy.signal import savgol_filter

from kloppy.domain.models.common import DatasetType
from kloppy.exceptions import KloppyError
from kloppy.utils import deprecated

from .common import Dataset, DataRecord, Player
from .pitch import Point, Point3D
from kloppy.utils import (
deprecated,
)
from .common import DataRecord, Dataset, Player
from .pitch import Point


@dataclass
class PlayerData:
class Detection:
"""A single detection of a trackable object in a frame.
Attributes:
coordinates: The coordinates of the object in the frame.
distance: The distance the object has traveled since the previous frame.
speed: The speed of the object in the frame.
acceleration: The acceleration of the object in the frame.
other_data: Additional data about the object in the frame.
"""

coordinates: Point
distance: Optional[float] = None
speed: Optional[float] = None
acceleration: Optional[float] = None
other_data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Trajectory:
"""Detections of a trackable object over a sequence of consecutive frames.
Attributes:
trackable_object: The object being tracked. Either a player or "ball".
start_frame: The frame number of the first detection in the trajectory.
end_frame: The frame number of the last detection in the trajectory.
detections: A list of Detection objects, one for each frame in the trajectory.
"""

trackable_object: Union[Player, str]
start_frame: int
end_frame: int
detections: List[Detection]

def __iter__(self):
return iter(self.detections)

def __len__(self):
return len(self.detections)


@dataclass(repr=False)
class Frame(DataRecord):
frame_id: int
players_data: Dict[Player, PlayerData]
ball_data: Optional[Detection]
players_data: Dict[Player, Detection]
other_data: Dict[str, Any]
ball_coordinates: Point3D
ball_speed: Optional[float] = None

@property
def record_id(self) -> int:
return self.frame_id

@property
def ball_coordinates(self):
if self.ball_data is None:
return None
return self.ball_data.coordinates

@property
def ball_speed(self):
if self.ball_data is None:
return None
return self.ball_data.speed

@property
def players_coordinates(self):
return {
Expand All @@ -52,6 +100,167 @@ def frames(self):
def frame_rate(self):
return self.metadata.frame_rate

@property
def trajectories(self):
trajectories = defaultdict(list)

# get ball trajectories
current_trajectory = None
for record in self.records:
if (
record.ball_data
and record.ball_data.coordinates
and record.ball_data.coordinates.x != float("nan")
):
if current_trajectory is None:
current_trajectory = Trajectory(
trackable_object="ball",
start_frame=record.frame_id,
end_frame=record.frame_id,
detections=[record.ball_data],
)
else:
current_trajectory.end_frame = record.frame_id
current_trajectory.detections.append(record.ball_data)
else:
if current_trajectory:
trajectories["ball"].append(current_trajectory)
current_trajectory = None
if current_trajectory:
trajectories["ball"].append(current_trajectory)

# get player trajectories
for team in self.metadata.teams:
for player in team.players:
current_trajectory = None
for record in self.records:
if (
player in record.players_data
and record.players_data[player].coordinates is not None
and record.players_data[player].coordinates.x
!= float("nan")
):
if current_trajectory is None:
current_trajectory = Trajectory(
trackable_object=player,
start_frame=record.frame_id,
end_frame=record.frame_id,
detections=[record.players_data[player]],
)
else:
current_trajectory.end_frame = record.frame_id
current_trajectory.detections.append(
record.players_data[player]
)
else:
if current_trajectory:
trajectories[player].append(current_trajectory)
current_trajectory = None
if current_trajectory:
trajectories[player].append(current_trajectory)

return trajectories

def compute_kinematics(
self,
n_smooth_speed: int = 6,
n_smooth_acc: int = 10,
max_speed_player: float = 12.0,
max_speed_ball: float = 50.0,
):
"""Compute speed and acceleration for each object in the dataset.
Args:
n_smooth_speed: The number of frames to smooth over when computing speed.
n_smooth_acc: The number of frames to smooth over when computing acceleration.
max_speed_player: The maximum speed allowed for a player (in m/s).
max_speed_ball: The maximum speed allowed for the ball (in m/s).
"""
if self.metadata.frame_rate is None:
raise KloppyError(
"Frame rate is not set in metadata. Please set the frame rate before computing kinematics."
)
for trackable_object, trajectories in self.trajectories.items():
max_speed = (
max_speed_player
if isinstance(trackable_object, Player)
else max_speed_ball
)

for trajectory in trajectories:
if len(trajectory) < n_smooth_speed + 1:
continue

# get x-y coordinates in metric space
tracked_maps = np.empty((len(trajectory), 2))
for i, detection in enumerate(trajectory):
point = detection.coordinates
metric_point = self.metadata.pitch_dimensions.to_base(
point
)
tracked_maps[i] = [metric_point.x, metric_point.y]

# apply a Savitzky-Golay filter for smoothing
tracked_maps = smoothing_savgol_3rd(tracked_maps)

# get speed vect and speed norm
dist = (
tracked_maps[n_smooth_speed:]
- tracked_maps[:-n_smooth_speed]
)
dist_norm = np.linalg.norm(dist, axis=1) / n_smooth_speed

speed_vect = dist / (n_smooth_speed / self.metadata.frame_rate)
speed_norm = np.linalg.norm(speed_vect, axis=1)

# acc process for short tracks
if speed_vect.shape[0] < self.metadata.frame_rate:
acc_vect = np.nan * np.ones_like(speed_vect)
acc_norm = np.nan * np.ones_like(speed_norm)
else:
# acc vect process for other tracks
diff_acc = (
speed_vect[n_smooth_acc:] - speed_vect[:-n_smooth_acc]
)
acc_vect = diff_acc / (
n_smooth_acc / self.metadata.frame_rate
)

# padding to respect the shape after the smoothing
add = np.zeros((n_smooth_acc // 2, 2)) + np.nan
acc_vect = np.concatenate((add, acc_vect, add))

# apply a physical check based on speed and acc
acc_vect = apply_criterion(speed_vect, acc_vect, max_speed)

# acc norm process for other tracks
diff_acc = (
speed_norm[n_smooth_acc:] - speed_norm[:-n_smooth_acc]
)
acc_norm = diff_acc / (
n_smooth_acc / self.metadata.frame_rate
)

# padding to respect the shape after the smoothing
add = np.zeros((n_smooth_acc // 2)) + np.nan
acc_norm = np.concatenate((add, acc_norm, add))

# apply last padding
add = np.zeros((n_smooth_speed // 2, 2)) + np.nan
speed_vect = np.concatenate((add, speed_vect, add))

add = np.zeros((n_smooth_speed // 2)) + np.nan
dist_norm = np.concatenate((add, dist_norm, add))
speed_norm = np.concatenate((add, speed_norm, add))
acc_norm = np.concatenate((add, acc_norm, add))

# fill detection dict with physical info
for i, detection in enumerate(trajectory):
detection.distance = dist_norm[i]
detection.speed = speed_norm[i]
detection.acceleration = acc_norm[i]

@deprecated(
"to_pandas will be removed in the future. Please use to_df instead."
)
Expand Down Expand Up @@ -94,4 +303,42 @@ def generic_record_converter(frame: Frame):
)


__all__ = ["Frame", "TrackingDataset", "PlayerData"]
def apply_criterion(speeds, acc, max_speed):
"""
Criterion used to spot tracking inaccuracies.
Args:
speeds (np.array): one player/ball speed (vx, vy) per row
acc (np.array): one player/ball acceleration (ax, ay) per row, same shape as speeds
max_speed (float): maximum speed allowed for a player or the ball
Returns:
acc: same as acc with value set to np.NaN if criterion <= 0
"""
criterion = -(9.1 / max_speed) * speeds + 9.1 - acc
mask = np.isnan(criterion)
criterion[mask] = -np.inf
mask_criterion = criterion <= 0.0
acc[mask_criterion] = np.nan
return acc


def smoothing_savgol_3rd(raw_maps):
"""
Smooth player/ball positions using a Savitzky-Golay filter.
Args:
raw_maps (np.array): one player/ball position (x, y) per row
Returns:
tracked_maps: smoothed player positions
"""
window_length = min(raw_maps.shape[0], 31)
if window_length % 2 == 0:
window_length = window_length - 1
polyorder = min(window_length - 1, 3)
tracked_maps = savgol_filter(raw_maps, window_length, polyorder, axis=0)
return tracked_maps


__all__ = ["Frame", "TrackingDataset", "Detection", "Trajectory"]
39 changes: 27 additions & 12 deletions kloppy/domain/services/transformers/dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import fields, replace

from kloppy.domain.models.tracking import PlayerData
from kloppy.domain.models.tracking import Detection
from typing import Union, Optional

from kloppy.domain import (
Expand Down Expand Up @@ -204,17 +204,22 @@ def __change_frame_coordinate_system(self, frame: Frame):
ball_state=frame.ball_state,
period=frame.period,
# changes
ball_coordinates=self.__change_point_coordinate_system(
frame.ball_coordinates
),
ball_speed=frame.ball_speed,
ball_data=replace(
frame.ball_data,
coordinates=self.__change_point_coordinate_system(
frame.ball_data.coordinates
),
)
if frame.ball_data
else None,
players_data={
key: PlayerData(
key: Detection(
coordinates=self.__change_point_coordinate_system(
player_data.coordinates
),
distance=player_data.distance,
speed=player_data.speed,
acceleration=player_data.acceleration,
other_data=player_data.other_data,
)
for key, player_data in frame.players_data.items()
Expand All @@ -231,11 +236,16 @@ def __change_frame_dimensions(self, frame: Frame):
ball_state=frame.ball_state,
period=frame.period,
# changes
ball_coordinates=self.change_point_dimensions(
frame.ball_coordinates
),
ball_data=replace(
frame.ball_data,
coordinates=self.change_point_dimensions(
frame.ball_data.coordinates
),
)
if frame.ball_data
else None,
players_data={
key: PlayerData(
key: Detection(
coordinates=self.change_point_dimensions(
player_data.coordinates
),
Expand Down Expand Up @@ -285,7 +295,7 @@ def __change_point_coordinate_system(
def __flip_frame(self, frame: Frame):
players_data = {}
for player, data in frame.players_data.items():
players_data[player] = PlayerData(
players_data[player] = Detection(
coordinates=self.flip_point(data.coordinates),
distance=data.distance,
speed=data.speed,
Expand All @@ -300,7 +310,12 @@ def __flip_frame(self, frame: Frame):
ball_state=frame.ball_state,
period=frame.period,
# changes
ball_coordinates=self.flip_point(frame.ball_coordinates),
ball_data=replace(
frame.ball_data,
coordinates=self.flip_point(frame.ball_data.coordinates),
)
if frame.ball_data
else None,
players_data=players_data,
other_data=frame.other_data,
)
Expand Down
Loading

0 comments on commit 2983c65

Please sign in to comment.