Source code for modifinder.engines.alignment.CosineAlignmentEngine

"""This module contains the CosineAlignmentEngine class for aligning two spectra using cosine similarity."""

import numpy as np
from typing import List, Tuple
from modifinder.engines.Abtracts import AlignmentEngine
from modifinder.classes.Spectrum import Spectrum
from modifinder.classes.EdgeDetail import EdgeDetail, Match, MatchType
from modifinder.utilities.general_utils import is_shifted
from modifinder.exceptions import ModiFinderError
import networkx as nx


def _cosine_fast(
    spec: Spectrum,
    spec_other: Spectrum,
    mz_tolerance: float = None,
    ppm_tolerance: float = None,
    allow_shift: bool = True,
) -> Tuple[float, List[Tuple[int, int]]]:
    """Approximates alignment with the highest cosine score between two spectra.
    
    if both tolerances are provided, both criteria are used to filter the matches. If only one is provided, only that one is used.

    Args:
        spec (Spectrum): First spectrum
        spec_other (Spectrum): Second spectrum
        mz_tolerance (float): Tolerance in Da for fragment m/z values, if None, it is not used.
        ppm_tolerance (float): Tolerance in ppm for fragment m/z values, if None, it is not used.
        allow_shift (bool): _description_

    Returns:
        Tuple[float, List[Tuple[int, int]]]: _description_
    """
    
    if mz_tolerance is None and ppm_tolerance is None:
        raise ModiFinderError("At least one of mz_tolerance or ppm_tolerance must be provided.")
    
    precursor_charge = max(spec.precursor_charge, 1)
    precursor_mass_diff = (
        spec.precursor_mz - spec_other.precursor_mz
    ) * precursor_charge
    # Only take peak shifts into account if the mass difference is relevant.
    num_shifts = 1
    if allow_shift and abs(precursor_mass_diff) >= mz_tolerance:
        num_shifts += precursor_charge
    other_peak_index = np.zeros(num_shifts, np.uint16)
    mass_diff = np.zeros(num_shifts, np.float32)
    for charge in range(1, num_shifts):
        mass_diff[charge] = precursor_mass_diff / charge

    # Find the matching peaks between both spectra.
    peak_match_scores, peak_match_idx = [], []
    for peak_index, (peak_mz, peak_intensity) in enumerate(
        zip(spec.mz, spec.intensity)
    ):
        # Advance while there is an excessive mass difference.
        for cpi in range(num_shifts):
            while other_peak_index[cpi] < len(spec_other.mz) - 1 and (
                peak_mz - mz_tolerance
                > spec_other.mz[other_peak_index[cpi]] + mass_diff[cpi]
            ):
                other_peak_index[cpi] += 1

        # Match the peaks within the fragment mass window if possible.
        for cpi in range(num_shifts):
            index = 0
            other_peak_i = other_peak_index[cpi] + index
            while (
                other_peak_i < len(spec_other.mz)
                and abs(peak_mz - (spec_other.mz[other_peak_i] + mass_diff[cpi]))
                <= mz_tolerance
            ):
                if abs(peak_mz - (spec_other.mz[other_peak_i] + mass_diff[cpi])) <= (
                    ppm_tolerance * peak_mz / 1e6
                ):
                    peak_match_scores.append(
                        peak_intensity * spec_other.intensity[other_peak_i]
                    )
                    peak_match_idx.append((peak_index, other_peak_i))
                index += 1
                other_peak_i = other_peak_index[cpi] + index

    score, peak_matches = 0.0, []
    if len(peak_match_scores) > 0:
        # Use the most prominent peak matches to compute the score (sort in
        # descending order).
        peak_match_scores_arr = np.asarray(peak_match_scores)
        peak_match_order = np.argsort(peak_match_scores_arr)[::-1]
        peak_match_scores_arr = peak_match_scores_arr[peak_match_order]
        peak_match_idx_arr = np.asarray(peak_match_idx)[peak_match_order]
        peaks_used, other_peaks_used = set(), set()
        for peak_match_score, peak_i, other_peak_i in zip(
            peak_match_scores_arr,
            peak_match_idx_arr[:, 0],
            peak_match_idx_arr[:, 1],
        ):
            if peak_i not in peaks_used and other_peak_i not in other_peaks_used:
                score += peak_match_score
                # Save the matched peaks.
                peak_matches.append((peak_i, other_peak_i))
                # Make sure these peaks are not used anymore.
                peaks_used.add(peak_i)
                other_peaks_used.add(other_peak_i)

    return score, peak_matches


[docs]class CosineAlignmentEngine(AlignmentEngine): def __init__(self, **kwargs): super().__init__(**kwargs)
[docs] def align( self, network: nx.DiGraph, mz_tolerance: float = 0.02, ppm_tolerance: float = 100.0, align_all: bool = False, **kwargs, ): """ Aligns the spectra in the network using cosine similarity. Parameters ---------- network : nx.DiGraph The Compound Graph object to align the spectra in. mz_tolerance : float, optional Fragment mz tolerance, by default 0.02 ppm_tolerance : float, optional Fragment ppm tolerance, by default 100.0 align_all : bool, default False if True, all edges will be aligned, if False, only the edges that have not been aligned will be aligned kwargs : dict additional arguments """ edges = network.edges(data=True) for edge in edges: start_compound = network.nodes[edge[0]]["compound"] end_compound = network.nodes[edge[1]]["compound"] if "edgedetail" not in edge[2] or edge[2]["edgedetail"] is None or align_all: edge[2]["edgedetail"] = self.align_single( start_compound.spectrum, end_compound.spectrum, mz_tolerance, ppm_tolerance, **kwargs, )
[docs] def align_single( self, SpectrumTuple1: Spectrum, SpectrumTuple2: Spectrum, mz_tolerance: float = 0.02, ppm_tolerance: float = 100.0, **kwargs, ) -> EdgeDetail: """ Aligns two spectra using cosine similarity and returns the cosine score and the matched peaks. Parameters: SpectrumTuple1 (SpectrumTuple): First spectrum SpectrumTuple2 (SpectrumTuple): Second spectrum mz_tolerance (float): Fragment mz tolerance ppm_tolerance (float): Fragment ppm tolerance kwargs: additional arguments Returns: EdgeDetail: the edge detail object """ cosine, matched_peaks = _cosine_fast( SpectrumTuple1, SpectrumTuple2, mz_tolerance, ppm_tolerance, True, ) Matches = [] for match in matched_peaks: if is_shifted( SpectrumTuple1.mz[match[0]], SpectrumTuple2.mz[match[1]], ppm_tolerance, mz_tolerance, ): Matches.append(Match(match[0], match[1], MatchType.shifted)) else: Matches.append(Match(match[0], match[1], MatchType.unshifted)) return EdgeDetail( match_score=cosine, matches=Matches, number_of_modifications=-1 )