Source code for drugforge.spectrum.seq_alignment

import logging
import subprocess
from pathlib import Path

import numpy as np
import pandas as pd
from Bio import AlignIO, SeqIO

# BioPython
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from bokeh.layouts import column
from bokeh.models import ColumnDataSource, LabelSet, LinearAxis, Range1d
from bokeh.models.glyphs import Rect, Text

# Bokeh imports
from bokeh.plotting import figure, output_file, save


[docs] class Alignment:
[docs] def __init__(self, blast_match: pd.DataFrame, query: str, dir_save: Path): """An alignment object Parameters ---------- blast_match : pd.DataFrame DataFrame with BLAST results query : str Descriptor of query sequence dir_save : Path Path for directory where results will be saved """ self.dir_save = dir_save self.blast_query = query self.query_label = self.blast_query.strip().replace(" ", "_") df = blast_match.loc[blast_match["query"] == self.blast_query] self.query_matches = df self.seqs = self.query_matches["sequence"].to_numpy() self.ids = self.query_matches["ID"].to_numpy() self.descripts = self.query_matches["description"].to_numpy() self.hosts = self.query_matches["host"].to_numpy() self.organisms = self.query_matches["organism"].to_numpy() self.scores = self.query_matches["score"].to_numpy() self.sucess = False return
@staticmethod def select_checkbox(): raise NotImplementedError( "I'm not sure this can be implemented outside of Jupyter" ) def select_keyword(self, match_string: str, selection_file: str): # First filter unique entries unique_idxs = np.unique(self.seqs, return_index=True)[1] ordered_idxs = np.sort(unique_idxs) unique_seqs = [self.seqs[i] for i in ordered_idxs] unique_ids = [self.ids[i] for i in ordered_idxs] unique_descp = [self.descripts[i] for i in ordered_idxs] unique_scores = [self.scores[i] for i in ordered_idxs] # Filter sequences by keyword substrings = [] for s in match_string.split(","): substrings.append(s) substrings.append(s.capitalize()) substrings.append(s.lower()) substrings.append(s.upper()) filtered_idxs = [ idx for idx, descp in enumerate(unique_descp) if any(substring in descp for substring in substrings) ] filtered_ids = [unique_ids[i] for i in filtered_idxs] filtered_seqs = [unique_seqs[i] for i in filtered_idxs] filtered_descp = [unique_descp[i] for i in filtered_idxs] filtered_scores = [unique_scores[i] for i in filtered_idxs] if len(filtered_seqs) > 0: if unique_seqs[0] != filtered_seqs[0]: filtered_seqs = [unique_seqs[0]] + filtered_seqs filtered_descp = [unique_descp[0]] + filtered_descp filtered_ids = [unique_ids[0]] + filtered_ids filtered_scores = [unique_scores[0]] + filtered_scores else: logging.warning("The keyword provided didn't return any matches") filtered_seqs = [unique_seqs[0]] filtered_descp = [unique_descp[0]] filtered_ids = [unique_ids[0]] filtered_scores = [unique_scores[0]] self.seqs = filtered_seqs self.ids = filtered_ids self.descripts = filtered_descp self.scores = filtered_scores records = [] for r in range(len(self.ids)): rec = SeqRecord( Seq(self.seqs[r]), id=self.ids[r], description=self.descripts[r] ) records.append(rec) self.seq_records = selection_file SeqIO.write(records, self.seq_records, "fasta") return selection_file def select_taxonomy(self, match_string: str, selection_file: str): # First filter unique entries unique_idxs = np.unique(self.seqs, return_index=True)[1] ordered_idxs = np.sort(unique_idxs) # Extract info from provided match string or_querys = match_string.split("OR") host_str = "xxxxx" org_str = "xxxxx" for q in or_querys: if "host" in q: host_str = " ".join(q.strip().split(" ")[1:]) elif "organism" in q: org_str = " ".join(q.strip().split(" ")[1:]) filtered_idxs = [ idx for idx in ordered_idxs if any( [ host_str.casefold() in self.hosts[idx].casefold(), org_str.casefold() in self.organisms[idx].casefold(), ] ) ] filtered_ids = [self.ids[i] for i in filtered_idxs] filtered_seqs = [self.seqs[i] for i in filtered_idxs] filtered_descp = [self.descripts[i] for i in filtered_idxs] filtered_hosts = [self.hosts[i] for i in filtered_idxs] filtered_orgs = [self.organisms[i] for i in filtered_idxs] filtered_scores = [self.scores[i] for i in filtered_idxs] if len(filtered_seqs) > 0: if self.seqs[0] != filtered_seqs[0]: filtered_seqs = [self.seqs[0]] + filtered_seqs filtered_descp = [self.descripts[0]] + filtered_descp filtered_ids = [self.ids[0]] + filtered_ids else: logging.warning("The keyword provided didn't return any matches") filtered_seqs = [self.seqs[0]] filtered_descp = [self.descripts[0]] filtered_ids = [self.ids[0]] self.seqs = filtered_seqs self.ids = filtered_ids self.descripts = filtered_descp self.hosts = filtered_hosts self.organisms = filtered_orgs self.scores = filtered_scores records = [] for r in range(len(self.ids)): rec = SeqRecord( Seq(self.seqs[r]), id=self.ids[r], description=self.descripts[r] ) records.append(rec) self.seq_records = selection_file SeqIO.write(records, self.seq_records, "fasta") return selection_file def multi_seq_alignment(self, alignment_file): # Run alignment with MAFFT # SeqIO.write(self.seq_records, temp_file, "fasta") cmd = f"mafft {self.seq_records} > {alignment_file}" sp_output = subprocess.run(cmd, shell=True, capture_output=True) sp_output.check_returncode() self.align_obj = AlignIO.read(alignment_file, "fasta") return alignment_file
[docs] def view_alignment( self, fontsize="11pt", plot_width=800, file_name="alignment", color_by_group=False, start_idx=0, skip=4, max_mismatch=2, reorder="", ): """ "Bokeh sequence alignment view From: https://dmnfarrell.github.io/bioinformatics/bokeh-sequence-aligner Parameters ---------- fontsize : str, optional Size of aminoacid one-letter IDs, by default "11pt" plot_width : int, optional width of alignment plot, by default 800 file_name : str, optional suffix for html file, by default "alignment" color_by_group : bool, optional View mode where matching aminoacids are colored, by default False start_idx : int, optional Index of first aminiacid of reference sequence, by default 0 skip : int, optional Skip for displayed indexes of reference sequence , by default 4 max_mismatch : int, optional How many mismatches are tolerated for highlighted group match, by default 2 Returns ------- (bokeh.Column, str) Bokeh Column of layouts, path to saved html file. """ # The function takes a biopython alignment object as input. aln = self.align_obj if len(reorder[0]) > 0: aln_ref = aln[:1] # ref aln_sorted = [aln[int(i)] for i in reorder] aln_ref.extend(aln_sorted) aln = aln_ref aln = aln[::-1] # So outputs are ordered from top to bottom seqs = [rec.seq for rec in (aln)] # Each sequence input text = [i for s in list(seqs) for i in s] # Al units joind on same list N = len(seqs[-1]) S = len(seqs) # Shorten the description for display (string between the last [*]) def matches(x): import re pattern = r"\[(.*?)\]" return re.findall(pattern, x)[-1] desc = [f"{matches(rec.description)} ({rec.id})" for rec in aln] colors_dict = {"exact": "white", "group": "orange", "none": "red"} # List with ALL colors # By aminoacid group or exact match if color_by_group: col_colors = [] font_colors = [] match_keys = [] for col in range(N): # Go through each column # Note: AlignIO item retrieval is done through a get_item function, so this has to be done with a loop col_string = aln[:, col] color, font_color, match_key = get_colors_by_aa_group( col_string, max_mismatch, colors_dict ) col_colors.append(color) font_colors.append(font_color) match_keys.append(match_key) colors = col_colors * S # Append each font_color list "colum-wise" font_colors = np.array(font_colors).T.flatten() # get a dictionary with counts for a printed report from collections import Counter logging.info( "The multi-sequence alignment returns the following matches:", ) for key, value in Counter(match_keys).items(): logging.info(f"{key}: {value}/{N}") else: colors = get_colors_protein(seqs) font_colors = ["black"] * len(colors) # Defining x indexes only for non-gap characters of ref sequence (seqs[-1]) seq_array = np.array(list(seqs[-1])) x_non_gap = np.full(len(seqs[-1]), " ", dtype="<U3") non_gap_idx = np.where(seq_array != "-")[0] current_idx = start_idx x_non_gap_locs = [] # Iterate to indexes (this way we skip the gaps in the middle) for idx in non_gap_idx: if idx in non_gap_idx[::skip]: # Skips every given index x_non_gap[idx] = str(current_idx) x_non_gap_locs.append(idx) current_idx += 1 x = np.arange(0, N) y = np.arange(0, S, 1) # creates a 2D grid of coords from the 1D arrays xx, yy = np.meshgrid(x, y) # flattens the arrays gx = xx.ravel() gy = yy.flatten() # use recty for rect coords with an offset recty = gy + 0.5 # now we can create the ColumnDataSource with all the arrays logging.info(f"Aligning {S} sequences of lenght {N}") # ColumnDataSource is a JSON dict that maps names to arrays of values source = ColumnDataSource( dict(x=gx, y=gy, recty=recty, text=text, colors=colors) ) plot_height = len(seqs) * 10 + 50 x_range = Range1d(gx[0] - 1, N + 8, bounds="auto") # (start, end) if N > 150: viewlen = 150 else: viewlen = N # view_range is for the close up view view_range = (gx[0] - 1, viewlen) tools = "xpan, xwheel_zoom, reset, save" # Custom right-side labels right_labels1 = [f"{round(score, 1)}%" for score in self.scores][::-1] x_offsets = N # y values should match desc (in reversed order if needed) source2 = ColumnDataSource( data=dict( x=[x_offsets] * len(desc), # Position labels just outside the plot (adjust as needed) y=desc, labels=right_labels1, ) ) labels = LabelSet( x="x", y="y", text="labels", level="glyph", x_offset=0, y_offset=0, source=source2, text_align="left", text_baseline="middle", text_font_size=str(int(fontsize[:-2]) - 2) + "pt", ) # entire sequence view (no text, with zoom) p1 = figure( title=None, width=plot_width, height=plot_height, x_range=x_range, y_range=desc, tools=tools, min_border=0, ) p1.toolbar_location = None # Rect simply places rectangles of wifth "width" into the positions defined by x and y rects = Rect( x="x", y="recty", width=1, height=1, fill_color="colors", line_color=None, fill_alpha=0.6, ) # Source does mapping from keys in rects to values in ColumnDataSource definition p1.add_glyph(source, rects) p1.grid.visible = False p1.xaxis.major_label_text_font_style = "bold" p1.yaxis.major_label_text_font_size = "8pt" p1.yaxis.minor_tick_line_width = 0 p1.yaxis.major_tick_line_width = 0 p1.add_layout(labels) plot_height = len(seqs) * 20 + 30 # sequence text view with ability to scroll along x axis p2 = figure( title=None, width=plot_width, height=plot_height, x_range=view_range, y_range=desc, tools=tools, min_border=0, toolbar_location="below", ) # Text does the same thing as rectangles but placing letter (or words) instead, aligned accordingly text_source = ColumnDataSource( dict(x=gx, y=gy, recty=recty, text=text, colors=font_colors) ) glyph = Text( x="x", y="y", text="text", text_color="colors", text_align="center", text_font_size=fontsize, ) rects = Rect( x="x", y="recty", width=1, height=1, fill_color="colors", line_color=None, fill_alpha=0.4, ) # Blank plot to hold the position labels p_blank = figure( width=plot_width, height=40, x_range=view_range, y_range=Range1d(0, 1), title=None, toolbar_location=None, tools="", outline_line_alpha=0, ) p_blank.xaxis.visible = False p_blank.yaxis.visible = False p_blank.grid.visible = False label_source = ColumnDataSource(dict(x=x, y=[0.05] * len(x), text=x_non_gap)) labels_b = Text( x="x", y="y", text="text", text_color="black", text_align="center", text_font_size=str(int(fontsize[:-2]) - 2) + "pt", ) p2.add_glyph(text_source, glyph) p2.add_glyph(source, rects) p_blank.add_glyph(label_source, labels_b) p2.add_layout(labels) view_range = Range1d(gx[0] - 1, viewlen) p2.grid.visible = True p2.xaxis.major_label_text_font_style = "bold" p2.yaxis.major_label_text_font_style = "bold" p2.yaxis.minor_tick_line_width = 0 p2.yaxis.major_tick_line_width = 0 p2.xaxis.major_label_text_font_size = "0pt" p2.add_layout( LinearAxis(major_label_text_font_size="0pt", ticker=list(x_non_gap_locs)), "above", ) p2.x_range = view_range p_blank.x_range = view_range p = column(p1, p_blank, p2) output_file(filename=f"{file_name}.html", title="Alignment result") save(p) return p, f"{file_name}.html"
[docs] @staticmethod def fasta_align_data(input_alignment, output_file): """Modify sequences in multi-seq alignment to remove gap characters '-'""" alignment = SeqIO.parse(input_alignment, "fasta") filtered_sequences = [] for rec in alignment: # Remove gap characters '-' from the sequence filtered_sequence = rec.seq.ungap( "-" ) # ''.join(char for char in rec.seq if char != '-') # Update SeqRecord with the filtered sequence filtered_seq_record = SeqRecord( filtered_sequence, id=rec.id, description=rec.description ) filtered_sequences.append(filtered_seq_record) SeqIO.write(filtered_sequences, output_file, "fasta") return output_file
@staticmethod def csv_align_data(input_alignment, output_file, n_chains): alignment = SeqIO.parse(input_alignment, "fasta") df = pd.DataFrame(columns=["id", "sequence"]) for rec in alignment: label_parts = rec.id.split("|")[1].split(".") red_label = f"{label_parts[0]}_{label_parts[1]}" seq_print = str(rec.seq) if n_chains > 1: # ColabFold reads multimer chains separated by ":" seq_print = ":".join([seq_print] * n_chains) dfi = pd.DataFrame.from_dict({"id": [red_label], "sequence": [seq_print]}) df = pd.concat([df, dfi], ignore_index=True) df.to_csv(output_file, index=False) return output_file
[docs] def do_MSA( alignment: Alignment, select_mode: str, file_prefix: str, plot_width: int, n_chains: int, color_by_group: bool, start_alignment_idx: int, max_mismatch: int, custom_order: str, ): save_file = alignment.dir_save / file_prefix # Select sequeneces of interest if select_mode == "checkbox": select_file = alignment.select_checkbox() elif "host" in select_mode or "organism" in select_mode: if alignment.hosts[0] is None: raise NameError( "The csv input file provided does not have host information, you have to use the 'keyword' mode." ) else: select_file = alignment.select_taxonomy(select_mode, f"{save_file}.fasta") else: select_file = alignment.select_keyword(select_mode, f"{save_file}.fasta") alignment.select_file = select_file logging.info( f"A fasta file {alignment.select_file} have been generated with the selected sequences" ) # Do multisequence alignment align_fasta = alignment.multi_seq_alignment(f"{save_file}_alignment.fasta") alignment.align_file = align_fasta logging.info( f"A fasta file {alignment.align_file} have been generated with the multi-seq alignment" ) # Save CSV for ColabFold step clean_csv = alignment.csv_align_data( alignment.select_file, f"{save_file}.csv", n_chains ) logging.info( f"A csv file {clean_csv} have been generated with the selected sequences" ) p, align_html = alignment.view_alignment( plot_width=plot_width, file_name=f"{save_file}_alignment", color_by_group=color_by_group, start_idx=start_alignment_idx, max_mismatch=max_mismatch, reorder=custom_order.split(","), ) logging.info( f"A html file {align_html} have been generated with the aligned sequences" ) alignment.sucess = True return alignment
# Defining colors for each protein residue
[docs] def get_colors_protein(seqs): """Make colors for bases in sequence Parameters ---------- seq : List[str] List with protein sequences Returns ------- list[str] List with colors """ # List of all aminoacids in the list of seqs, in the same list text = [i for s in list(seqs) for i in s] colors = [] for aa in text: amino = AAcid(aa) colors.append(amino.get_aminoacid_color()) return colors
# Defining colors for each protein residue
[docs] def get_colors_by_aa_group(seq: str, max_mismatch: int, colors: dict): """Make fill and text color for exact and group aminoacid matches Parameters ---------- seq : str String with protein sequence max_mismatch : int Maximum number of group mismatches after which match won't be highlighted colors : dict Dictionary with colors to use: {"exact", "group", "none"} Returns ------- str, list[str] Fill color, Font colors """ from collections import Counter seq_len = len(seq) aa_groups = [AAcid(aa).get_aminoacid_group() for aa in seq] aa_counts = Counter(aa_groups) max_group, max_group_count = aa_counts.most_common(1)[0] # Default font color font_color = ["black"] * seq_len # Check the case where all aa's are the same if seq == seq_len * seq[0]: key = "exact" # Check the case where all aa's belong to the same group (with some max mismatches) elif max_group_count >= seq_len - max_mismatch: if max_group is None: # In case most "matches" are gaps key = "none" else: key = "group" # Make font red for mismatches font_color = ["black" if item == max_group else "red" for item in aa_groups] else: key = "none" color = colors[key] return color, font_color, key
_AMINO_ACID_GROUPS = { "aliphatic": ["A", "V", "I", "L", "M"], "aromatic": ["F", "W", "Y"], "neutral": ["N", "Q", "S", "T"], "acidic": ["D", "E"], "basic": ["R", "H", "K"], "cys": ["C"], "gly": ["G"], "pro": ["P"], } _AMINO_ACID_COLORS = { "A": "red", # Alanine "R": "blue", # Arginine "N": "green", # Asparagine "D": "yellow", # Aspartic acid "C": "orange", # Cysteine "Q": "purple", # Glutamine "E": "cyan", # Glutamic acid "G": "magenta", # Glycine "H": "pink", # Histidine "I": "brown", # Isoleucine "L": "gray", # Leucine "K": "lime", # Lysine "M": "teal", # Methionine "F": "navy", # Phenylalanine "P": "olive", # Proline "S": "maroon", # Serine "T": "silver", # Threonine "W": "gold", # Tryptophan "Y": "skyblue", # Tyrosine "V": "violet", # Valine "-": "white", }
[docs] class AAcid:
[docs] def __init__(self, letter_id): """An aminoacid object Parameters ---------- letter_id : str Amino Acid one or three-letter identifier """ from Bio.SeqUtils import seq1, seq3 # An empty aminoacid if letter_id == "-": self.one_letter_id = letter_id self.three_letter_id = None elif len(letter_id) == 1: self.one_letter_id = letter_id self.three_letter_id = seq3(letter_id) elif len(letter_id == 3): self.three_letter_id = letter_id self.one_letter_id = seq1(letter_id) else: raise ValueError( "The input must be either the aminoacid 1 or 3-letter code" )
def get_aminoacid_group(self): for key in _AMINO_ACID_GROUPS: if self.one_letter_id in _AMINO_ACID_GROUPS[key]: return key return def get_aminoacid_color(self): return _AMINO_ACID_COLORS[self.one_letter_id]