Source code for molearn.scoring.dope_score

import numpy as np
from copy import deepcopy

from ..utils import ShutUp, random_string
try:
    import modeller
    from modeller import *
    from modeller.scripts import complete_pdb
    from modeller.optimizers import ConjugateGradients
except Exception as e:
    print('Error importing modeller: ')
    print(e)

from multiprocessing import get_context
import os
    

[docs] class DOPE_Score: ''' This class contains methods to calculate dope without saving to save and load PDB files for every structure. Atoms in a biobox coordinate tensor are mapped to the coordinates in the modeller model directly. ''' atom_map = {('ILE', 'CD1'):('ILE', 'CD')} def __init__(self, mol): ''' :param biobox.Molecule mol: One example frame to gain access to the topology. Mol will also be used to save a temporary pdb file that will be reloaded in modeller to create the initial modeller Model. ''' # set residues names with protonated histidines back to generic HIS name (needed by DOPE score function) testH = mol.data["resname"].values testH[testH == "HIE"] = "HIS" testH[testH == "HID"] = "HIS" _mol = deepcopy(mol) _mol.data["resname"] = testH alternate_residue_names = dict(CSS=('CYX',)) atoms = ' '.join(list(_mol.data['name'].unique())) tmp_file = f'tmp{random_string()}.pdb' _mol.write_pdb(tmp_file, conformations=[0], split_struc=False) log.level(0, 0, 0, 0, 0) env = environ() env.libs.topology.read(file='$(LIB)/top_heav.lib') env.libs.parameters.read(file='$(LIB)/par.lib') self.fast_mdl = complete_pdb(env, tmp_file) self.fast_fs = selection(self.fast_mdl.chains[0]) self.fast_ss = self.fast_fs.only_atom_types(atoms) atom_residue = _mol.get_data(columns=['name', 'resname', 'resid']) atom_order = [] first_index = next(iter(self.fast_ss)).residue.index offset = atom_residue[0, 2]-first_index for i, j in enumerate(self.fast_ss): if i < len(atom_residue): for j_residue_name in alternate_residue_names.get(j.residue.name, (j.residue.name,)): if [j.name, j_residue_name, j.residue.index+offset] == list(atom_residue[i]): atom_order.append(i) else: where_arg = (atom_residue==(np.array([j.name, j_residue_name, j.residue.index+offset], dtype=object))).all(axis=1) where = np.where(where_arg)[0] if len(where)==0: if (j_residue_name, j.name) in self.atom_map: alt_residue_name, alt_name = self.atom_map[(j_residue_name, j.name)] where_arg = (atom_residue==(np.array([alt_name, alt_residue_name, j.residue.index+offset], dtype=object))).all(axis=1) where = np.where(where_arg)[0] else: print(f'Cant find {j.name} in the atoms {atom_residue[atom_residue[:,2]==j.residue.index+offset]} try adding a mapping to DOPE_Score.atom_map') atom_order.append(int(where)) self.fast_atom_order = atom_order # check fast dope atoms reverse_map = {value:key for key, value in self.atom_map.items()} for i, j in enumerate(self.fast_ss): if i<len(atom_residue): assert _mol.data['name'][atom_order[i]]==j.name or reverse_map[(_mol.data['resname'][atom_order[i]], _mol.data['name'][atom_order[i]])][1]==j.name self.cg = ConjugateGradients() os.remove(tmp_file)
[docs] def get_dope(self, frame, refine=False): ''' Get the dope score. Injects coordinates into modeller and uses `mdl.build(build_method='INTERNAL_COORDINATES', initialize_xyz=False)` to reconstruct missing atoms. If a error is thrown by modeller or at any stage, we just return a fixed large value of 1e10. :param numpy.ndarray frame: shape [N, 3] :param bool refine: (default: False) If True, relax the structures using a maximum of 50 steps of ConjugateGradient descent :returns: Dope score as calculated by modeller. If error is thrown we just simply return 1e10. :rtype: float ''' # expect coords to be shape [N, 3] use .cpu().numpy().copy() before passing here and make sure it is scaled correctly try: frame = frame.astype(float) self.fast_fs.unbuild() for i, j in enumerate(self.fast_ss): if i+1<frame.shape[0]: j.x, j.y, j.z = frame[self.fast_atom_order[i], :] self.fast_mdl.build(build_method='INTERNAL_COORDINATES', initialize_xyz=False) if refine == 'both': with ShutUp(): dope_unrefined = self.fast_fs.assess_dope() self.cg.optimize(self.fast_fs, max_iterations=50) dope_refined = self.fast_fs.assess_dope() return dope_unrefined, dope_refined with ShutUp(): if refine: self.cg.optimize(self.fast_fs, max_iterations=50) dope_score = self.fast_fs.assess_dope() return dope_score except Exception: return 1e10
[docs] def get_all_dope(self, coords, refine=False): ''' Expect a array of frames. return array of DOPE score value. :param numpy.ndarray coords: shape [B, N, 3] :param bool refine: (default: False) If True, relax the structures using a maximum of 50 steps of Conjugate Gradient descent :returns: float array shape [B] :rtype: np.ndarray ''' # expect coords to be shape [B, N, 3] use .cpu().numpy().copy() before passing here and make sure it is scaled correctly dope_scores = [] for frame in coords: frame = frame.astype(float) self.fast_fs.unbuild() for i, j in enumerate(self.fast_ss): if i+1<frame.shape[0]: j.x, j.y, j.z = frame[self.fast_atom_order[i], :] self.fast_mdl.build(build_method='INTERNAL_COORDINATES', initialize_xyz=False) if refine: self.cg.optimize(self.fast_fs, max_iterations=50) dope_scores.append(self.fast_fs.assess_dope()) return np.array(dope_scores)
def set_global_score(score, kwargs): ''' Make score a global variable. This is used when initializing a multiprocessing process. ''' global worker_dope_score worker_dope_score = score(**kwargs) # mol = mol, data_dir=data_dir, **kwargs) def process_dope(coords, kwargs): ''' Worker function for multiprocessing class ''' return worker_dope_score.get_dope(coords,**kwargs)
[docs] class Parallel_DOPE_Score: ''' a multiprocessing class to get modeller DOPE scores. A typical use case would looke like:: score_class = Parallel_DOPE_Score(mol, **kwargs) results = [] for frame in coordinates_array: results.append(score_class.get_score(frame)) .... # DOPE will be calculated asynchronously in background #to retrieve the results results = np.array([r.get() for r in results]) ''' def __init__(self, mol, processes=-1, context='spawn', **kwargs): ''' :param biobox.Molecule mol: biobox molecule containing one example frame of the protein to be analysed. This will be passed to DOPE_Score class instances in each thread. :param int processes: (default: -1) Number of processes argument to pass to multiprocessing.pool. This controls the number of threads created. :param \*\*kwargs: additional kwargs will be passed multiprocesing.pool during initialisation. ''' # set a number of processes as user desires, capped on number of CPUs if processes > 0: processes = min(processes, os.cpu_count()) else: processes = os.cpu_count() self.processes = processes self.mol = deepcopy(mol) score = DOPE_Score ctx = get_context(context) self.pool = ctx.Pool(processes=processes, initializer=set_global_score, initargs=(score, dict(mol=mol)), **kwargs) self.process_function = process_dope def __reduce__(self): return (self.__class__, (self.mol, self.processes))
[docs] def get_score(self, coords, **kwargs): ''' :param np.array coords: # shape (N, 3) numpy array ''' # is copy necessary? return self.pool.apply_async(self.process_function, (coords.copy(), kwargs))
def _close(self): if self.pool: self.pool.close() self.pool.join() self.pool = None