# Source code for modifinder.classes.Compound



import sys

import modifinder.utilities.general_utils as general_utils
from modifinder.utilities.mol_utils import _get_molecule
import rdkit.Chem.rdMolDescriptors as rdMolDescriptors
from rdkit import Chem
import numpy as np
import math
import uuid
from typing import List, Tuple

# import modifinder as mf
from modifinder.classes.Spectrum import Spectrum
from modifinder.classes.StructureMeta import StructureMeta
from .. import convert

# TODO: Move to Spectrum.py
def _filter_peaks_by_precursor_mz(peaks:List[Tuple[float, float]], precursor_mz:float)->List[Tuple[float, float]]:
    """Removes peaks that are higher than 0.99 * precursor_mz so only fragments remain in the spectrum.

    Parameters
    ----------
    peaks : List[Tuple[float, float]]
        A list of tuples representing the peaks in the spectrum, where each tuple contains the m/z and intensity of the peak.
    precursor_mz : float
        The m/z of the precursor ion, used to filter out peaks that are higher than 0.99 * precursor_mz.
    
    Returns
    -------
    """
    precursor_mz = float(precursor_mz)

    # Ensure each peak is a tuple and m/z is a float
    filtered_peaks = []
    for peak in peaks:
        mz = float(peak[0])
        intensity = float(peak[1])
        if mz < 0.99 * precursor_mz:
            filtered_peaks.append((mz, intensity))
    return filtered_peaks


class Compound:
    """A class to represent a compound.

    The compound always carries spectrum data; when the structure is known it
    also carries an RDKit molecule in ``structure``.

    Attributes
    ----------
    id : str
        The id of the compound (a fresh UUID when none is supplied).
    spectrum : Spectrum
        Spectrum object holding mz, intensity, precursor mass and charge of
        the mass spectrometry data.
    structure : Chem.Mol
        The structure of the compound (None when unknown).
    is_known : bool
        Whether the compound structure is known, derived from ``structure``.
    name : str
        The name of the compound (taken from the ``compound_name`` keyword).
    additional_attributes : dict
        Every other keyword passed at construction (lower-cased keys), exposed
        as attributes through ``__getattr__`` — e.g. accession or library
        membership, if the caller supplies them.

    Examples
    --------
    Create a compound by providing the necessary information:

    >>> compound = Compound(
    ...     spectrum=[[110.066925, 38499.089844], [138.060638, 412152.093750],
    ...               [195.079575, 6894530.0], [196.082092, 43027.628906]],
    ...     precursor_mz=195.087, precursor_charge=1, adduct="[M+H]+",
    ...     smiles="CN1C=NC2=C1C(=O)N(C(=O)N2C)C", id="CCMSLIB00005435812")

    To build a compound from a dictionary or another object, use
    ``modifinder.convert.to_compound`` instead.
    """

    def __init__(self, spectrum: List[Tuple[float, float]] = None,
                 precursor_mz: float = None, precursor_charge: int = None,
                 adduct: str = None, smiles: str = None, **kwargs):
        """Initialize the Compound.

        All parameters default to None so that ``Compound()`` creates an empty
        instance (required by :meth:`copy`, which populates a fresh object via
        ``convert.to_compound``).

        Parameters
        ----------
        spectrum : List[Tuple[float, float]], optional
            (m/z, intensity) peak pairs; peaks at or above 0.99 * precursor_mz
            are filtered out before the Spectrum object is built. When None,
            no spectrum is constructed.
        precursor_mz : float, optional
            The m/z of the precursor ion.
        precursor_charge : int, optional
            The charge of the precursor ion.
        adduct : str, optional
            The adduct string, e.g. "[M+H]+".
        smiles : str, optional
            SMILES of the structure; when it parses, the exact mass and the
            atom distance matrix are precomputed.
        kwargs : keyword arguments, optional
            ``id`` and ``compound_name`` are consumed directly; every other
            keyword is stored (lower-cased key) in ``additional_attributes``.

        See Also
        --------
        modifinder.convert
        Spectrum
        """
        # Keyword matching is case-insensitive throughout.
        lower_kwargs = {key.lower(): value for key, value in kwargs.items()}

        # id: explicit kwarg wins, otherwise generate a fresh UUID.
        if "id" in lower_kwargs:
            self.id = lower_kwargs["id"]
        else:
            self.id = str(uuid.uuid4())

        if spectrum is not None:
            # Keep only fragment peaks (strictly below 0.99 * precursor m/z).
            spectrum = _filter_peaks_by_precursor_mz(spectrum, precursor_mz)
            self.spectrum = Spectrum(
                mz=[s[0] for s in spectrum],
                intensity=[s[1] for s in spectrum],
                precursor_mz=precursor_mz,
                precursor_charge=precursor_charge,
                adduct=adduct,
                ms_level=2,
            )
        else:
            # Empty instance; convert.to_compound fills it in later.
            self.spectrum = None

        self.structure = None
        self.is_known = False
        self._exact_mass = None
        self._distances = None
        if smiles:
            self.structure = _get_molecule(smiles=smiles)
            if self.structure:
                self.is_known = True
                self._exact_mass = rdMolDescriptors.CalcExactMolWt(self.structure)
                self._distances = Chem.rdmolops.GetDistanceMatrix(self.structure)

        # Only the 'compound_name' keyword populates name; a plain 'name'
        # keyword lands in additional_attributes (shadowed by this attribute).
        self.name = lower_kwargs.get('compound_name', None)

        # Everything not consumed above is kept for __getattr__ lookups.
        self.additional_attributes = {}
        for key, value in lower_kwargs.items():
            if key not in ('id', 'compound_name'):
                self.additional_attributes[key] = value

    @property
    def spectrum(self):
        """The Spectrum object of the compound (None for an empty instance)."""
        return self._spectrum

    @spectrum.setter
    def spectrum(self, value):
        self._spectrum = value

    @property
    def distances(self):
        """Get the distances between every pair of atoms in the compound.

        Returns
        -------
        The pairwise atom distance matrix as produced by RDKit's
        GetDistanceMatrix (None when the structure is unknown).
        """
        return self._distances

    @property
    def exact_mass(self) -> float:
        """Get the exact mass of the compound.

        Returns
        -------
        float
            The exact mass of the compound (None when the structure is unknown).
        """
        return self._exact_mass

    def __getattr__(self, name):
        # Fall back to additional_attributes for attributes not found on the
        # instance; only reached when normal attribute lookup fails.
        if 'additional_attributes' in self.__dict__:
            if name in self.__dict__['additional_attributes']:
                return self.__dict__['additional_attributes'][name]
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

    def clear(self):
        """Clear the compound data and reset all the attributes."""
        self.structure = None
        self.id = None
        self.spectrum = None
        # False (not None) so the cleared state matches __init__'s unknown state.
        self.is_known = False
        self.name = None
        self._distances = None
        self.additional_attributes = {}
        self._exact_mass = None

    def __str__(self):
        # Summarize id/name (when set), peak count, and structure SMILES.
        valuable_data_keys = ['id', 'name']
        strings = [f"{key}: {self.__dict__[key]}" for key in valuable_data_keys
                   if self.__dict__[key] is not None]
        joined = ', '.join(strings)
        result = f"Compound({joined}) with {len(self.spectrum.mz_key)} peaks"
        if self.structure is not None:
            result += f" and structure {Chem.MolToSmiles(self.structure)}"
        return result

    def copy(self):
        """Return a copy of the compound (populated via convert.to_compound)."""
        copied_compound = Compound()
        convert.to_compound(self, use_object=copied_compound)
        return copied_compound

    def get_meta_data(self):
        """Get the meta data of the compound.

        Returns
        -------
        dict
            num_peaks, adduct, precursor_mz and charge; plus the name when set
            and the structure metadata when the structure is known.
        """
        description = {
            "num_peaks": len(self.spectrum.mz_key),
            "adduct": self.spectrum.adduct,
            "precursor_mz": self.spectrum.precursor_mz,
            "charge": self.spectrum.precursor_charge,
        }
        if self.name is not None:
            description["name"] = self.name
        if self.is_known:
            st_meta = StructureMeta.from_structure(self.structure)
            description.update(st_meta.__dict__)
        return description

    def print_peaks_to_fragments_map_by_id(self, peaks: list = None):
        """Print the peaks to fragments map.

        Parameters
        ----------
        peaks : list, optional (default: None)
            The list of peaks to print the fragments for, if None, print all peaks
        """
        if peaks is None:
            peaks = range(len(self.spectrum.mz_key))
        for peak in peaks:
            # NOTE(review): mz_key appears to store m/z scaled by 1e6 — confirm
            # against the Spectrum implementation.
            print(f"Peak {peak}: {self.spectrum.mz_key[peak] / 1e6}, Fragments: {self.spectrum.peak_fragment_dict[peak]}")
        print()

    def find_existance_by_peak_mzs(self, peak_mzs: list):
        """For each atom, and for each peak in the list, find the fragments that the atom is part of.

        Fragments are bitmasks: bit i set means atom i is part of the fragment.

        Parameters
        ----------
        peak_mzs : list
            List of peak mzs (used as keys into peak_fragment_dict).

        Returns
        -------
        existance : list of dicts
            For each atom, a dict mapping peak id -> list of fragments that
            contain the atom.
        """
        existance = [dict() for _ in range(len(self.structure.GetAtoms()))]
        for peak in peak_mzs:
            for fragment in self.spectrum.peak_fragment_dict.get(peak, []):
                # Decode the set bits of the fragment bitmask into atom indices.
                bin_fragment = bin(fragment)
                len_fragment = len(bin_fragment)
                hitAtoms = [len_fragment - i - 1 for i in range(len_fragment)
                            if bin_fragment[i] == '1']
                for atom in hitAtoms:
                    if peak not in existance[atom]:
                        existance[atom][peak] = []
                    existance[atom][peak].append(fragment)
        return existance

    def calculate_contribution_atom_by_peak_id(self, atom: int, peakid: int, existance_data: list,
                                               CI: bool = False, CPA: bool = True, CFA: bool = True):
        """Calculates the contribution of an atom to a peak.

        Parameters
        ----------
        atom : int
            The index of the atom
        peakid : int
            The index of the peak
        existance_data : list of dicts
            The existance data for the atoms (see find_existance_by_peak_mzs)
        CI : bool, optional (default: False)
            Calculate the intensity factor (currently not implemented)
        CPA : bool, optional (default: True)
            Calculate the atom peak ambiguity factor
        CFA : bool, optional (default: True)
            Calculate the fragment ambiguity factor
        """
        contribution = 0
        if peakid not in existance_data[atom]:
            return contribution
        intensity_factor = 1
        atom_peak_ambiguity_factor = 1
        fragment_ambiguity_factor = 1
        if CI:
            # TODO: the intensity lookup below predates the Spectrum refactor
            # and is very unlikely to work post-refactor.
            raise NotImplementedError("CI (Consider Intensity) is not implemented yet, as the refactor changed how the spectrum data is stored. This will be implemented in a future update.")
            # intensity_factor = self.spectrum.intensity[peakid]  # unreachable until CI is reimplemented
        if CPA:
            # Penalize peaks with many candidate fragments.
            atom_peak_ambiguity_factor = 1 / len(self.spectrum.peak_fragment_dict[int(peakid)])
        for frag in existance_data[atom][peakid]:
            if CFA:
                # Penalize large fragments (many atoms -> bits set in the mask).
                fragment_ambiguity_factor = 1 / (bin(frag).count("1"))
            contribution += intensity_factor * atom_peak_ambiguity_factor * fragment_ambiguity_factor
        return contribution

    def calculate_contributions_by_peak_id(self, peak_mzs, CI=False, CPA=True, CFA=True, CPE=True):
        """Calculate per-atom contribution scores over a set of peaks.

        Parameters
        ----------
        peak_mzs : list
            List of peak ids.
        CI : bool, optional (default: False)
            (Consider_Intensity) if True, the intensity of the peaks is considered.
        CPA : bool, optional (default: True)
            (Consider_Peak_Ambiguity) if True, the peak ambiguity (number of
            fragments assigned to a peak) is considered.
        CFA : bool, optional (default: True)
            (Consider_Fragment_Ambiguity) if True, the fragment ambiguity
            (number of atoms in fragment) is considered.
        CPE : bool, optional (default: True)
            (Consider_Peak_Entropy) if True, the peak entropy (how ambiguous
            the fragments are) is considered.
        """
        num_atoms = len(self.structure.GetAtoms())
        existance_data = self.find_existance_by_peak_mzs(peak_mzs)
        contributions = [0 for _ in range(num_atoms)]
        peak_atom_contributions = np.zeros((len(peak_mzs), num_atoms))
        for i, peak in enumerate(peak_mzs):
            for atom in range(num_atoms):
                peak_atom_contributions[i][atom] = self.calculate_contribution_atom_by_peak_id(
                    atom, peak, existance_data, CI=CI, CPA=CPA, CFA=CFA)

        if CPE:
            # Down-weight peaks whose contribution is spread over many atoms.
            peak_entropies = np.zeros(len(peak_mzs))
            for i in range(len(peak_mzs)):
                peak_entropies[i] = 1 - general_utils.entropy(peak_atom_contributions[i])
            # peak_entropies = peak_entropies / np.max(peak_entropies)
        else:
            peak_entropies = np.ones(len(peak_mzs))

        for i in range(num_atoms):
            for j in range(len(peak_mzs)):
                contributions[i] += peak_atom_contributions[j][i] * peak_entropies[j]
        return contributions

    def filter_fragments_by_atoms_and_peak_ids(self, atoms: list, peaks: list = None):
        """Filter the fragments by the atoms, remove fragments that do not contain at least one of the atoms.

        Parameters
        ----------
        atoms : list
            A list of atoms to filter the fragments.
        peaks : list, optional
            A list of peaks to filter their fragments, if None, use all peaks.

        Returns
        -------
        updated : int
            The number of updated peaks.
        """
        if peaks is None:
            # NOTE(review): self.peaks is not set by __init__ — presumably
            # supplied via additional_attributes; confirm against callers.
            peaks = [i for i in range(len(self.peaks))]
        updated = 0
        for i in peaks:
            updated_fragments = set()
            for fragment in self.spectrum.peak_fragment_dict[int(i)]:
                for atom in atoms:
                    # Keep the fragment if any requested atom's bit is set.
                    if 1 << atom & fragment:
                        updated_fragments.add(fragment)
                        break
            if len(updated_fragments) != len(self.spectrum.peak_fragment_dict[int(i)]):
                updated += 1
            self.spectrum.peak_fragment_dict[int(i)] = updated_fragments
        return updated

    def calculate_peak_annotation_ambiguity_by_peak_ids(self, peaks: list = None) -> Tuple[float, float]:
        """Calculate the peak annotation ambiguity.

        Parameters
        ----------
        peaks : list
            A list of peaks to calculate the ambiguity for, if None, use all peaks.

        Returns
        -------
        (float, float)
            A tuple of two values (ambiguity, ratio):
            ambiguity: the average number of fragments per annotated peak
            (-1 when no peak is annotated);
            ratio: the ratio of annotated peaks to all peaks
            (-1, -1 when there are no peaks at all).
        """
        if peaks is None:
            peaks = [i for i in range(len(self.peaks))]
        # Empty input must be checked first; otherwise the annotated_peaks == 0
        # early return below would shadow this case.
        if len(peaks) == 0:
            return -1, -1
        ambiguity = 0
        annotated_peaks = 0
        for peak in peaks:
            if len(self.spectrum.peak_fragment_dict[int(peak)]) > 0:
                annotated_peaks += 1
                ambiguity += len(self.spectrum.peak_fragment_dict[int(peak)])
        if annotated_peaks == 0:
            return -1, 0
        return ambiguity / annotated_peaks, annotated_peaks / len(peaks)

    def calculate_annotation_entropy_by_peak_ids(self, peaks: list = None):
        """Calculate the entropy of the annotation.

        Parameters
        ----------
        peaks : list
            A list of peaks to calculate the entropy for, if None, use all peaks.

        Returns
        -------
        float
            The mean per-peak annotation entropy, or -1 when there are no peaks.
        """
        if peaks is None:
            peaks = [i for i in range(len(self.peaks))]
        # NOTE(review): entropies are averaged over len(self.peaks) entries
        # (zeros for peaks not in the requested subset) — confirm intended.
        peak_entropies = [0 for _ in range(len(self.peaks))]
        n = len(self.structure.GetAtoms())
        for peak in peaks:
            # Count, per atom, in how many of the peak's fragments it appears.
            atoms_appearance = [0 for _ in range(n)]
            for fragment in self.spectrum.peak_fragment_dict[int(peak)]:
                for atom in range(n):
                    if 1 << atom & fragment:
                        atoms_appearance[atom] += 1
            entropy = 0
            for atom in range(n):
                if atoms_appearance[atom] > 0:
                    p = atoms_appearance[atom] / len(self.spectrum.peak_fragment_dict[int(peak)])
                    entropy -= p * math.log(p)
            peak_entropies[peak] = entropy
        if len(peak_entropies) == 0:
            return -1
        entropy = sum(peak_entropies) / len(peak_entropies)
        return entropy