Merge pull request #3 from vreuter/vr/pandas-table-like
Support pandas-like table with headers, and new `looptrace` (v0.5.0) output format
vreuter authored May 30, 2024
2 parents 473c036 + 7457a79 commit 71f4b2b
Showing 9 changed files with 373 additions and 243 deletions.
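For reference, a minimal sketch of the headed CSV this commit adds support for, with column names (`traceIndex`, `timeIndex`, `z`, `y`, `x`) taken from the `HeadedTraceTimePointParser` introduced below; QC-fail files carry an additional `failCode` column, and actual `looptrace` (v0.5.0) output may include further columns. Values here are made up.

```
traceIndex,timeIndex,z,y,x
0,4,8.92,231.98,871.09
0,5,9.11,230.12,872.45
```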
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v0.2.0] - 2024-05-30

### Changed
* This project now uses `pandas` to parse a table-like file (CSV) _with_ a header, supporting upstream changes in data generation by `looptrace`.
* Split old functionality out into separate modules.

## [v0.1.0] - 2024-04-20

### Added
2 changes: 1 addition & 1 deletion looptrace_loci_vis/__init__.py
@@ -1,3 +1,3 @@
"""Napari plugin for visualising locus-specific points from looptrace"""

__version__ = "0.1.0"
__version__ = "0.2.0"
9 changes: 9 additions & 0 deletions looptrace_loci_vis/_const.py
@@ -0,0 +1,9 @@
"""Plugin-wide constants"""

from enum import Enum


class PointColor(Enum):
# See: https://davidmathlogic.com/colorblind/
DEEP_SKY_BLUE = "#0C7BDC"
GOLDENROD = "#FFC20A"
18 changes: 18 additions & 0 deletions looptrace_loci_vis/_types.py
@@ -0,0 +1,18 @@
"""Type aliases used broadly"""

from collections.abc import Callable
from pathlib import Path
from typing import Literal, Union

from gertils.geometry import ZCoordinate
from gertils.types import PixelArray

CsvRow = list[str]
FlatPointRecord = list[Union[float, ZCoordinate]]
LayerParams = dict[str, object]
ImageLayer = tuple[PixelArray, LayerParams, Literal["image"]]
PointsLayer = tuple[list[FlatPointRecord], LayerParams, Literal["points"]]
PathLike = str | Path
PathOrPaths = PathLike | list[PathLike]
QCFailReasons = str
Reader = Callable[[PathLike], list[ImageLayer | PointsLayer]]
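As an illustration of these aliases (not part of the diff), a `PointsLayer` is the standard napari layer-data tuple: row data, layer keyword arguments, and the literal layer type. A hypothetical value, with made-up coordinates and purely illustrative napari `Points` parameters, might look like this:

```python
from looptrace_loci_vis._types import PointsLayer

# Hypothetical example: one flattened record (trace, time, z, y, x),
# minimal illustrative layer params, and the napari layer-type tag.
example_layer: PointsLayer = (
    [[0, 4, 8.92, 231.98, 871.09]],
    {"name": "qc_pass", "size": 1, "face_color": "#0C7BDC"},
    "points",
)
```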
118 changes: 118 additions & 0 deletions looptrace_loci_vis/point_record.py
@@ -0,0 +1,118 @@
"""A single point's record in a file on disk."""

import dataclasses
from math import floor
from typing import Union

import numpy as np
from gertils.geometry import ImagePoint3D, LocatableXY, LocatableZ, ZCoordinate
from gertils.types import TimepointFrom0 as Timepoint
from gertils.types import TraceIdFrom0 as TraceId
from numpydoc_decorator import doc # type: ignore[import-untyped]

from ._types import FlatPointRecord


@doc(
summary="",
parameters=dict(
trace_id="ID of the trace with which the locus spot is associated",
timepoint="Imaging timepoint in from which the point is coming",
point="Coordinates of the centroid of the Gaussian fit to the spot image pixel data",
),
)
@dataclasses.dataclass(frozen=True, kw_only=True)
class PointRecord(LocatableXY, LocatableZ): # noqa: D101
trace_id: TraceId
timepoint: Timepoint
point: ImagePoint3D

def __post_init__(self) -> None:
bads: dict[str, object] = {}
if not isinstance(self.trace_id, TraceId):
bads["trace ID"] = self.trace_id # type: ignore[unreachable]
if not isinstance(self.timepoint, Timepoint):
bads["timepoint"] = self.timepoint # type: ignore[unreachable]
if not isinstance(self.point, ImagePoint3D):
bads["point"] = self.point # type: ignore[unreachable]
if bads:
messages = "; ".join(f"Bad type ({type(v).__name__}) for {k}" for k, v in bads.items())
raise TypeError(f"Cannot create point record: {messages}")

@doc(summary="Flatten")
def flatten(self) -> FlatPointRecord:
"""Create a simple list of components, as a row of layer data."""
return [
self.trace_id.get,
self.timepoint.get,
self.get_z_coordinate(),
self.get_y_coordinate(),
self.get_x_coordinate(),
]

def get_x_coordinate(self) -> float: # noqa: D102
return self.point.x

def get_y_coordinate(self) -> float: # noqa: D102
return self.point.y

def get_z_coordinate(self) -> ZCoordinate: # noqa: D102
return self.point.z

@doc(summary="Round point position to nearest z-slice")
def with_truncated_z(self) -> "PointRecord": # noqa: D102
new_z: int = floor(self.get_z_coordinate())
result: PointRecord = self.with_new_z(new_z)
return result

@doc(
summary="Replace this instance's point with a copy with updated z.",
parameters=dict(z="New z-coordinate value"),
)
def with_new_z(self, z: int) -> "PointRecord": # noqa: D102
pt = ImagePoint3D(x=self.point.x, y=self.point.y, z=z)
return dataclasses.replace(self, point=pt)


@doc(
summary="Create ancillary points from main point",
parameters=dict(
r="The record to expand along z-axis",
z_max="The maximum z-coordinate",
),
returns="""
        List of layer data rows representing the original point along the
        entire length of the z-axis, paired with a flag for each row
        indicating whether it is the true center
""",
)
def expand_along_z( # noqa: D103
r: PointRecord, *, z_max: Union[float, np.float64]
) -> tuple[list[PointRecord], list[bool]]:
if not isinstance(z_max, int | float | np.float64):
raise TypeError(f"Bad type for z_max: {type(z_max).__name__}")

r = r.with_truncated_z()
z_center = int(r.get_z_coordinate())
z_max = int(floor(z_max))
if not isinstance(z_center, int) or not isinstance(z_max, int):
raise TypeError(
f"Z center and Z max must be int; got {type(z_center).__name__} and"
f" {type(z_max).__name__}"
)

# Check that max z and center z make sense together.
if z_max < z_center:
raise ValueError(f"Max z must be at least as great as central z ({z_center})")

# Build the records and flags of where the center in z really is.
predecessors = [(r.with_new_z(i), False) for i in range(z_center)]
successors = [(r.with_new_z(i), False) for i in range(z_center + 1, z_max + 1)]
points, params = zip(*[*predecessors, (r, True), *successors], strict=False)

# Each record should give rise to a total of 1 + z_max records, since numbering from 0.
if len(points) != 1 + z_max:
raise RuntimeError(
f"Number of points generated from single spot center isn't as expected! Point={r}, z_max={z_max}, len(points)={len(points)}"
)
return points, params # type: ignore[return-value]
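A minimal usage sketch of the record type and `expand_along_z` (illustrative values only, assuming the `gertils` wrapper types construct directly from an int as they do in the parsers below): the point is duplicated at every z-slice from 0 through `z_max`, and the flags mark which copy sits at the spot's true (truncated) z.

```python
from gertils.geometry import ImagePoint3D
from gertils.types import TimepointFrom0, TraceIdFrom0

from looptrace_loci_vis.point_record import PointRecord, expand_along_z

# Made-up record: trace 0, timepoint 4, centroid at (z, y, x).
rec = PointRecord(
    trace_id=TraceIdFrom0(0),
    timepoint=TimepointFrom0(4),
    point=ImagePoint3D(z=8.92, y=231.98, x=871.09),
)

points, is_center = expand_along_z(rec, z_max=15.0)
assert len(points) == 16      # slices 0..15, numbering from 0
assert is_center[8] is True   # floor(8.92) == 8 is the true center slice
```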
144 changes: 144 additions & 0 deletions looptrace_loci_vis/points_parser.py
@@ -0,0 +1,144 @@
"""Abstractions related to points parsing"""

import abc
import csv
from collections.abc import Iterable, Sized
from enum import Enum
from typing import Generic, Protocol, TypeVar

import pandas as pd
from gertils.geometry import ImagePoint3D
from gertils.types import TimepointFrom0 as Timepoint
from gertils.types import TraceIdFrom0 as TraceId

from ._types import CsvRow, PathLike, QCFailReasons
from .point_record import PointRecord

Input = TypeVar("Input", contravariant=True)
I1 = TypeVar("I1")
I2 = TypeVar("I2", bound=Sized)


class MappingLike(Protocol, Sized): # noqa: D101
@abc.abstractmethod
def __getitem__(self, key: str) -> object: ...

@abc.abstractmethod
def __len__(self) -> int: ...


class PointsParser(Protocol, Generic[Input]):
"""Something capable of parsing a QC-pass or -fail CSV file"""

@classmethod
@abc.abstractmethod
def parse_all_qcpass(cls, data: Input) -> list[PointRecord]: ... # noqa: D102

@classmethod
@abc.abstractmethod
def parse_all_qcfail(cls, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... # noqa: D102


class IterativePointsParser(Generic[I1, I2], PointsParser[I1]):
"""Something that yields records, each of type I2 from value of type I1, to parse QC-pass/-fail points"""

@classmethod
@abc.abstractmethod
def _gen_records(cls, data: I1) -> Iterable[I2]: ...

@classmethod
@abc.abstractmethod
def _parse_single_qcpass_record(cls, record: I2) -> PointRecord: ...

@classmethod
@abc.abstractmethod
def _parse_single_qcfail_record(cls, record: I2) -> tuple[PointRecord, QCFailReasons]: ...

@classmethod
def parse_all_qcpass(cls, data: I1) -> list[PointRecord]: # noqa: D102
return [cls._parse_single_qcpass_record(r) for r in cls._gen_records(data)]

@classmethod
def parse_all_qcfail(cls, data: I1) -> list[tuple[PointRecord, QCFailReasons]]: # noqa: D102
return [cls._parse_single_qcfail_record(r) for r in cls._gen_records(data)]


class HeadedTraceTimePointParser(IterativePointsParser[PathLike, MappingLike]):
"""Something capable of parsing a headed CSV of QC-pass/-fail points records"""

TIME_INDEX_COLUMN = "timeIndex"

@classmethod
def _gen_records(cls, data: PathLike) -> Iterable[MappingLike]:
for _, row in pd.read_csv(data).iterrows():
yield row

@classmethod
def _parse_single_qcpass_record(cls, record: MappingLike) -> PointRecord:
trace = TraceId(int(record["traceIndex"])) # type: ignore[call-overload]
timepoint = Timepoint(int(record[cls.TIME_INDEX_COLUMN])) # type: ignore[call-overload]
z = float(record["z"]) # type: ignore[arg-type]
y = float(record["y"]) # type: ignore[arg-type]
x = float(record["x"]) # type: ignore[arg-type]
point = ImagePoint3D(z=z, y=y, x=x)
return PointRecord(trace_id=trace, timepoint=timepoint, point=point)

@classmethod
def _parse_single_qcfail_record(cls, record: MappingLike) -> tuple[PointRecord, QCFailReasons]:
"""A fail record parses the same as a pass one, just with one additional field for QC fail reasons."""
pt_rec = cls._parse_single_qcpass_record(record)
fail_code = record["failCode"]
if not isinstance(fail_code, str):
raise TypeError(f"failCode is not str, but {type(fail_code).__name__}")
fail_code: str = str(fail_code) # type: ignore[no-redef]
return pt_rec, fail_code


class HeadlessTraceTimePointParser(IterativePointsParser[PathLike, CsvRow]):
"""Parser for input file with no header, and field for trace ID and timepoint in addition to coordinates"""

class InputFileColumn(Enum):
"""Indices of the different columns to parse as particular fields"""

TRACE = 0
TIMEPOINT = 1
Z = 2
Y = 3
X = 4
QC = 5

@property
def get(self) -> int:
"""Alias for the value of this enum member"""
return self.value

_number_of_columns = sum(1 for _ in InputFileColumn)

@classmethod
def _parse_single_record(cls, r: CsvRow, *, exp_len: int) -> PointRecord:
if not isinstance(r, list):
raise TypeError(f"Record to parse must be list, not {type(r).__name__}")
if len(r) != exp_len:
raise ValueError(f"Expected record of length {exp_len} but got {len(r)}: {r}")
trace = TraceId(int(r[cls.InputFileColumn.TRACE.get]))
timepoint = Timepoint(int(r[cls.InputFileColumn.TIMEPOINT.get]))
z = float(r[cls.InputFileColumn.Z.get])
y = float(r[cls.InputFileColumn.Y.get])
x = float(r[cls.InputFileColumn.X.get])
point = ImagePoint3D(z=z, y=y, x=x)
return PointRecord(trace_id=trace, timepoint=timepoint, point=point)

@classmethod
def _gen_records(cls, data: PathLike) -> Iterable[CsvRow]:
with open(data, newline="") as fh: # noqa: PTH123
return list(csv.reader(fh))

@classmethod
def _parse_single_qcpass_record(cls, record: CsvRow) -> PointRecord:
return cls._parse_single_record(record, exp_len=cls._number_of_columns - 1)

@classmethod
def _parse_single_qcfail_record(cls, record: CsvRow) -> tuple[PointRecord, QCFailReasons]:
pt_rec = cls._parse_single_record(record, exp_len=cls._number_of_columns)
fail_code = record[cls.InputFileColumn.QC.get]
return pt_rec, fail_code
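A sketch of how these parsers might be driven (hypothetical file names; the reader-facing wiring lives elsewhere in the plugin): the headed parser consumes the new `looptrace` (v0.5.0) CSV-with-header output, while the headless parser keeps supporting the older positional format.

```python
from looptrace_loci_vis.points_parser import (
    HeadedTraceTimePointParser,
    HeadlessTraceTimePointParser,
)

# New looptrace output: CSV with a header row (hypothetical paths).
passing = HeadedTraceTimePointParser.parse_all_qcpass("P0001_traces_qcpass.csv")
failing = HeadedTraceTimePointParser.parse_all_qcfail("P0001_traces_qcfail.csv")

# Older output: headerless CSV with fixed column positions.
legacy = HeadlessTraceTimePointParser.parse_all_qcpass("P0001_qcpass_legacy.csv")

for rec, reasons in failing:
    print(rec.flatten(), reasons)
```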