Source code for towbintools.data_analysis.growth_rate

import numpy as np
from scipy.signal import medfilt
from scipy.signal import savgol_filter

from towbintools.data_analysis.time_series import correct_series_with_classification
from towbintools.data_analysis.time_series import smooth_series
from towbintools.data_analysis.time_series import smooth_series_classified
from towbintools.foundation.utils import interpolate_nans_infs


[docs] def compute_growth_rate_linear( series: np.ndarray, time: np.ndarray, ignore_start_fraction: float = 0.0, ignore_end_fraction: float = 0.0, savgol_filter_window: int = 5, savgol_filter_order: int = 3, ) -> float: """ Compute the growth rate of a time series using linear regression. Parameters: series (np.ndarray): The series of values to compute the growth rate of. time (np.ndarray): The time data corresponding to the series. ignore_start_fraction (float, optional): Fraction of points to ignore at the start. (default: 0.0) ignore_end_fraction (float, optional): Fraction of points to ignore at the end. (default: 0.0) savgol_filter_window (int, optional): Window size of the Savitzky-Golay filter. (default: 5) savgol_filter_order (int, optional): Polynomial order of the Savitzky-Golay filter. (default: 3) Returns: float: The linear growth rate of the time series. """ assert len(series) == len(time), "The series and time must have the same length." num_points = len(series) num_ignore_start = int(ignore_start_fraction * num_points) num_ignore_end = int(ignore_end_fraction * num_points) assert ( num_ignore_start + num_ignore_end < num_points ), "The fraction of points to ignore is too large." # Correctly handle slicing when ignore fractions are 0 if num_ignore_end == 0: time = time[num_ignore_start:] series = series[num_ignore_start:] else: time = time[num_ignore_start:-num_ignore_end] series = series[num_ignore_start:-num_ignore_end] # Remove extreme outliers with a small median filter series = medfilt(series, 3) series = interpolate_nans_infs(series) series = savgol_filter(series, savgol_filter_window, savgol_filter_order) # Compute linear regression slope, intercept = np.polyfit(time, series, 1) return slope
[docs] def compute_growth_rate_exponential( series: np.ndarray, time: np.ndarray, ignore_start_fraction: float = 0.0, ignore_end_fraction: float = 0.0, savgol_filter_window: int = 5, savgol_filter_order: int = 3, ) -> float: """ Compute the growth rate of a time series using exponential regression. Parameters: series (np.ndarray): The series of values to compute the growth rate of. time (np.ndarray): The time data corresponding to the series. ignore_start_fraction (float, optional): Fraction of points to ignore at the start. (default: 0.0) ignore_end_fraction (float, optional): Fraction of points to ignore at the end. (default: 0.0) savgol_filter_window (int, optional): Window size of the Savitzky-Golay filter. (default: 5) savgol_filter_order (int, optional): Polynomial order of the Savitzky-Golay filter. (default: 3) Returns: float: The exponential growth rate of the time series. """ assert len(series) == len(time), "The series and time must have the same length." num_points = len(series) num_ignore_start = int(ignore_start_fraction * num_points) num_ignore_end = int(ignore_end_fraction * num_points) assert ( num_ignore_start + num_ignore_end < num_points ), "The fraction of points to ignore is too large." # Correctly handle slicing when ignore fractions are 0 if num_ignore_end == 0: time = time[num_ignore_start:] series = series[num_ignore_start:] else: time = time[num_ignore_start:-num_ignore_end] series = series[num_ignore_start:-num_ignore_end] # Remove extreme outliers with a small median filter series = medfilt(series, 3) series = interpolate_nans_infs(series) series = savgol_filter(series, savgol_filter_window, savgol_filter_order) # Compute exponential regression slope, intercept = np.polyfit(time, np.log(series), 1) return slope
[docs] def compute_growth_rate_classified( series: np.ndarray, time: np.ndarray, qc: np.ndarray, method: str = "exponential", ignore_start_fraction: float = 0.0, ignore_end_fraction: float = 0.0, savgol_filter_window: int = 5, savgol_filter_order: int = 3, ) -> float: """ Compute the growth rate of a time series after correcting non-worm timepoints. Non-worm points (classified as ``"egg"`` or ``"error"``) are removed and interpolated before fitting. Parameters: series (np.ndarray): The time series of values. time (np.ndarray): The time data corresponding to the series. qc (np.ndarray): Per-point classification (``"worm"``, ``"egg"``, or ``"error"``). method (str, optional): Regression method; ``"exponential"`` or ``"linear"``. (default: ``"exponential"``) ignore_start_fraction (float, optional): Fraction of points to ignore at the start. (default: 0.0) ignore_end_fraction (float, optional): Fraction of points to ignore at the end. (default: 0.0) savgol_filter_window (int, optional): Window size of the Savitzky-Golay filter. (default: 5) savgol_filter_order (int, optional): Polynomial order of the Savitzky-Golay filter. (default: 3) Returns: float: The growth rate of the time series. """ assert ( len(series) == len(time) == len(qc) ), "The series, time, and qc must have the same length." series_worms = correct_series_with_classification(series, qc) if method == "exponential": growth_rate = compute_growth_rate_exponential( series_worms, time, ignore_start_fraction, ignore_end_fraction, savgol_filter_window, savgol_filter_order, ) elif method == "linear": growth_rate = compute_growth_rate_linear( series_worms, time, ignore_start_fraction, ignore_end_fraction, savgol_filter_window, savgol_filter_order, ) return growth_rate
[docs] def compute_instantaneous_growth_rate( series: np.ndarray, time: np.ndarray, lmbda: float = 0.0075, order: int = 2, medfilt_window: int = 5, ) -> np.ndarray: """ Compute the instantaneous growth rate of a time series. Parameters: series (np.ndarray): The series of values to compute the growth rate of. time (np.ndarray): The time data corresponding to the series. lmbda (float, optional): The smoothing parameter for the Whittaker-Eilers smoothing. Default provides good results for our volume curves when series_time is in hours. (default: 0.0075) order (int, optional): The order of the penalty of the Whittaker-Eilers smoother. (default: 2) medfilt_window (int, optional): The window size for the median filter. (default: 5) Returns: np.ndarray: The instantaneous growth rate of the time series. """ assert len(series) == len(time), "The series and time must have the same length." series = smooth_series(series, time, lmbda, order, medfilt_window) # Compute the instantaneous growth rate growth_rate = np.gradient(series, time) return growth_rate
[docs] def compute_instantaneous_growth_rate_classified( series: np.ndarray, time: np.ndarray, qc: np.ndarray, lmbda: float = 0.0075, order: int = 2, medfilt_window: int = 5, ) -> np.ndarray: """ Compute the instantaneous growth rate of a time series after correcting the non-worm points by removing them and interpolating them back. Parameters: series (np.ndarray): The time series of values. time (np.ndarray): The time data corresponding to the series. qc (np.ndarray): The classification of the points as either 'worm' or 'egg' or 'error'. lmbda (float, optional): The smoothing parameter for the Whittaker-Eilers smoothing. Default provides good results for our volume curves when series_time is in hours. (default: 0.0075) order (int, optional): The order of the penalty of the Whittaker-Eilers smoother. (default: 2) medfilt_window (int, optional): The window size for the median filter. (default: 5) Returns: np.ndarray: The instantaneous growth rate of the time series. """ assert ( len(series) == len(time) == len(qc) ), "The series, time, and qc must have the same length." # Correct and smooth the time series series = smooth_series_classified(series, time, qc, lmbda, order, medfilt_window) # Compute the instantaneous growth rate growth_rate = np.gradient(series, time) return growth_rate
[docs] def compute_growth_rate_per_larval_stage( series: np.ndarray, time: np.ndarray, qc: np.ndarray, ecdysis: dict, method: str = "exponential", ignore_start_fraction: float = 0.0, ignore_end_fraction: float = 0.0, savgol_filter_window: int = 5, savgol_filter_order: int = 3, ) -> dict: """ Compute the growth rate of a time series for each larval stage. Non-worm timepoints are removed and interpolated before fitting. Growth rates are computed for L1–L4 using the ecdysis boundary indices. Parameters: series (np.ndarray): The time series of values. time (np.ndarray): The time data corresponding to the series. qc (np.ndarray): Per-point classification (``"worm"``, ``"egg"``, or ``"error"``). ecdysis (dict): Ecdysis boundary dict with keys ``"HatchTime"``, ``"M1"``, ``"M2"``, ``"M3"``, ``"M4"`` as integer indices. method (str, optional): Regression method; ``"exponential"`` or ``"linear"``. (default: ``"exponential"``) ignore_start_fraction (float, optional): Fraction of points to ignore at the start of each stage. (default: 0.0) ignore_end_fraction (float, optional): Fraction of points to ignore at the end of each stage. (default: 0.0) savgol_filter_window (int, optional): Window size of the Savitzky-Golay filter. (default: 5) savgol_filter_order (int, optional): Polynomial order of the Savitzky-Golay filter. (default: 3) Returns: dict: Growth rate for each larval stage, keyed ``"L1"``–``"L4"``. Stages with NaN ecdysis boundaries return ``np.nan``. """ assert ( len(series) == len(time) == len(qc) ), "The series, time, and qc, must have the same length." # Correct the time series series_worms = correct_series_with_classification(series, qc) # extract ecdisis indices hatch_time = ecdysis["HatchTime"] M1 = ecdysis["M1"] M2 = ecdysis["M2"] M3 = ecdysis["M3"] M4 = ecdysis["M4"] growth_rates = {} # Compute the growth rate per larval stage for i, (start, end) in enumerate(zip([hatch_time, M1, M2, M3], [M1, M2, M3, M4])): # check if start or end is NaN if np.isnan(start) or np.isnan(end): growth_rates[f"L{i+1}"] = np.nan else: series_worms_stage = series_worms[start:end] time_stage = time[start:end] qc_stage = qc[start:end] growth_rate_stage = compute_growth_rate_classified( series_worms_stage, time_stage, qc_stage, method, ignore_start_fraction, ignore_end_fraction, savgol_filter_window, savgol_filter_order, ) growth_rates[f"L{i+1}"] = growth_rate_stage return growth_rates
[docs] def compute_larval_stage_duration(ecdysis: dict) -> dict: """ Compute the duration of each larval stage. Parameters: ecdysis (dict): The ecdysis events of the worm. Returns: dict: The duration of each larval stage. """ # extract ecdysis indices hatch_time = ecdysis["HatchTime"] M1 = ecdysis["M1"] M2 = ecdysis["M2"] M3 = ecdysis["M3"] M4 = ecdysis["M4"] ls_durations = {} for i, (start, end) in enumerate(zip([hatch_time, M1, M2, M3], [M1, M2, M3, M4])): # check if start or end is NaN if np.isnan(start) or np.isnan(end): ls_durations[f"L{i+1}"] = np.nan else: ls_durations[f"L{i+1}"] = end - start return ls_durations