import builtins
import re
from difflib import SequenceMatcher
from typing import Callable, Optional

import numpy as np
[docs]
def nan_helper(
    y: np.ndarray,
) -> tuple[np.ndarray, Callable]:
    """
    Locate NaN entries and supply an index converter for use with np.interp.

    Parameters:
        y (np.ndarray): 1D array with possible NaN values.

    Returns:
        tuple: ``(nans, index)``. ``nans`` is the boolean mask produced by
            ``np.isnan(y)``; ``index`` is a callable that maps a boolean
            mask to the integer positions of its ``True`` entries
            (e.g. ``index(nans)`` gives the positions of the NaNs).

    Example:
        >>> nans, x = nan_helper(y)
        >>> y[nans] = np.interp(x(nans), x(~nans), y[~nans])
    """

    def to_positions(mask):
        # First element of nonzero(): for a 1D mask, the indices of True entries.
        return mask.nonzero()[0]

    nan_mask = np.isnan(y)
    return nan_mask, to_positions
[docs]
def inf_helper(
    y: np.ndarray,
) -> tuple[np.ndarray, Callable]:
    """
    Locate infinite entries and supply an index converter for use with np.interp.

    Parameters:
        y (np.ndarray): 1D array with possible infinity values.

    Returns:
        tuple: ``(infs, index)``. ``infs`` is the boolean mask produced by
            ``np.isinf(y)`` (both ``+inf`` and ``-inf`` are marked);
            ``index`` is a callable that maps a boolean mask to the integer
            positions of its ``True`` entries (e.g. ``index(infs)`` gives
            the positions of the infinities).

    Example:
        >>> infs, x = inf_helper(y)
        >>> y[infs] = np.interp(x(infs), x(~infs), y[~infs])
    """

    def to_positions(mask):
        # First element of nonzero(): for a 1D mask, the indices of True entries.
        return mask.nonzero()[0]

    inf_mask = np.isinf(y)
    return inf_mask, to_positions
[docs]
def interpolate_nans(
    signal: np.ndarray,
) -> np.ndarray:
    """
    Fill the NaN entries of a signal by linear interpolation.

    NaN samples are replaced with values that ``np.interp`` computes from the
    positions and values of the non-NaN samples. The replacement values are
    written into ``signal`` itself.

    Parameters:
        signal (np.ndarray): The input signal array, which might contain NaN values.

    Returns:
        np.ndarray: The signal with its NaN values interpolated. When
            ``np.interp`` raises ``ValueError`` (e.g. there is no non-NaN
            sample to interpolate from), a new all-NaN array shaped like
            ``signal`` is returned instead.
    """
    missing, to_index = nan_helper(signal)
    present = ~missing
    try:
        filled = np.interp(to_index(missing), to_index(present), signal[present])
        signal[missing] = filled
    except ValueError:
        # No usable sample points (every value is NaN): nothing to interpolate from.
        signal = np.full_like(signal, np.nan)
    return signal
[docs]
def interpolate_infs(
    signal: np.ndarray,
) -> np.ndarray:
    """
    Fill the infinite entries of a signal by linear interpolation.

    Infinite samples are replaced with values that ``np.interp`` computes from
    the positions and values of the non-infinite samples. The replacement
    values are written into ``signal`` itself.

    Parameters:
        signal (np.ndarray): The input signal array, which might contain infinity values.

    Returns:
        np.ndarray: The signal with its infinity values interpolated. When
            ``np.interp`` raises ``ValueError`` (e.g. there is no non-infinite
            sample to interpolate from), a new array shaped like ``signal``
            and filled with NaN is returned instead.
    """
    unbounded, to_index = inf_helper(signal)
    bounded = ~unbounded
    try:
        filled = np.interp(to_index(unbounded), to_index(bounded), signal[bounded])
        signal[unbounded] = filled
    except ValueError:
        # No usable sample points (every value is inf): nothing to interpolate from.
        signal = np.full_like(signal, np.nan)
    return signal
[docs]
def interpolate_nans_infs(
    signal: np.ndarray,
) -> np.ndarray:
    """
    Interpolate NaN and infinity values in a given signal.

    Every non-finite sample (NaN, ``+inf`` or ``-inf``) is replaced by a value
    linearly interpolated from the finite samples only. All non-finite samples
    are handled in a single pass: interpolating NaNs and infinities one after
    the other would use infinities as anchors for the NaNs (and NaNs as
    anchors for the infinities), which can leave NaNs behind, e.g. for a NaN
    located between ``+inf`` and ``-inf``.

    Non-finite samples before the first or after the last finite sample take
    the value of that nearest finite sample (``np.interp`` edge behaviour).
    The replacement values are written into ``signal`` itself.

    Parameters:
        signal (np.ndarray): The input 1D signal array, which might contain
            NaN and infinity values.

    Returns:
        np.ndarray: The signal array with NaN and infinity values
            interpolated. When the signal holds no finite sample at all
            (including an empty signal), a new all-NaN array shaped like
            ``signal`` is returned instead.
    """
    invalid = ~np.isfinite(signal)
    valid = ~invalid

    if not valid.any():
        # Nothing finite to anchor the interpolation on.
        return np.full_like(signal, np.nan)

    if invalid.any():
        # Interpolate over sample positions, using finite samples as anchors.
        invalid_positions = invalid.nonzero()[0]
        valid_positions = valid.nonzero()[0]
        signal[invalid] = np.interp(invalid_positions, valid_positions, signal[valid])
    return signal
# Exception class for the case when a method is not implemented
[docs]
class NotImplementedError(builtins.NotImplementedError):
    """
    Raised when a method is not implemented.

    The class keeps its existing name, which shadows the builtin
    ``NotImplementedError`` inside this module. It derives from that builtin
    so the two stay interchangeable in ``except`` clauses: handlers written
    against the builtin ``NotImplementedError``, ``RuntimeError`` or
    ``Exception`` all catch this class as well.
    """
def _extract_column_components(col_name: str) -> list[str]:
    """
    Break a column name into its lowercase components.

    The name is lowercased and then cut at every ``_``, ``-`` or ``.``
    character. Empty components (from leading, trailing or adjacent
    separators) are kept in the result.

    Parameters:
        col_name (str): The column name to split.

    Returns:
        list[str]: List of lowercase string components.
    """
    lowered = col_name.lower()
    separators = re.compile(r"[_\-\.]")
    return separators.split(lowered)
[docs]
def find_best_string_match(reference: str, candidates: list[str]) -> Optional[str]:
    """
    Find the best matching candidate string to a reference using component and string similarity.

    Each candidate receives a score built from two weighted terms:

    * component overlap (60 %): the number of reference components longer
      than two characters that also occur among the candidate's components,
      divided by the larger of the two component counts;
    * sequence similarity (40 %):
      ``SequenceMatcher(None, reference, candidate).ratio()`` computed on the
      strings as given (case is not normalised for this term).

    The candidate with the highest score is returned; on ties the earliest
    candidate wins. Useful for matching QC column names to data column names.

    Parameters:
        reference (str): The reference string to match against.
        candidates (list[str]): List of candidate strings to evaluate.

    Returns:
        Optional[str]: The best matching candidate string, or ``None`` when
            ``candidates`` is empty or no candidate scores above zero.
    """
    reference_parts = _extract_column_components(reference)
    # Only components longer than two characters count towards the overlap.
    # The filter does not depend on the candidate, so it is applied once.
    significant_parts = [part for part in reference_parts if len(part) > 2]

    best_match = None
    best_score = 0.0
    for candidate in candidates:
        candidate_parts = _extract_column_components(candidate)
        candidate_lookup = set(candidate_parts)
        matching_parts = sum(
            1 for part in significant_parts if part in candidate_lookup
        )
        # Normalise by the component list lengths (duplicates included).
        max_parts = max(len(reference_parts), len(candidate_parts))
        part_score = matching_parts / max_parts if max_parts > 0 else 0.0
        # Keep (reference, candidate) order: ratio() may depend on argument order.
        string_score = SequenceMatcher(None, reference, candidate).ratio()
        # Weighted combination
        final_score = 0.6 * part_score + 0.4 * string_score
        # Strict comparison: ties keep the earlier candidate, zero scores never win.
        if final_score > best_score:
            best_score = final_score
            best_match = candidate
    return best_match