from difflib import SequenceMatcher
from typing import Dict
import os
from protlego.builder.alignments import *
from protlego.database.data import *
from protlego.builder.AAcodes import *
from protlego.builder.chimera import get_SCOP_domain, get_FUZZLE_hhs
from protlego.builder.chimera import Chimera
from csb.bio.io.hhpred import HHOutputParser
from csb.bio.hmm import HHpredHitAlignment
from moleculekit.molecule import Molecule
from moleculekit.projections.metricrmsd import MetricRmsd
from moleculekit.projections.metricsecondarystructure import MetricSecondaryStructure
from scipy.spatial.distance import cdist
import matplotlib.pyplot as plt
import logging
logging.getLogger('moleculekit').setLevel(logging.ERROR)
logger = logging.getLogger('protlego')
#TODO: Put real path after setup
TMALIGNPATH = '/agh/sw/pkg/tmalign/20170708/TMalign'
aa_keys = special2standard.keys()
standard_aas = three2one.keys()
[docs]class NotDiverseChimeraError(Exception):
pass
[docs]class BackboneClashError(Exception):
pass
[docs]class NotCorrectPDBError(Exception):
pass
[docs]class Builder():
"""
Graph constructor and visualizer
Examples
--------
myhit= fetch_id('10002347')
a=Builder(myhit)
aln=a.get_alignment(myhit.query,myhit.no)
qPDB, sPDB = a.superimpose_structures(aln,partial_alignment=True)
chimeras=a.build_chimeras(partial_alignment=True)
"""
def __init__(self, hit: Hit):
qpdb_path = get_SCOP_domain(hit.query)
spdb_path = get_SCOP_domain(hit.sbjct)
logger.info(f'Loading {qpdb_path} as a chimera object')
self.qPDB = Chimera(qpdb_path, validateElements=False)
os.remove(qpdb_path)
if self.qPDB.numFrames > 1:
self.qPDB.dropFrames(keep=0)
logger.info("Query protein contains more than one model. Keeping only the first one")
logger.info(f'Loading {spdb_path} as a chimera object')
self.sPDB = Chimera(spdb_path, validateElements=False)
os.remove(spdb_path)
if self.sPDB.numFrames > 1:
self.sPDB.dropFrames(keep=0)
logger.info("Subject protein contains more than one model. Keeping only the first one")
self.qaPDB, self.saPDB = {}, {}
self.qpairs,self.spairs = [], []
self.dst = []
self.chim_positions = {}
[docs] def get_alignment(self, query: str, no: str) -> HHpredHitAlignment:
""" Obtain the HHS alignment 'no' for query 'query'.
Only the alignment from the fragment region is retrieved.
This implies that when the fragment is not located in the
N-terminus hit.q_start and the position in the output won't
be the same. For example, if q_start = 20, that aminoacid is
in position 0 in aln.query.
:param query: str. Domain query
:param no: int. Specifies the position in the file (alignment with subject)
:return: HHpredHitAlignment. Alignment between query and subject for the fragment region.
"""
hhF = get_FUZZLE_hhs(query)
try:
hh = HHOutputParser().parse_file(hhF)
pair = hh[int(no) - 1]
aln = pair.alignment
return aln
except Exception as e:
logger.error(f"Parsing of {hhF} failed. Error follows: {e}")
[docs] @staticmethod
def remove_residue(pdb: Molecule, resid):
"""
Removes a specific residue by its name
:param pdb: The pdb to mutate the residues to
:param resid:
:return: The pdb with the filtered residues
"""
pdb.filter(f"not {resid}")
return pdb
[docs] @staticmethod
def find_nonstandards(pdb: Molecule) -> list:
"""
Finds non-standard aminoacids
:param pdb: Molecule or Chimera object where to find non-standard aminoacids.
:return: list of non-standard residues
"""
non_standards = [aa for aa in np.unique(pdb.resname) if (aa in aa_keys or aa not in standard_aas)]
if non_standards:
for i in non_standards:
if i != 'UNK':
logger.info(f"Found the following non-standar residue: {i}. "
f"Preserving in the original PDB")
else:
logger.warning("Protein presents unknown residue UNK."
" Call remove_residue() to remove it or provide parameters"
" if you want to minimize it with AMBER or CHARMM.")
return non_standards
[docs] @staticmethod
def mutate_nonstandards(pdb: Molecule) -> Molecule:
"""
:param pdb: The pdb to mutate the residues to
:return: The pdb with the mutated residues -if there are any-
"""
non_standards = Builder.find_nonstandards(pdb)
if non_standards:
[pdb.mutateResidue(f"resname {i}", f"{special2standard[i]}") for i in non_standards]
return pdb
[docs] def seq2pdbaln(self, sequence: str, pdb: Molecule) -> List:
""" Obtains the resid (positions in the PDB) of the aminoacids involved in the fragment.
:param sequence: str. sequence of the fragment
:param pdb: Molecule. PDB from where to obtain the residues
:return: mapping. The the PDB Positions of the fragment. A List of tuples of the form: (sequence aminoacid, pdb resid)
"""
# Mutate non standard residues
copy_pdb = pdb.copy() # making a copy to ensure the possible mutations don't affect the pdb
copy_pdb = self.mutate_nonstandards(copy_pdb)
# Mapping of residue to number
try:
pdb_sequence = copy_pdb.sequence()['0']
except:
raise NotCorrectPDBError(f"PDB structure from protein {pdb.viewname} not correct")
copy_pdb = pdb.copy() # second copy to preserve indexing
pdb_indices = copy_pdb.get("index", sel="protein and name CA")
# sequence from the HHS alignment (fragment)
seq_positions = [(x,) for x in sequence]
# obtain mapping
matcher = SequenceMatcher(None, sequence, pdb_sequence, autojunk=False)
for block in matcher.get_matching_blocks():
i = 0
for pos in range(block.a, block.a + block.size):
seq_positions[pos] += (pdb_indices[block.b + i],)
i += 1
return seq_positions
def _get_pairs(self, aln: HHpredHitAlignment):
""" Obtain the positions of the pdb which are present in both alignments.
Thus the number of atoms is the same
:param aln: HHpredHitAlignment. The fragment sequence alignment
"""
qpairs = []
spairs = []
preqpairs = []
prespairs = []
# Obtain the corresponding PDB residues to the fragment seq. alignment
query_seq2pdb = self.seq2pdbaln(aln.query, self.qPDB)
sbjct_seq2pdb = self.seq2pdbaln(aln.subject, self.sPDB)
# Retrieving correct residue sequence numbers for each aligned position
# i.e positions that both query and subject contain aminoacids
for index, resi in enumerate(aln.columns):
index += 1 # the first residue in HHpredHitAlignment is 1
if aln.gap_at(index) is False:
if len(query_seq2pdb[index - 1]) == 2 and len(sbjct_seq2pdb[index - 1]) == 2:
preqpairs.append(query_seq2pdb[index - 1][1])
prespairs.append(sbjct_seq2pdb[index - 1][1])
else:
# then this corresponds to a partial-alignment chunk and we can save it
if preqpairs:
qpairs.append(preqpairs)
spairs.append(prespairs)
preqpairs = []
prespairs = []
if preqpairs:
qpairs.append(preqpairs)
spairs.append(prespairs)
self.qpairs = qpairs
self.spairs = spairs
self.global_qpairs = [[item for chunk in qpairs for item in chunk]]
self.global_spairs = [[item for chunk in spairs for item in chunk]]
[docs] def superimpose_structures(self, aln: HHpredHitAlignment, partial_alignment: bool = False):
""" Moves the two molecules to the origin of coordinates.
Aligns the full structures according the fragment and obtains RMSDs and distances.
:param qpairs: List of CA to be aligned in the query structure
:param spairs: List of CA to be aligned in the subject structure
:return:
"""
self._get_pairs(aln)
# Re-align if command is called twice
if self.qaPDB:
self.qaPDB = {}
self.saPDB = {}
self.dst = []
if partial_alignment is False:
qpairs = self.global_qpairs
spairs = self.global_spairs
else:
qpairs = self.qpairs
spairs = self.spairs
# Print info if the alignment was intended partial but there's only one chunk
if len(qpairs) == 1 and partial_alignment is True:
logger.info("The sequence alignment only contains one chunk. Performing global alignment")
# We only need one query, center to the origin of coordinates.
# It is the subject that aligns to this template.
qmol = self.qPDB.copy()
smol = self.sPDB.copy()
qmol.center()
smol.center()
self.qaPDB[0] = qmol.copy()
for index, qpair_chunk in enumerate(qpairs):
logger.info(f"Performing alignment {index+1} with TMalign")
saPDB, distance = self._superimpose_chunk(qpair_chunk, spairs[index], qmol, smol)
self.dst.append(distance)
self.saPDB[index] = saPDB.copy()
self.global_dst = [[item for chunk in self.dst for item in chunk]]
return self.qaPDB, self.saPDB
def _superimpose_chunk(self, qpairs: list, spairs: list, qmol: Molecule, smol: Molecule) -> Tuple[
Molecule, np.ndarray]:
"""
Superimposes the part of the total sequence alignment defined
by the indexes qpairs/spairs
:param qpairs: list of indexes to align from the pdb query
:param spairs: list of indexes to align from the pdb subject
:param qmol: the query pdb
:param smol: the subject pdb
:return: smol: the subject molecule aligned.
distances: a list of the distances between the alpha Carbons after alignment.
"""
# Copying because we are going to cut the pdbs into the chunks
copyq = qmol.copy()
copys = smol.copy()
copyq.filter('protein and backbone and same residue as index %s' % ' '.join(map(str, qpairs)))
copys.filter('protein and backbone and same residue as index %s' % ' '.join(map(str, spairs)))
copyq.write('/tmp/copyq.pdb')
copys.write('/tmp/copys.pdb')
# Matrix for VMD
try:
# We align subject onto the query
tm_matrix = get_tmalign_output('/tmp/copys.pdb', '/tmp/copyq.pdb', "matrix")
except:
raise ChildProcessError("TMalign cannot align the PDBs. Exiting")
vectran, matrot = tm2vmd(tm_matrix)
# remove copy files
os.remove('/tmp/copyq.pdb')
os.remove('/tmp/copys.pdb')
# align the whole subject domain and fragment.
# Copying so that the original smol does not lose the origin of coordinates
s1mol = smol.copy()
s1mol.rotateBy(matrot)
s1mol.moveBy(vectran)
copys.rotateBy(matrot)
copys.moveBy(vectran)
# Compute RMSD for the fragments
rmsd = MetricRmsd(copyq, 'protein and name CA')
data = rmsd.project(copys)
logger.info(f"The RMSD between the fragments is {data} over {len(spairs)} alpha carbons")
# Compute distances between the two selections
bbq = copyq.get("coords", sel="protein and name CA")
bbs = copys.get("coords", sel="protein and name CA")
distances = np.diagonal(cdist(bbq, bbs))
return s1mol, distances
def _construct_chimera(self, qmol, smol, qstart, qend, sstart, send, combination):
"""
:param qmol: Molecule. The query protein
:param smol: Molecule. The subject protein in any of its positions
:param qstart: int. Position to start the cut in the query
:param qend: int. Position to end the cut in the query
:param sstart: int. Position to start the cut in the sbjct
:param send: int. Position to end the cut in the sbjct.
:return: Molecule, DataFrame Objects.
chim1: The resulting chimera
mapping: The mapping from the old residue numbering to the new one
"""
qmol_copy = qmol.copy()
smol_copy = smol.copy()
qmol_copy.filter(f"(protein and same residue as index '{qstart}' to '{qend}')\
or (not protein and same residue as within 4 of protein and same residue as index '{qstart}' to '{qend}')")
smol_copy.filter(f"(protein and same residue as index '{sstart}' to '{send}')\
or (not protein and same residue as within 4 of protein and same residue as index '{qstart}' to '{qend}')")
# Avoid chimeras that only have a few mutations from
# one of the parents
qmol_resid = qmol_copy.get("resid", sel="protein and name CA")
smol_resid = smol_copy.get("resid", sel="protein and name CA")
if qmol_resid.size < 10 or smol_resid.size < 10:
raise NotDiverseChimeraError
bbq = qmol_copy.get("coords", sel=f"protein and backbone")
bbs = smol_copy.get("coords", sel=f"protein and backbone")
distances = cdist(bbq, bbs)
idx1, idx2 = np.where(distances < 1.3)
if idx1.any() or idx2.any():
raise BackboneClashError
else:
chim1 = Chimera()
qmol_copy.renumberResidues()
smol_copy.renumberResidues()
if combination == 1:
last_id = smol_resid[-1] + 1
new_ids = get_new_resIDs(qmol_copy, last_id)
qmol_copy.set("resid", new_ids)
chim1.append(smol_copy)
chim1.append(qmol_copy)
else:
last_id = qmol_resid[-1] + 1
new_ids = get_new_resIDs(smol_copy, last_id)
smol_copy.set("resid", new_ids)
chim1.append(qmol_copy)
chim1.append(smol_copy)
chim1.set("chain", "A", "all")
return chim1, last_id
[docs] def build_chimeras(self, partial_alignment: bool = False, cutoff_distance: float = 1) -> Dict[str, Chimera]:
""" Build all possible chimeras between the two proteins that fulfill
these two criteria:
1) That the distance between the fusion points is below the cutoff distance
2) That the resulting chimera does not present any backbone clashes
:return: A dictionary with all the possible chimeras
"""
if self.dst is None:
logger.error("You need to align the structures before building the chimeras")
chimeras = {}
outcomes = ['Query N-terminal', 'Subject N-terminal', 'Not enough mutations Query N-terminal',
'Not enough mutations Subject N-terminal', 'Backbone clash']
self.chim_positions = dict(zip(outcomes, [[] for i in range(len(outcomes))]))
q_indices = self.qPDB.get("index", sel="protein and name CA")
qstart = min(q_indices)
qend = max(q_indices)
s_indices = self.sPDB.get("index", sel="protein and name CA")
sstart = min(s_indices)
send = max(s_indices)
if partial_alignment is False:
qpairs = self.global_qpairs
spairs = self.global_spairs
dst = self.global_dst
else:
qpairs = self.qpairs
spairs = self.spairs
dst = self.dst
# Get the positions in the fragment closer than the cutoff
for aln_index, chunk in enumerate(dst):
if aln_index not in self.saPDB:
logger.error(f"Alignment {aln_index+1} was not produced. Skipping to next alignment.")
continue
fusion_points = [index for index, distance in enumerate(chunk) if distance < cutoff_distance]
# Build query-subject chimera
for index in fusion_points:
qMOL = self.qaPDB[0].copy()
sMOL = self.saPDB[aln_index].copy()
xo_query = qpairs[aln_index][index]
xo_subject = spairs[aln_index][index]
xo_index = [index for index, number in enumerate(self.global_qpairs[0]) if number == xo_query][0]
residues = self.qPDB.get("resid", sel="index %s" % ' '.join(map(str, self.global_qpairs[0])))
xo_resid = residues[xo_index]
try:
xo_query_1 = qpairs[aln_index][index + 1]
xo_subject_1 = spairs[aln_index][index + 1]
except:
# Position corresponds to C-terminus limit of the fragment
xo_query_1 = [i + 1 for i, qindex in enumerate(q_indices) if qindex == xo_query][0]
xo_subject_1 = [i + 1 for i, sindex in enumerate(s_indices) if sindex == xo_subject][0]
# Combination query-subject
try:
chimera1, xo = self._construct_chimera(qMOL, sMOL, qstart, xo_query,
xo_subject_1, send, 0)
self.chim_positions['Query N-terminal'].append(xo_query)
chimeras[f"comb1_{xo_resid}"] = chimera1
chimeras[f"comb1_{xo_resid}"].add_crossover(xo)
except NotDiverseChimeraError:
self.chim_positions['Not enough mutations Query N-terminal'].append(xo_query)
except BackboneClashError:
self.chim_positions['Backbone clash'].append(xo_query)
# Combination subject-query
try:
chimera2, xo = self._construct_chimera(qMOL, sMOL, xo_query_1, qend,
sstart, xo_subject, 1)
self.chim_positions['Subject N-terminal'].append(xo_query)
chimeras[f"comb2_{xo_resid}"] = chimera2
chimeras[f"comb2_{xo_resid}"].add_crossover(xo)
except NotDiverseChimeraError:
self.chim_positions['Not enough mutations Subject N-terminal'].append(xo_query)
except BackboneClashError:
self.chim_positions['Backbone clash'].append(xo_query)
if not chimeras:
logger.warning("No combination of query and subject produced a chimera that matched the criteria")
return chimeras
[docs] def plot_curves(self, query: str):
""" Plots the distance between the alpha carbons in the structure for the fragment region along with the number of backbone clashes of the resulting chimera for each position
:param dst: np.ndarray: an array containign the distance for each fragment position
:param bbContacts1: an array containing the number of bb contacts for each fragment position for the resulting chimera of combination query-subject
:param bbContacts2: an array containing the number of bb contacts for each fragment position for the resulting chimera of combination subject - query
:return: a matplotlib.pyplot figure.
"""
if self.chim_positions is None:
logger.error("You need to build the chimeras before plotting. Call build_chimeras()")
return
dst = self.global_dst[0]
residues = self.qPDB.get("resid", sel="index %s" % ' '.join(map(str, self.global_qpairs[0])))
resids = {}
distances = {}
for key, value in self.chim_positions.items():
if value:
resids[key] = self.qPDB.get("resid", sel="index %s" % ' '.join(map(str, value)))
else:
resids[key] = np.zeros(0)
for key, value in resids.items():
if value.any():
indices = [index for index, resi in enumerate(residues) if resi in value]
distances[key] = [distance for index, distance in enumerate(dst) if index in indices]
else:
distances[key] = []
color = [('#FCB711', "X"), ('#CC004C', "x"), ('gray', 'o'), ('gray', "o"), ('black', ".")]
colors = dict(zip(resids.keys(), color))
fig, ax = plt.subplots(figsize=(12, 9))
ax.plot(residues, dst, '-', color='black', label='distance q-s')
ax.set_xlabel(f"Residue in the fragment relative to domain {query}", fontsize=24)
ax.set_ylabel(r'Distance ($\AA$)', fontsize=24)
i = 0
for key, value in sorted(resids.items()):
if value.any():
i += 1
ax.plot(value, distances[key], colors[key][1], markersize=18, color=colors[key][0], label=key)
ax.tick_params(labelsize=20)
ax.tick_params(labelsize=20)
ax.legend(loc=9, bbox_to_anchor=(0.5, 1.35), fontsize=18)
plt.show()
# def build_recombinations(self,partial_alignment:bool = False, cutoff_distance=1):
# """
# #TODO for now implemented only to be GLOBAL ALIGNMENT
# #TODO NEEDS TESTING - still working with resids
# #TODO perhaps is a good idea to follow this algorithm for the single crossover chimeras
# :return:
# """
# if self.dst is None:
# logger.error("You need to align the structures before building the chimeras")
# return
#
# recombinations = {}
#
# if partial_alignment is False:
# qpairs = self.global_qpairs
# spairs = self.global_spairs
# dst = self.global_dst
# else:
# qpairs = self.qpairs
# spairs = self.spairs
# dst = self.dst
#
# combs = []
#
# fusion_points = [index for index, distance in enumerate(dst[0]) if distance < cutoff_distance]
#
# for pair in itertools.combinations(fusion_points, 2):
# if pair[0] < pair[1] and pair[1] - pair[0] > 10:
# combs.append(pair)
#
# qMOL = self.qaPDB[0].copy()
# sMOL = self.saPDB[0].copy()
# for crossovers in combs:
# xo_query_1 = qpairs[0][crossovers[0]]
# xo_query_2 = qpairs[0][crossovers[1]]
# xo_subject_1 = spairs[0][crossovers[0]]
# xo_subject_2 = spairs[0][crossovers[1]]
# try:
# recomb1,recomb2= self._construct_recombination(qMOL,sMOL,crossovers[0],crossovers[1],
# xo_query_1, xo_query_2,
# xo_subject_1, xo_subject_2)
# recombinations[f"comb1_{crossovers[0]}_{crossovers[1]}"] = Chimera(recomb1)
# recombinations[f"comb2_{crossovers[0]}_{crossovers[1]}"] = Chimera(recomb2)
# except:
# continue
#
# return recombinations
#
# def _construct_recombination(self, qmol,smol,index1,index2,
# xo_query_1, xo_query_2,
# xo_subject_1, xo_subject_2):
# qmol_1 = qmol.copy()
# qmol_2 = qmol.copy()
# qmol_3 = qmol.copy()
#
# smol_1 = smol.copy()
# smol_2 = smol.copy()
# smol_3 = smol.copy()
#
# qstart = min(qmol.resid)
# qend = max(qmol.resid)
#
# sstart = min(qmol.resid)
# send = max(qmol.resid)
#
# qmol_1.filter(f"protein and resid '{qstart}' to '{xo_query_1}'")
# qmol_2.filter(f"protein and resid '{xo_query_1+1}' to '{xo_query_2}'")
# qmol_3.filter(f"protein and resid '{xo_query_2+1}' to '{qend}'")
#
# smol_1.filter(f"protein and resid '{sstart}' to '{xo_query_1}'")
# smol_2.filter(f"protein and resid '{xo_subject_1+1}' to '{xo_subject_2}'")
# smol_3.filter(f"protein and resid '{xo_subject_2+1}' to '{send}'")
#
# # Combination query - subject
# chim1 = Molecule()
# chim1.append(qmol_1)
# chim1.append(smol_2)
# chim1.append(qmol_3)
# try:
# self.check_bb_clashes(chim1)
# except BackboneClashError:
# logger.warning(f"comb1_{index1}_{index2} present backbone clashes")
#
#
# # combination subject - query
# chim2 = Molecule()
# chim2.append(smol_1)
# chim2.append(qmol_2)
# chim2.append(smol_3)
# try:
# self.check_bb_clashes(chim1)
# except BackboneClashError:
# logger.warning(f"comb2_{index1}_{index2} present backbone clashes")
#
# return chim1, chim2
#
# def check_bb_clashes(self,chim_mol):
# """
#
# :param chim_mol:
# :return:
# """
# matrix = cdist(chim_mol.coords[:, :, 0], chim_mol.coords[:, :, 0])
# np.fill_diagonal(matrix, np.inf)
# idx1, idx2 = np.where(matrix < 1.3)
# if idx1.any() or idx2.any():
# raise BackboneClashError