Source code for pgfinder.matching

"""Matching functions"""

import logging
import re
from decimal import Decimal

import pandas as pd
from pandas.api.types import is_numeric_dtype

from pgfinder import COLUMNS, MASS_TO_CLEAN, MOD_TYPE, MULTIMERS
from pgfinder.errors import UserError
from pgfinder.logs.logs import LOGGER_NAME

LOGGER = logging.getLogger(LOGGER_NAME)

# Only need pgfinder columns so subset those here to simplify subsequent indexing
COLUMNS = COLUMNS["pgfinder"]


[docs] def calc_ppm_tolerance(mw: float, ppm_tol: int = 10) -> float: """ Calculates ppm tolerance value Parameters ---------- mw : float Molecular weight. ppm_tol : int PPM tolerance Returns ------- float ? """ return (mw * ppm_tol) / 1000000
[docs] def filtered_theo(ftrs_df: pd.DataFrame, theo_df: pd.DataFrame, user_ppm: int) -> pd.DataFrame: """ Generate list of observed structures from theoretical masses dataframe to reduce search space. Parameters ---------- ftrs_df : pd.DataFrame Features dataframe. theo_df : pd.DataFrame Theoretical dataframe. user_ppm : int User specified Parts Per Million. Returns ------- pd.DataFrame Dataframe filtered on matches with theoretical masses. """ # Match theoretical structures to raw data to generate a list of observed structures matched_df = matching(ftrs_df=ftrs_df, matching_df=theo_df, set_ppm=user_ppm) # Create dataframe containing only theo_mwMonoisotopic & inferredStructure columns from matched_df filtered_df = matched_df[["Inferred structure", "Theo (Da)"]].copy() # Drop all rows with NaN values in the Theo (Da) column filtered_df.dropna(subset=["Theo (Da)"], inplace=True) # Drop duplicate structures and masses filtered_df.drop_duplicates(inplace=True) if filtered_df.empty: raise UserError("No matches were found for this search. Please check your database or increase mass tolerance.") return filtered_df
[docs] def multimer_builder(theo_df: pd.DataFrame, multimer_type: str, columns: dict = COLUMNS) -> pd.DataFrame: """ Generate multimers (dimers & trimers) from observed monomers. Parameters ---------- theo_df : pd.DataFrame Dataframe containing theoretical monomerics structures and their corresponding masses. multimer_type : str Type of multimers to build. columns : dict Dictionary of pgfinder columns, loaded by default from 'pgfinder/config/columns.yaml'. Returns ------- pd.DataFrame Dataframe containing theoretical multimers and their corresponding masses. """ theo_mw = [] theo_struct = [] # Perhaps need a better way inferred_structure = columns["inferred"]["structure"] # Builder sub function - calculates multimer mass and name def builder(name, mass, mult_num: int): for _, row in theo_df.iterrows(): if ( len(row[inferred_structure][: len(row[inferred_structure]) - 2]) > 2 ): # Prevent dimer creation using just gm (input format is XX|n) X = letters n = number mw = row["Theo (Da)"] acceptor = row[inferred_structure][: len(row[inferred_structure]) - 2] donor = name donor_mw = mass theo_mw.append(Decimal(mw) + donor_mw + Decimal("-18.0106")) # FIXME: In an ideal world, `-` should actually be `~` here, but Excel will throw # a hissy-fit about `~` being an escape character, so that's out of scope for now joiner = "-" if "Glycosidic" in multimer_type else "=" theo_struct.append(acceptor + joiner + donor + "|" + str(mult_num)) # Call builder subfunction with different arguements based on multimer type selected # and calculate multimers based on peptide bond through side chain multimer = MULTIMERS[multimer_type] LOGGER.info(f"Building features for multimer type : {multimer_type}") [builder(molecule, Decimal(features["mass"]), features["mult_num"]) for molecule, features in multimer.items()] # converts lists to dataframe multimer_df = pd.DataFrame(list(zip(theo_mw, theo_struct)), columns=list(columns["inferred"].values())) return multimer_df
[docs] def modification_generator(filtered_theo_df: pd.DataFrame, mod_type: str) -> pd.DataFrame: """ Generate modified muropeptides (calculates new mass and add modification tag to structure name). Parameters ---------- filtered_theo_df : pd.DataFrame Pandas DataFrame of theoretical masses that have been filtered. mod_type : str Modification type ???. Returns ------- pd.DataFrame Pandas DataFrame of ???. """ mod_mass = Decimal(MOD_TYPE[mod_type]["mass"]) # NOTE: This regex extracts the modification abbrevation from the end of its full name / type — # it simply extracts the bracketed expression at the end of the line mod_abbr = re.search(r"\(.*\)$", mod_type).group(0) obs_theo_muropeptides_df = filtered_theo_df.copy() # Calculate new mass of modified structure obs_theo_muropeptides_df["Theo (Da)"] = obs_theo_muropeptides_df["Theo (Da)"].map(lambda x: Decimal(x) + mod_mass) # Add modification tags to structure name — there are some special cases that need handling first! # FIXME: Kinda pointless to have a file that the user can use to define custom modifications if # we're going to hard-code in special cases anyways? I suppose they can still add their own as # long as they don't also want any sort of "special" formatting base_structure = obs_theo_muropeptides_df["Inferred structure"] # FIXME: Absolutely no validation that these structures make sense or are chemically possible — # even modifications like "Loss of GlcNAc" don't guarantee that a `g` character is removed from # the structure's name. It just chops off the first character with reckless abandon... # NOTE: All of these functions assume (with no guarantee) that structures begin with `gm-` special_cases = { "Extra Disaccharide (+gm)": lambda s: "gm-" + s, "Lactyl Peptides (Lac)": lambda s: "Lac" + s[2:], "Loss of Disaccharide (-gm)": lambda s: s[3:], "Loss of GlcNAc (-g)": lambda s: s[1:], } # The silly `len(s) - 2` rubbish here is to preserve the `|x` multimer number at the end of # each structure name def default_case(s): return s[: len(s) - 2] + " " + mod_abbr + s[len(s) - 2 : len(s)] structure_updater = special_cases.get(mod_type, default_case) obs_theo_muropeptides_df["Inferred structure"] = base_structure.map(structure_updater) return obs_theo_muropeptides_df
[docs] def matching(ftrs_df: pd.DataFrame, matching_df: pd.DataFrame, set_ppm: int) -> pd.DataFrame: """ Match theoretical masses to observed masses within ppm tolerance. Parameters ---------- ftrs_df : pd.DataFrame Features DataFrame matching_df : pd.DataFrame Matching DataFrame set_ppm : int Returns ------- pd.DataFrame Dataframe of matches. """ molecular_weights = matching_df[["Inferred structure", "Theo (Da)"]] matches_df = pd.DataFrame() for s, m in molecular_weights.itertuples(index=False): # FIXME: I'm not sure if it's better to convert everything to float or # to convert everthing to Decimal instead m = float(m) tolerance = calc_ppm_tolerance(m, set_ppm) mw_matches = ftrs_df[(ftrs_df["Obs (Da)"] >= m - tolerance) & (ftrs_df["Obs (Da)"] <= m + tolerance)].copy() # If we have matches add the structure and molecular weight then append if len(mw_matches.index) > 0: mw_matches["Inferred structure"] = s mw_matches["Theo (Da)"] = round(m, 4) matches_df = pd.concat([matches_df, mw_matches]) # Merge with raw data unmatched = ftrs_df[~ftrs_df.index.isin(matches_df.index)] return pd.concat([matches_df, unmatched])
[docs] def clean_up(ftrs_df: pd.DataFrame, mass_to_clean: Decimal, time_delta: float) -> pd.DataFrame: """ Clean up a DataFrame. Parameters ---------- ftrs_df : pd.DataFrame Features dataframe? mass_to_clean : Decimal Mass to be cleaned. time_delta: float Clean up window. Returns ------- pd.DataFrame: Tidied Dataframe. """ # Get the type of adduct based on the mass_to_clean (which is a float) adducts = {"sodiated": Decimal("21.9819"), "potassated": Decimal("37.9559"), "decay": Decimal("203.0793")} adducts_keys = list(adducts.keys()) adducts_values = list(adducts.values()) adduct = adducts_keys[adducts_values.index(mass_to_clean)] # Selector substrings for generating parent and adduct dataframes parent = MASS_TO_CLEAN[adduct]["parent"] target = MASS_TO_CLEAN[adduct]["target"] # Generate parent dataframe - contains parents parent_muropeptide_df = ftrs_df.loc[ftrs_df["Inferred structure"].str.contains(parent, na=False)] # Generate adduct dataframe - contains adducts adducted_muropeptide_df = ftrs_df.loc[ftrs_df["Inferred structure"].str.contains(target, na=False)] # Generate copy of rawdata dataframe consolidated_decay_df = ftrs_df.copy() # Status updates (prints to console) if parent_muropeptide_df.empty: LOGGER.info(f"No {parent} muropeptides found") if adducted_muropeptide_df.empty: LOGGER.info(f"No {target} found") elif mass_to_clean == adducts["sodiated"]: LOGGER.info(f"Processing {adducted_muropeptide_df.size} Sodium Adducts") elif mass_to_clean == adducts["potassated"]: LOGGER.info(f"Processing {adducted_muropeptide_df.size} potassium adducts") elif mass_to_clean == adducts["decay"]: LOGGER.info(f"Processing {adducted_muropeptide_df.size} in source decay products") # Consolidate adduct intensity with parent ions intensity for _y, row in parent_muropeptide_df.iterrows(): # Get retention time value from row rt = row["RT (min)"] # Get theoretical monoisotopic mass value from row as list of values intact_mw = row["Theo (Da)"] # Work out rt window upper_lim_rt = rt + time_delta lower_lim_rt = rt - time_delta # Get all adducts within rt window ins_constrained_df = adducted_muropeptide_df[ adducted_muropeptide_df["RT (min)"].between(lower_lim_rt, upper_lim_rt, inclusive="both") ] if not ins_constrained_df.empty: # Loop through each of the adducts in the RT window, the adducts # themselves all have structures containing the `target` string for _z, ins_row in ins_constrained_df.iterrows(): ins_mw = ins_row["Theo (Da)"] # Compare parent masses to adduct masses mass_delta = abs( Decimal(intact_mw).quantize(Decimal("0.00001")) - Decimal(ins_mw).quantize(Decimal("0.00001")) ) # Is the mass delta the same mass as the target `mass_to_clean`? # If so, it's the same structure but that gets its charge from # the `target` ion instead of a proton as normal. In this case, # consolidate the intensities of the parent (H+) and adduct # (`target`+) ions so that the parent intensity has all of the # adduct intensities added to it if mass_delta == mass_to_clean: insDecay_intensity = ins_row["Intensity"] ID = row.ID drop_ID = ins_row.ID # Because long format leads to rows with duplicate IDs, the ["ID"] # of a row is sometimes different from its index in the dataframe. # Because this is sometimes but not always the case, we need this # lookup line: idx = consolidated_decay_df.loc[consolidated_decay_df["ID"] == ID].index[0] # Make sure the row we are trying to consolidate hasn't already # been consolidated and deleted! if not consolidated_decay_df.loc[consolidated_decay_df["ID"] == drop_ID].empty: # Transfer adduct intensity to the parent ion consolidated_decay_df.at[idx, "Intensity"] += insDecay_intensity # Because long format means both IDs and structures can be duplicated, # only ID + structure pairs can be considered unique. Find where IDs # or structures differ and retain only those in the dataframe. This is # the same as *filtering out* rows in which *both* the ID and structure # match the target from ins_row diff_ID = consolidated_decay_df["ID"] != ins_row["ID"] diff_Structure = consolidated_decay_df["Inferred structure"] != ins_row["Inferred structure"] consolidated_decay_df = consolidated_decay_df[diff_ID | diff_Structure] return consolidated_decay_df
[docs] def data_analysis( raw_data_df: pd.DataFrame, theo_masses_df: pd.DataFrame, rt_window: float, enabled_mod_list: list, ppm_tolerance: float, consolidation_ppm: float, ) -> pd.DataFrame: """ Perform analysis. Parameters ---------- raw_data_df : pd.DataFrame User data as Pandas DataFrame. theo_masses_df : pd.DataFrame Theoretical masses as Pandas DataFrame. rt_window : float Set time window for in-source decay and salt adduct cleanup. enabled_mod_list : list List of modifications to enable. ppm_tolerance : float The ppm tolerance used when matching the theoretical masses of structures to observed ions. consolidation_ppm : float The minimum absolute ppm difference between two matches before one is picked as "most likely" over the other. Returns ------- pd.DataFrame Dataframe of matches. """ sugar = Decimal("203.0793") sodium = Decimal("21.9819") potassium = Decimal("37.9559") LOGGER.info("Filtering theoretical masses by observed masses") obs_monomers_df = filtered_theo(ftrs_df=raw_data_df, theo_df=theo_masses_df, user_ppm=ppm_tolerance) # Make sure the enabled_mod_list (if empty), is actually represented by an empty list enabled_mod_list = enabled_mod_list or [] # NOTE: "Multimers" is a semi-magic keyword here. Multimers and modifications are treated # differently by most of the code and have their own sections in `parameters.yaml`, but # despite this, all of the multimer and modification flags are passed to `data_analysis()` # in the same `enabled_mod_list` variable... This is the shortest path to getting something # working now. multimer_mods = [m for m in enabled_mod_list if "Multimers" in m] other_mods = [m for m in enabled_mod_list if m not in multimer_mods] def build_multimers(mod): LOGGER.info("Building multimers from obs muropeptides") theo_multimers_df = multimer_builder(obs_monomers_df, mod) LOGGER.info("Filtering theoretical multimers by observed") return filtered_theo(raw_data_df, theo_multimers_df, ppm_tolerance) obs_theo_df = pd.concat([obs_monomers_df, *(build_multimers(type) for type in multimer_mods)]) def apply_modification(mod): LOGGER.info(f"Generating {mod} variants") return modification_generator(obs_theo_df, mod) LOGGER.info("Building custom search file") master_frame = pd.concat( [ obs_theo_df, *(apply_modification(mod) for mod in other_mods), ] ) master_frame = master_frame.astype({"Theo (Da)": float}) LOGGER.info("Matching") matched_data_df = matching(raw_data_df, master_frame, ppm_tolerance) LOGGER.info("Cleaning data") matched_data_df = calculate_ppm_delta(df=matched_data_df) cleaned_df = clean_up(ftrs_df=matched_data_df, mass_to_clean=sodium, time_delta=rt_window) cleaned_df = clean_up(ftrs_df=cleaned_df, mass_to_clean=potassium, time_delta=rt_window) cleaned_data_df = clean_up(ftrs_df=cleaned_df, mass_to_clean=sugar, time_delta=rt_window) cleaned_data_df.sort_values(by=["Intensity", "RT (min)"], ascending=[False, True], inplace=True, kind="stable") cleaned_data_df.reset_index(drop=True, inplace=True) # Apply some post-processing to the results most_likely_df = pick_most_likely_structures(cleaned_data_df, consolidation_ppm) final_df = consolidate_results(most_likely_df) # set metadata final_df.attrs["file"] = raw_data_df.attrs["file"] final_df.attrs["masses_file"] = theo_masses_df.attrs["file"] final_df.attrs["rt_window"] = rt_window final_df.attrs["modifications"] = enabled_mod_list final_df.attrs["ppm"] = ppm_tolerance final_df.attrs["consolidation_ppm"] = consolidation_ppm return final_df
[docs] def calculate_ppm_delta( df: pd.DataFrame, observed: str = COLUMNS["input"]["obs"], theoretical: str = COLUMNS["inferred"]["mass"], diff: str = COLUMNS["delta"], ) -> pd.DataFrame: """ Calculate the difference in Parts Per Million between observed and theoretical masses. The PPM difference between observed and theoretical mass is calculated as... .. math:: (1000000 * (obs - theor)) / theor The function ensures the column is placed after the theoretical mass column to facilitate its use. Parameters ---------- df : pd.DataFrame Pandas DataFrame of results. observed : str Variable that defines the observed PPM. theoretical : str Variable that defines the theoretical PPM. diff: str Variable to be created that holds the difference in PPM. Returns ------- pd.DataFrame Pandas DataFrame with difference noted in column diff. """ column_order = list(df.columns) theoretical_position = column_order.index(theoretical) + 1 df.insert(theoretical_position, diff, (1000000 * (df[observed] - df[theoretical])) / df[theoretical]) LOGGER.info("Difference in PPM calculated.") return df
[docs] def pick_most_likely_structures( df: pd.DataFrame, consolidation_ppm: float, columns: dict = COLUMNS, ) -> pd.DataFrame: """ Add rows that consolidate ambiguous matches, picking matches with the closest ppm. Parameters ---------- df : pd.DataFrame DataFrame of structures to be processed. consolidation_ppm : float Minimum Parts Per Million tolerance distinguishing matches. columns : dict Dictionary of columns, this defaults to the global COLUMNS which is read from 'config/columns.yaml' and simplifies extension to new formats. Returns ------- pd.DataFrame Dataframe of matches within the specified tolerance. Candidates that are not matched are included in the file for completeness. """ def add_most_likely_structure(group) -> pd.DataFrame: """ Determines the most likely structure. Parameters ---------- group : pd.DataFrame DataFrame of structures. Returns ------- pd.DataFrame Dataframe of most likely values. """ # Sort by lowest absolute ppm first, then break ties with structures (short to long) group.sort_values( by=[columns["delta"], columns["inferred"]["structure"]], ascending=[True, False], key=lambda k: abs(k) if is_numeric_dtype(k) else k, inplace=True, kind="stable", ) group.reset_index(drop=True, inplace=True) abs_min_ppm = group[columns["delta"]].loc[0] abs_min_intensity = group[columns["input"]["intensity"]].loc[0] min_ppm_structure_idxs = abs(abs(abs_min_ppm) - abs(group[columns["delta"]])) < consolidation_ppm min_ppm_structures = ", ".join(group[columns["inferred"]["structure"]].loc[min_ppm_structure_idxs]) group.at[0, f"Inferred structure ({columns['best_match_suffix']})"] = min_ppm_structures group.at[0, f"Intensity ({columns['best_match_suffix']})"] = abs_min_intensity return group matched_rows = df[df[columns["inferred"]["structure"]].notnull()] unmatched_rows = df[df[columns["inferred"]["structure"]].isnull()] grouped_df = matched_rows.groupby("ID", as_index=False, sort=False) most_likely = grouped_df.apply(add_most_likely_structure) merged_df = pd.concat([most_likely, unmatched_rows]) return merged_df.reset_index(drop=True)
[docs] def consolidate_results( df: pd.DataFrame, intensity_column: str = f"Intensity ({COLUMNS['best_match_suffix']})", structure_column: str = f"Inferred structure ({COLUMNS['best_match_suffix']})", rt_column: str = COLUMNS["input"]["rt"], theo_column: str = COLUMNS["inferred"]["mass"], ppm_column: str = COLUMNS["delta"], abundance_column: str = COLUMNS["consolidation"]["Abundance (%)"], oligomer_column: str = "Oligomerisation", total_column: str = COLUMNS["consolidation"]["Total Intensity"], columns: dict = COLUMNS, ) -> pd.DataFrame: """ Add a final table of muropeptide structures and their relative abundances Parameters ---------- df : pd.DataFrame DataFrame of structures to be processed. intensity_column : str Intensity column. structure_column : str Structure column. rt_column : str RT column. theo_column : str Theoretical Mass column. ppm_column : str Delta ppm column. abundance_column : str Abundance column. oligomer_column : str Oligomer column. total_column : str Total column. Returns ------- pd.DataFrame The input dataframe with additional columns containing the consolidated results. """ def pick_from_highest_intensity_instance(column: pd.Series): return column[df.loc[column.index, intensity_column].idxmax()] consolidated_df = ( df.groupby(structure_column) .agg( { rt_column: pick_from_highest_intensity_instance, intensity_column: "sum", theo_column: pick_from_highest_intensity_instance, ppm_column: pick_from_highest_intensity_instance, } ) .reset_index() ) total_intensity = consolidated_df[intensity_column].sum() consolidated_df[abundance_column] = consolidated_df[intensity_column] / total_intensity * 100 consolidated_df[oligomer_column] = consolidated_df[structure_column].apply(lambda s: s[-1]) consolidated_df.sort_values( by=[oligomer_column, abundance_column], ascending=[True, False], inplace=True, kind="stable", ignore_index=True ) consolidated_df.at[0, total_column] = total_intensity consolidated_df = consolidated_df[ [total_column, structure_column, intensity_column, abundance_column, rt_column, theo_column, ppm_column] ] consolidated_df[rt_column] = consolidated_df[rt_column].round(2) consolidated_df[ppm_column] = consolidated_df[ppm_column].round(1) # Rename columns using mapping defined in pgfinder/config/columns.yaml under 'consolidation' consolidated_df.rename(columns=columns["consolidation"], inplace=True) return pd.concat([df, consolidated_df], axis=1)