Source code for modifinder.utilities.spectrum_utils

from modifinder.utilities import general_utils as mf_gu
from modifinder import Spectrum
import tqdm

def _cluster_spectrums(spectrums, ppm_tolerance = 10, mz_tolerance = 0.1, verbose = False):
    '''
    This function is used to cluster the peaks of the spectrums within a given tolerance
    
    Parameters
    ----------
        spectrums: list of Spectrum objects
            The spectrums to be clustered
        ppm_tolerance: float
            The tolerance in ppm
        mz_tolerance: float
            The tolerance in mz
    
    
    Returns:
    clusters: list of clusters of the mz values, each cluster is a list of [mz, intensity, origin_spectrum_index]
    '''
    
    all_points = []
    for index, spectrum in tqdm.tqdm(enumerate(spectrums), disable = not verbose):
        for mz, intensity in zip(spectrum.mz, spectrum.intensity):
            all_points.append((mz, intensity, index))
    
    # print("all points", len(all_points))

    # 1. Sort by mz
    all_points.sort(key=lambda x: x[0])

    # 2. Sweep
    clusters = []
    if not all_points:
        return clusters

    # Start the first cluster
    first_mz, first_intensity, first_idx = all_points[0]
    clusters.append([[first_mz, first_intensity, first_idx]])
    ref_mz = first_mz

    for (mz, intensity, idx) in all_points[1:]:
        if not mf_gu.is_shifted(mz, ref_mz, ppm_tolerance, mz_tolerance):
            # Add to the last cluster
            clusters[-1].append([mz, intensity, idx])
        else:
            # Create a new cluster
            clusters.append([[mz, intensity, idx]])
            ref_mz = mz

    return clusters


[docs]def aggregate_spectrums(spectrums, ppm_tolerance = 10, mz_tolerance = 0.1, consensus_majority_ratio = 0, **kwargs): ''' This function is to create a consensus spectrum from a list of spectrums of the same compound all the other components of the generated spectrum such as the charge, ... are taken from the first spectrum. if the spectrums have different adducts, the adduct mass is removed from the spectrums before aggregation. The final result is reported with no adduct mass. Parameters ---------- spectrums: list of Spectrum objects The spectrums to be aggregated ppm_tolerance: float The tolerance in ppm mz_tolerance: float The tolerance in mz consensus_majority_ratio: float The ratio of the spectrums that should have the peak to be considered in the consensus if is 1, all the spectrums should have the peak to be considered in the consensus if is 0, only one spectrum should have the peak to be considered in the consensus Returns: result: Spectrum object The aggregated spectrum ''' adducts = set([s.adduct for s in spectrums]) spectrums = [remove_adduct(s) for s in spectrums] clusters = _cluster_spectrums(spectrums, ppm_tolerance, mz_tolerance) # print("clustering done", len(clusters)) aggregate_mz = [] aggregate_intensity = [] for cluster in clusters: sum_intensity = 0 sum_mz = 0 consensus = set() for mz, intensity, index in cluster: sum_intensity += intensity sum_mz += mz consensus.add(index) if len(consensus) < consensus_majority_ratio * len(spectrums): continue aggregate_mz.append(sum_mz/len(cluster)) aggregate_intensity.append(sum_intensity/len(cluster)) result = Spectrum(spectrums[0].copy(), mz = aggregate_mz, intensity = aggregate_intensity, **kwargs) if len(adducts) == 1: result = add_adduct(result, adducts.pop()) return result
[docs]def refine_consensus(spectrum, spectrums, ppm_tolerance = 10, mz_tolerance = 0.1, consensus_majority_ratio = 0): ''' This function is used to refine a spectrum based on the consensus of the other spectrums any peak that is not in the consensus of the other spectrums is removed. If the spectrums are with different adducts, the spectrums are adjusted to remove the adduct mass. The final spectrum is adjusted to have the correct adduct mass. Parameters ---------- spectrum: Spectrum object The spectrum to be refined spectrums: list of Spectrum objects The spectrums to be used to refine the spectrum ppm_tolerance: float The tolerance in ppm mz_tolerance: float The tolerance in mz consensus_majority_ratio: float (0-1) [default: 0] The ratio of the spectrums that should have the peak in order to keep it, 0 means none of the spectrums need to have it 1 means all of the spectrums need to have it. Returns: A tuple of two elements: result: Spectrum object The refined spectrum appearances: tuple of lists The first list contains the number of spectrums that have the peak after refining The second list contains the number of spectrums that have the peak before refining ''' # Create an empty list to store the refined consensus refined_mz = [] refined_intensity = [] new_appearances = [] old_appearances = [] adduct = spectrum.adduct spectrum = remove_adduct(spectrum) spectrums = [remove_adduct(s) for s in spectrums] for mz, intensity in zip(spectrum.mz, spectrum.intensity): found_count = 0 for other_spectrum in spectrums: peak_indexes = other_spectrum.get_peak_indexes(mz, ppm_tolerance = ppm_tolerance, mz_tolerance = mz_tolerance) if len(peak_indexes) > 0: found_count += 1 old_appearances.append(found_count) if found_count >= consensus_majority_ratio * len(spectrums): refined_mz.append(mz) refined_intensity.append(intensity) new_appearances.append(found_count) result = Spectrum(spectrum.copy(), mz = refined_mz, intensity = refined_intensity) result = add_adduct(result, adduct) return result, (new_appearances, old_appearances)
[docs]def remove_adduct(spectrum): ''' This function is used to remove the adduct mass from the spectrum Parameters ---------- spectrum: Spectrum object The spectrum to remove the adduct mass from its mz values Returns: result: Spectrum object The spectrum without the adduct mass ''' result = spectrum.copy() adduct_mass = spectrum.adduct_mass result.mz = [mz - adduct_mass for mz in result.mz] if result.precursor_mz is not None: result.precursor_mz -= adduct_mass result.adduct = None return result
[docs]def add_adduct(spectrum, adduct): ''' This function is used to add the adduct mass to the spectrum Parameters ---------- spectrum: Spectrum object The spectrum to add the adduct mass to adduct: str The adduct to add to the spectrum, should be in the supported adducts list. Returns: result: Spectrum object The spectrum with the adduct mass ''' result = spectrum.copy() adduct_mass = mf_gu.get_adduct_mass(adduct) result.mz = [mz + adduct_mass for mz in result.mz] if result.precursor_mz is not None: result.precursor_mz += adduct_mass result.adduct = adduct return result