import bottleneck as bn
import numpy as np
from towbintools.data_analysis import rescale_series
from towbintools.data_analysis.growth_rate import (
compute_instantaneous_growth_rate_classified,
)
from towbintools.data_analysis.time_series import (
smooth_series_classified,
)
from towbintools.foundation.utils import find_best_string_match
[docs]
def combine_series(
conditions_struct: list,
series_one: str,
series_two: str,
operation: str,
new_series_name: str,
) -> list:
"""
Combine two series in every condition dict via an elementwise arithmetic operation.
Parameters:
conditions_struct (list) : List of condition dicts.
series_one (str) : Key of the first operand series.
series_two (str) : Key of the second operand series.
operation (str) : Arithmetic operation to apply.
Supported values: ``"add"``, ``"subtract"``, ``"multiply"``, ``"divide"``.
Division adds a small epsilon (1e-8) to the denominator to avoid division by zero.
new_series_name (str) : Key under which the result is stored in each condition dict.
Returns:
list : The modified ``conditions_struct`` with the new series added in place.
"""
for condition in conditions_struct:
series_one_values = condition[series_one]
series_two_values = condition[series_two]
if operation == "add":
new_series_values = np.add(series_one_values, series_two_values)
elif operation == "subtract":
new_series_values = series_one_values - series_two_values
elif operation == "multiply":
new_series_values = series_one_values * series_two_values
elif operation == "divide":
new_series_values = np.divide(series_one_values, series_two_values + 1e-8)
condition[new_series_name] = new_series_values
return conditions_struct
[docs]
def compute_growth_rate(
conditions_struct: list,
series_name: str,
gr_series_name: str,
experiment_time: bool = True,
lmbda: float = 0.0075,
order: int = 2,
medfilt_window: int = 5,
) -> list:
"""
Compute the instantaneous growth rate for each worm in every condition.
The quality-control column is detected automatically by matching ``series_name``
against all keys containing ``"qc"``.
Parameters:
conditions_struct (list) : List of condition dicts.
series_name (str) : Key of the measurement series (e.g. body length).
gr_series_name (str) : Key under which the growth-rate series is stored.
experiment_time (bool) : If ``True``, use ``"experiment_time_hours"``; otherwise
use ``"time"``. Defaults to ``True``.
lmbda (float) : Regularisation parameter for the Whittaker smoother used
internally. Defaults to ``0.0075``.
order (int) : Polynomial order for smoothing. Defaults to ``2``.
medfilt_window (int) : Median-filter window size applied before differentiation.
Defaults to ``5``.
Returns:
list : The modified ``conditions_struct`` with the growth-rate series added in place.
"""
for condition in conditions_struct:
series_values = condition[series_name]
qc_keys = [key for key in condition.keys() if "qc" in key]
if len(qc_keys) == 1:
qc_key = qc_keys[0]
else:
qc_key = find_best_string_match(series_name, qc_keys)
qc = condition[qc_key]
if experiment_time:
time = condition["experiment_time_hours"]
else:
time = condition["time"]
growth_rate = []
for i in range(series_values.shape[0]):
gr = compute_instantaneous_growth_rate_classified(
series_values[i],
time[i],
qc[i],
lmbda=lmbda,
order=order,
medfilt_window=medfilt_window,
)
growth_rate.append(gr)
growth_rate = np.array(growth_rate)
condition[gr_series_name] = growth_rate
return conditions_struct
[docs]
def rescale(
conditions_struct: list,
series_name: str,
rescaled_series_name: str,
experiment_time: bool = True,
n_points: int = 100,
) -> list:
"""
Rescale a series to a fixed number of points per larval stage and flatten the stage axis.
Each worm's series is resampled to ``n_points`` per larval stage via
``rescale_series``, yielding shape ``(n_worms, 4, n_points)``, which is then
reshaped to ``(n_worms, 4 * n_points)``.
Parameters:
conditions_struct (list) : List of condition dicts.
series_name (str) : Key of the raw series to rescale.
rescaled_series_name (str) : Key under which the rescaled series is stored.
experiment_time (bool) : If ``True``, use ``"experiment_time"``; otherwise
use ``"time"``. Defaults to ``True``.
n_points (int) : Number of resampled points per larval stage. Defaults to ``100``.
Returns:
list : The modified ``conditions_struct`` with the rescaled series added in place.
"""
for condition in conditions_struct:
series_values = condition[series_name]
qc_keys = [key for key in condition.keys() if "qc" in key]
if len(qc_keys) == 1:
qc_key = qc_keys[0]
else:
qc_key = find_best_string_match(series_name, qc_keys)
qc = condition[qc_key]
ecdysis = condition["ecdysis_index"]
if experiment_time:
time = condition["experiment_time"]
else:
time = condition["time"]
_, rescaled_series = rescale_series(
series_values, time, ecdysis, qc, n_points=n_points
) # shape (n_worms, 4, n_points)
# reshape into (n_worms, 4*n_points)
rescaled_series = rescaled_series.reshape(rescaled_series.shape[0], -1)
condition[rescaled_series_name] = rescaled_series
return conditions_struct
[docs]
def rescale_without_flattening(
conditions_struct: list,
series_name: str,
rescaled_series_name: str,
experiment_time: bool = True,
n_points: int = 100,
) -> list:
"""
Rescale a series to a fixed number of points per larval stage, retaining the stage axis.
Like ``rescale``, but the output shape is ``(n_worms, 4, n_points)`` rather than
being flattened. Useful when per-stage statistics are required downstream.
Parameters:
conditions_struct (list) : List of condition dicts.
series_name (str) : Key of the raw series to rescale.
rescaled_series_name (str) : Key under which the rescaled series is stored.
experiment_time (bool) : If ``True``, use ``"experiment_time"``; otherwise
use ``"time"``. Defaults to ``True``.
n_points (int) : Number of resampled points per larval stage. Defaults to ``100``.
Returns:
list : The modified ``conditions_struct`` with the rescaled series added in place.
"""
for condition in conditions_struct:
series_values = condition[series_name]
qc_keys = [key for key in condition.keys() if "qc" in key]
if len(qc_keys) == 1:
qc_key = qc_keys[0]
else:
qc_key = find_best_string_match(series_name, qc_keys)
qc = condition[qc_key]
ecdysis = condition["ecdysis_index"]
if experiment_time:
time = condition["experiment_time"]
else:
time = condition["time"]
_, rescaled_series = rescale_series(
series_values, time, ecdysis, qc, n_points=n_points
) # shape (n_worms, 4, n_points)
condition[rescaled_series_name] = rescaled_series
return conditions_struct
[docs]
def exclude_arrests_from_series_at_ecdysis(series_at_ecdysis: np.ndarray) -> np.ndarray:
"""
Remove arrested worms from a per-molt measurement array.
A worm is considered arrested at molt *i* if its value at molt *i+1* is NaN.
The value at that molt is set to NaN in the output. The last molt event is
always kept regardless.
Parameters:
series_at_ecdysis (np.ndarray) : Per-worm, per-molt values.
Shape ``(n_molts,)`` for a single worm or ``(n_worms, n_molts)`` for
multiple worms.
Returns:
np.ndarray : Copy of ``series_at_ecdysis`` with arrested molt values replaced by NaN.
"""
filtered_series = np.full(series_at_ecdysis.shape, np.nan)
# keep only a value at one ecdys event if the next one is not nan
if series_at_ecdysis.shape[0] == 1 or len(series_at_ecdysis.shape) == 1:
for i in range(len(series_at_ecdysis)):
if i == len(series_at_ecdysis) - 1:
filtered_series[i] = series_at_ecdysis[i]
elif not np.isnan(series_at_ecdysis[i + 1]):
filtered_series[i] = series_at_ecdysis[i]
return filtered_series
else:
for i in range(series_at_ecdysis.shape[0]):
for j in range(series_at_ecdysis.shape[1]):
if j == series_at_ecdysis.shape[1] - 1:
filtered_series[i, j] = series_at_ecdysis[i, j]
elif not np.isnan(series_at_ecdysis[i, j + 1]):
filtered_series[i, j] = series_at_ecdysis[i, j]
return filtered_series
[docs]
def smooth_series(
conditions_struct: list,
series_name: str,
smoothed_series_name: str,
experiment_time: bool = True,
lmbda: float = 0.0075,
order: int = 2,
medfilt_window: int = 5,
) -> list:
"""
Smooth a measurement series for each worm in every condition using a classified smoother.
Short smoothed series are right-padded with NaN to match the original length.
Parameters:
conditions_struct (list) : List of condition dicts.
series_name (str) : Key of the raw series to smooth.
smoothed_series_name (str) : Key under which the smoothed series is stored.
experiment_time (bool) : If ``True``, use ``"experiment_time_hours"``; otherwise
use ``"time"``. Defaults to ``True``.
lmbda (float) : Regularisation parameter for the Whittaker smoother. Defaults to ``0.0075``.
order (int) : Polynomial order for smoothing. Defaults to ``2``.
medfilt_window (int) : Median-filter window size applied before the smoother.
Defaults to ``5``.
Returns:
list : The modified ``conditions_struct`` with the smoothed series added in place.
"""
for condition in conditions_struct:
series_values = condition[series_name]
qc_keys = [key for key in condition.keys() if "qc" in key]
if len(qc_keys) == 1:
qc_key = qc_keys[0]
else:
qc_key = find_best_string_match(series_name, qc_keys)
qc = condition[qc_key]
if experiment_time:
time = condition["experiment_time_hours"]
else:
time = condition["time"]
smoothed_series = []
for i in range(len(series_values)):
values = series_values[i]
smoothed = smooth_series_classified(
values,
time[i],
qc[i],
lmbda=lmbda,
order=order,
medfilt_window=medfilt_window,
)
# pad with 0 until it's the same length as the original series
if len(smoothed) < len(values):
smoothed = np.pad(
smoothed,
(0, len(values) - len(smoothed)),
"constant",
constant_values=(np.nan, np.nan),
)
smoothed_series.append(smoothed)
smoothed_series = np.array(smoothed_series)
condition[smoothed_series_name] = smoothed_series
return conditions_struct
[docs]
def smooth_and_rescale_series(
conditions_struct: list,
series_name: str,
smoothed_series_name: str,
experiment_time: bool = True,
lmbda: float = 0.0075,
order: int = 2,
medfilt_window: int = 5,
n_points: int = 100,
flatten: bool = True,
) -> list:
"""
Smooth a series and rescale it to a fixed number of points per larval stage.
After smoothing, the QC classification is discarded (all frames are treated as
``"worm"``) before resampling. Two new keys are written to each condition dict:
``smoothed_series_name`` (the smoothed but not rescaled series) and
``f"{smoothed_series_name}_rescaled"`` (the rescaled result).
Parameters:
conditions_struct (list) : List of condition dicts.
series_name (str) : Key of the raw series.
smoothed_series_name (str) : Base key for the output series.
experiment_time (bool) : If ``True``, use ``"experiment_time_hours"``; otherwise
use ``"time"``. Defaults to ``True``.
lmbda (float) : Regularisation parameter for the Whittaker smoother. Defaults to ``0.0075``.
order (int) : Polynomial order for smoothing. Defaults to ``2``.
medfilt_window (int) : Median-filter window size applied before the smoother.
Defaults to ``5``.
n_points (int) : Number of resampled points per larval stage. Defaults to ``100``.
flatten (bool) : If ``True``, reshape the rescaled array from
``(n_worms, 4, n_points)`` to ``(n_worms, 4 * n_points)``.
Defaults to ``True``.
Returns:
list : The modified ``conditions_struct`` with the smoothed and rescaled series added in place.
"""
for condition in conditions_struct:
series_values = condition[series_name]
qc_keys = [key for key in condition.keys() if "qc" in key]
if len(qc_keys) == 1:
qc_key = qc_keys[0]
else:
qc_key = find_best_string_match(series_name, qc_keys)
qc = condition[qc_key]
ecdysis = condition["ecdysis_index"]
if experiment_time:
time = condition["experiment_time_hours"]
else:
time = condition["time"]
smoothed_series = []
for i in range(len(series_values)):
values = series_values[i]
smoothed = smooth_series_classified(
values,
time[i],
qc[i],
lmbda=lmbda,
order=order,
medfilt_window=medfilt_window,
)
# pad with 0 until it's the same length as the original series
if len(smoothed) < len(values):
smoothed = np.pad(
smoothed,
(0, len(values) - len(smoothed)),
"constant",
constant_values=(np.nan, np.nan),
)
smoothed_series.append(smoothed)
smoothed_series = np.array(smoothed_series)
# we don't need the classification anymore
qc = np.full_like(smoothed_series, "worm", dtype=object)
# rescale the smoothed series
_, rescaled_series = rescale_series(
smoothed_series, time, ecdysis, qc, n_points=n_points
) # shape (n_worms, 4, n_points)
# reshape into (n_worms, 4*n_points)
if flatten:
rescaled_series = rescaled_series.reshape(rescaled_series.shape[0], -1)
condition[smoothed_series_name] = smoothed_series
condition[f"{smoothed_series_name}_rescaled"] = rescaled_series
return conditions_struct
[docs]
def detrend_rescaled_series_population_mean(
conditions_struct: list,
rescaled_series: str,
) -> list:
"""
Remove the population mean trend from a rescaled series in every condition.
The population mean is computed across worms within each condition independently.
The detrended result is stored under ``f"{rescaled_series}_detrended"``.
Parameters:
conditions_struct (list) : List of condition dicts.
rescaled_series (str) : Key of the rescaled series to detrend.
Returns:
list : The modified ``conditions_struct`` with the detrended series added in place.
"""
for condition in conditions_struct:
series = condition[rescaled_series]
detrended_series = _detrend_rescaled_series_population_mean(series)
new_name = f"{rescaled_series}_detrended"
condition[new_name] = detrended_series
return conditions_struct
def _detrend_rescaled_series_population_mean(series: np.ndarray) -> np.ndarray:
"""
Subtract the column-wise population mean from a 2-D series array.
Parameters:
series (np.ndarray) : Array of shape ``(n_worms, n_points)``.
Returns:
np.ndarray : Detrended array of the same shape.
"""
# detrend the series by subtracting the population mean
population_mean = bn.nanmean(series, axis=0)
detrended_series = series - population_mean
return detrended_series