import sys
import modifinder.utilities.general_utils as general_utils
from modifinder.utilities.mol_utils import _get_molecule
import rdkit.Chem.rdMolDescriptors as rdMolDescriptors
from rdkit import Chem
import numpy as np
import math
import uuid
from typing import List, Tuple
# import modifinder as mf
from modifinder.classes.Spectrum import Spectrum
from modifinder.classes.StructureMeta import StructureMeta
from .. import convert
# TODO: Move to Spectrum.py
def _filter_peaks_by_precursor_mz(peaks:List[Tuple[float, float]], precursor_mz:float)->List[Tuple[float, float]]:
"""Removes peaks that are higher than 0.99 * precursor_mz so only fragments remain in the spectrum.
Parameters
----------
peaks : List[Tuple[float, float]]
A list of tuples representing the peaks in the spectrum, where each tuple contains the m/z and intensity of the peak.
precursor_mz : float
The m/z of the precursor ion, used to filter out peaks that are higher than 0.99 * precursor_mz.
Returns
-------
"""
precursor_mz = float(precursor_mz)
# Ensure each peak is a tuple and m/z is a float
filtered_peaks = []
for peak in peaks:
mz = float(peak[0])
intensity = float(peak[1])
if mz < 0.99 * precursor_mz:
filtered_peaks.append((mz, intensity))
return filtered_peaks
[docs]class Compound:
""" A class to represent a compound
The compound always has spectrum data.
If the compound structure is known, it also has a structure property.
Attributes
----------
id (str) : The id of the compound
spectrum (Spectrum) : Spectrum Tuple representing mz, intensity, Precursor mass and Charge for mass spectrumetry data
structure (Chem.Mol): The structure of the compound
is_known (bool): A boolean indicating whether the compound is known, derived from the structure
name (str): The name of the compound
accession (str): The accession of the compound
library_membership (str): The GNPS library membership of the compound
Examples
--------
Create a compound by providing the necessary information:
>>> compound = Compound(id="CCMSLIB00005435812", peaks=[[110.066925,38499.089844],[138.060638,412152.093750],[195.079575,6894530.000000],[195.200180,480874.812500],[196.082092,43027.628906]], precursor_mz=195.087, precursor_charge=1, adduct="[M+H]+", smiles="CN1C=NC2=C1C(=O)N(C(=O)N2C)C")
Alternatively, you can create a compound by providing a dictionary of data:
>>> data = {
... "id": "CCMSLIB00005435812",
... "peaks": [[110.066925,38499.089844],[138.060638,412152.093750],[195.079575,6894530.000000],[195.200180,480874.812500],[196.082092,43027.628906]],
... "precursor_mz": 195.087,
... "charge": 1,
... "adduct": "[M+H]+",
... "smiles": "CN1C=NC2=C1C(=O)N(C(=O)N2C)C"
... }
>>> compound = Compound(data)
"""
[docs] def __init__(self,
spectrum:List[Tuple[float, float]],
precursor_mz:float,
precursor_charge:int,
adduct:str,
smiles:str = None,
**kwargs):
"""Initialize the Compound class
The compound class can be initialized in three different ways:
1. By providing a dictionary of data to the *data* parameter that contains all the necessary information
2. By providing the necessary information as parameter
If both the *data* and the parameters are provided, the parameters will override the data.
If of the incoming data and kwargs are provided, an empty instance of the class will be created.
Parameters
----------
incoming_data : input data (optional, default: None)
The data to initialize the class with, can be a dictionary of data or a compound object. If not provided,
the class will be initialized with the provided keyword arguments. If provided, the keyword arguments will still
override the data.
kwargs : keyword arguments (optional, default: No attributes)
Attributes to initialize the class with, if provided, they will override the attributes from the data
See Also
--------
modifinder.convert
Spectrum
"""
lower_kwargs = {key.lower(): value for key, value in kwargs.items()}
# define the attributes of the class
self.id = None
if "id" in lower_kwargs:
self.id = lower_kwargs["id"]
else:
self.id = str(uuid.uuid4())
spectrum = _filter_peaks_by_precursor_mz(spectrum, precursor_mz)
self.spectrum = Spectrum(
mz= [s[0] for s in spectrum],
intensity= [s[1] for s in spectrum],
precursor_mz=precursor_mz,
precursor_charge=precursor_charge,
adduct=adduct,
ms_level=2,
)
self.structure = None
self.is_known = False
self._exact_mass = None
self._distances = None
if smiles:
self.structure = _get_molecule(
smiles = smiles,
)
if self.structure:
self.is_known = True
self._exact_mass = rdMolDescriptors.CalcExactMolWt(self.structure)
self._distances = Chem.rdmolops.GetDistanceMatrix(self.structure)
self.name = None
if self.name is None:
self.name = lower_kwargs.get('compound_name', None)
self.additional_attributes = {}
# Add kwargs to additional attributes
for key, value in lower_kwargs.items():
if key not in ['id', 'compound_name']:
self.additional_attributes[key] = value
@property
def spectrum(self):
return self._spectrum
@spectrum.setter
def spectrum(self, value):
self._spectrum = value
@property
def distances(self) -> dict:
"""Get the distances between every pair of atoms in the compound
Returns
-------
dict: A dictionary of distances between every pair of atoms in the compound
"""
return self._distances
@property
def exact_mass(self) -> float:
"""Get the exact mass of the compound
Returns
-------
float: The exact mass of the compound
"""
return self._exact_mass
def __getattr__(self, name):
if 'additional_attributes' in self.__dict__:
if name in self.__dict__['additional_attributes']:
return self.__dict__['additional_attributes'][name]
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
[docs] def clear(self):
"""Clear the compound data and reset all the attributes to None"""
self.structure = None
self.id = None
self.spectrum = None
self.is_known = None
self.name = None
self._distances = None
self.additional_attributes = {}
self._exact_mass = None
def __str__(self):
valuable_data_keys = ['id', 'name']
strings = [f"{key}: {self.__dict__[key]}" for key in valuable_data_keys if self.__dict__[key] is not None]
joined = ', '.join(strings)
result = f"Compound({joined}) with {len(self.spectrum.mz_key)} peaks"
if self.structure is not None:
result += f" and structure {Chem.MolToSmiles(self.structure)}"
return result
[docs] def copy(self):
"""Return a copy of the compound"""
copied_compound = Compound()
convert.to_compound(self, use_object = copied_compound)
return copied_compound
def print_peaks_to_fragments_map_by_id(self, peaks: list = None):
"""Print the peaks to fragments map
Parameters
----------
peaks : list, optional (default: None)
The list of peaks to print the fragments for, if None, print all peaks
"""
if peaks is None:
peaks = range(len(self.spectrum.mz_key))
for peak in peaks:
print(f"Peak {peak}: {self.spectrum.mz_key[peak] / 1e6}, Fragments: {self.spectrum.peak_fragment_dict[peak]}")
print()
def find_existance_by_peak_mzs(self, peak_mzs: list):
"""
For each atom, and for each peak in the list, find the fragments that the atom is part of
Parameters
----------
peak_mzs : list of peak mzs
Returns
-------
existance : list of dicts for each atom, each dict contains the peak ids as keys and the fragments as values
"""
existance = [dict() for i in range(len(self.structure.GetAtoms()))]
for peak in peak_mzs:
for fragment in self.spectrum.peak_fragment_dict.get(peak, []):
# get all the bits that are 1 in the fragment
bin_fragment = bin(fragment)
len_fragment = len(bin_fragment)
hitAtoms = [len_fragment-i-1 for i in range(len(bin_fragment)) if bin_fragment[i] == '1']
for atom in hitAtoms:
if peak not in existance[atom]:
existance[atom][peak] = []
existance[atom][peak].append(fragment)
return existance
def calculate_contribution_atom_by_peak_id(self, atom: int, peakid:int, existance_data:list, CI:bool = False, CPA:bool = True, CFA:bool = True):
"""Calculates the contribution of an atom to a peak
Parameters
----------
atom : int
The index of the atom
peakid : int
The index of the peak
existance_data : list of dicts
The existance data for the atoms
CI : bool, optional (default: False)
Calculate the intensity factor
CPA : bool, optional (default: True)
Calculate the atom peak ambiguity factor
CFA : bool, optional (default: True)
Calculate the fragment ambiguity factor
"""
contribution = 0
if peakid not in existance_data[atom]:
return contribution
intensity_factor = 1
atom_peak_ambiguity_factor = 1
fragment_ambiguity_factor = 1
if CI:
# TODO This is very unlikely to work post-refactor
raise NotImplementedError("CI (Consider Intensity) is not implemented yet, as the refactor changed how the spectrum data is stored. This will be implemented in a future update.")
intensity_factor = self.spectrum.intensity[peakid]
if CPA:
atom_peak_ambiguity_factor = 1/len(self.spectrum.peak_fragment_dict[int(peakid)])
for frag in existance_data[atom][peakid]:
if CFA:
fragment_ambiguity_factor = 1/(bin(frag).count("1"))
contribution += intensity_factor * atom_peak_ambiguity_factor * fragment_ambiguity_factor
return contribution
def calculate_contributions_by_peak_id(self, peak_mzs, CI = False, CPA = True, CFA = True, CPE = True):
"""
input:
peak_mzs: list of peak ids
CI: (Consider_Intensity) bool, if True, the intensity of the peaks is considered (default: False)
CPA: (Consider_Peak_Ambiguity) bool, if True, the peak ambiguity (number of fragments assigned to a peak) is considered (default: True)
CFA: (Consider_Fragment_Ambiguity) bool, if True, the fragment ambiguity (number of atoms in fragment) is considered (default: True)
CPE: (Consider_Peak_Entropy) bool, if True, the peak entropy (how ambiguis the fragments are) is considered (default: True)
"""
num_atoms = len(self.structure.GetAtoms())
existance_data = self.find_existance_by_peak_mzs(peak_mzs)
contributions = [0 for i in range(num_atoms)]
peak_atom_contributions = np.zeros((len(peak_mzs), num_atoms))
for i, peak in enumerate(peak_mzs):
for atom in range(num_atoms):
peak_atom_contributions[i][atom] = self.calculate_contribution_atom_by_peak_id(atom, peak, existance_data, CI=CI, CPA=CPA, CFA=CFA)
if CPE:
peak_entropies = np.zeros(len(peak_mzs))
for i in range(len(peak_mzs)):
peak_entropies[i] = 1 - general_utils.entropy(peak_atom_contributions[i])
# peak_entropies = peak_entropies / np.max(peak_entropies)
else:
peak_entropies = np.ones(len(peak_mzs))
for i in range(num_atoms):
for j in range(len(peak_mzs)):
contributions[i] += peak_atom_contributions[j][i] * peak_entropies[j]
return contributions
def filter_fragments_by_atoms_and_peak_ids(self, atoms: list, peaks: list = None):
"""Filter the fragments by the atoms, remove fragments that do not contain at least one of the atoms
Parameters
----------
atoms: a list of atoms to filter the fragments
peaks: a list of peaks to filter their fragments, if None, use all peaks
Returns
-------
updated: the number of updated peaks
"""
if peaks is None:
peaks = [i for i in range(len(self.peaks))]
updated = 0
for i in peaks:
updated_fragments = set()
for fragment in self.spectrum.peak_fragment_dict[int(i)]:
for atom in atoms:
if 1 << atom & fragment:
updated_fragments.add(fragment)
break
if len(updated_fragments) != len(self.spectrum.peak_fragment_dict[int(i)]):
updated += 1
self.spectrum.peak_fragment_dict[int(i)] = updated_fragments
return updated
from typing import Tuple
def calculate_peak_annotation_ambiguity_by_peak_ids(self, peaks: list=None) -> Tuple[float, float]:
"""Calculate the peak annotation ambiguity
Parameters
----------
peaks : list
a list of peaks to calculate the ambiguity for, if None, use all peaks
Returns
-------
(float, float) : a tuple of two values (ambiguity, ratio)
ambiguity: the average number of fragments per annotated peaks
ratio: the ratio of annotated peaks to all peaks
"""
if peaks is None:
peaks = [i for i in range(len(self.peaks))]
ambiguity = 0
annotated_peaks = 0
for peak in peaks:
if len(self.spectrum.peak_fragment_dict[int(peak)]) > 0:
annotated_peaks += 1
ambiguity += len(self.spectrum.peak_fragment_dict[int(peak)])
if annotated_peaks == 0:
return -1, 0
if len(peaks) == 0:
return -1, -1
return ambiguity / annotated_peaks, annotated_peaks / len(peaks)
def calculate_annotation_entropy_by_peak_ids(self, peaks: list=None):
"""Calculate the entropy of the annotation
Parameters
----------
peaks : list
a list of peaks to calculate the entropy for, if None, use all peaks
Returns
-------
float : the entropy of the annotation
"""
if peaks is None:
peaks = [i for i in range(len(self.peaks))]
peak_entropies = [0 for i in range(len(self.peaks))]
n = len(self.structure.GetAtoms())
for peak in peaks:
atoms_appearance = [0 for i in range(n)]
for fragment in self.spectrum.peak_fragment_dict[int(peak)]:
for atom in range(n):
if 1 << atom & fragment:
atoms_appearance[atom] += 1
entropy = 0
for atom in range(n):
if atoms_appearance[atom] > 0:
p = atoms_appearance[atom] / len(self.spectrum.peak_fragment_dict[int(peak)])
entropy -= p * math.log(p)
peak_entropies[peak] = entropy
if len(peak_entropies) == 0:
return -1
entropy = sum(peak_entropies) / len(peak_entropies)
return entropy