from __future__ import annotations
import os
import mdtraj as md
import numpy as np
from sklearn.cluster import AgglomerativeClustering, KMeans
from sklearn.decomposition import PCA
[docs]
class DataAssembler:
def __init__(
self,
traj_path: str,
topo_path: str | None = None,
test_size: float = 0.15,
n_cluster: int = 1500,
image_mol: bool = False,
outpath: str = "",
verbose: bool = False,
dist_mat: bool = True,
):
"""
Create clustered trajectories, stride trajectories and randomly sampled test frames
and change either the respective trajectory frame indices or create a new trajectory.
Will also concatenate multiple trajectories into one and center protein in the water box.
The topology of the newly created trajectory will be saved in self.outpath/trajectory_name_NEW_TOPO.pdb
:param str traj_path: path to the trajectory
:param str topo_path: path to the topology
:param int test_size: size of the test dataset (0.0 if no test set should be created)
:param int n_cluster: number of clusters to be created (representative frames)
:param bool image_mol: True to image to molecule (center it in the box)
:param str outpath: directory path where the new trajector(y)ies should be stored
:param bool verbose: True to get info which steps are currently performed
:param bool verbose: True to calculate the n_frames x n_frames distance matrix
"""
self.traj_path = traj_path
self.topo_path = topo_path
self.test_size = test_size
self.n_cluster = n_cluster
self.image_mol = image_mol
self.outpath = outpath
self.verbose = verbose
self.dist_mat = dist_mat
assert os.path.exists(self.outpath), "Outpath does not exist"
def _loading_fallback(self, traj_path, topo_path):
"""
try loading trajectories that are not supported by md.load
:param str traj_path: path(s) to the trajector(y)ies
:param str traj_path: path(s) to the topolog(y)ies
"""
ext = os.path.splitext(traj_path)[-1]
match_dict = {
# ".binops": md.load_binpos,
".xml": md.load_xml,
".dcd": md.load_dcd,
".dtr": md.load_dtr,
".nc": md.load_netcdf,
".ncrst": md.load_netcdf,
".trr": md.load_trr,
".xtc": md.load_xtc,
".xyz": md.load_xyz,
".lammpstrj": md.load_lammpstrj,
}
try:
load_func = match_dict[ext]
except KeyError:
raise KeyError(
"Even fallback loading was not able to load the supplied trajectory format"
)
"""
# use that for python versions >=3.10
match ext:
# case ".binops":
# load_func = md.load_binpos
case ".xml":
load_func = md.load_xml
case ".dcd":
load_func = md.load_dcd
case ".dtr":
load_func = md.load_dtr
case ".nc":
load_func = md.load_netcdf
case ".ncrst":
load_func = md.load_netcdf
case ".trr":
load_func = md.load_trr
case ".xtc":
load_func = md.load_xtc
case ".xyz":
load_func = md.load_xyz
case ".lammpstrj":
load_func = md.load_lammpstrj
case _:
raise KeyError(
"Even fallback loading was not able to load the supplied trajectory format"
)
"""
return load_func(traj_path, topo_path)
[docs]
def read_traj(self, atom_indices=None, ref_atom_indices=None) -> None:
"""
Read in one or multiple trajectories, remove everything but protein atoms and
image the molecule to center it in the water box, and create a training/validation
and test split.
:param array_like | None atom_indices: The indices of the atoms to superpose. If not supplied, all atoms will be used.
:param array_like | None ref_atom_indices: Use these atoms on the reference structure. If not supplied, the same atom indices will be used for this trajectory and the reference one.
"""
if self.verbose:
print("Reading trajectory")
self.traj_name = os.path.splitext(
os.path.basename(
self.traj_path if isinstance(self.traj_path, str) else self.traj_path[0]
)
)[0]
# loading the trajectory if it is only one file
if isinstance(self.traj_path, str):
try:
# do not enforce topology file on this formats
fext = os.path.splitext(self.traj_path)[-1]
if any([fext == ".pdb", fext == ".h5", fext == ".lh5"]):
self.traj = md.load(self.traj_path)
else:
self.traj = md.load(self.traj_path, self.topo_path)
except OSError:
self.traj = self._loading_fallback(self.traj_path, self.topo_path)
# select only protein atoms from the trajectory
self.traj = self.traj.atom_slice(self.traj.top.select("protein"))
# loading multiple trajectories
elif isinstance(self.traj_path, list):
if isinstance(self.traj_path, list) and self.topo_path is None:
fext = os.path.splitext(self.traj_path[0])[-1]
# file type doesn't need a topo but zip needs equally long list
if any([fext == ".pdb", fext == ".h5", fext == ".lh5"]):
self.topo_path = [None] * len(self.traj_path)
assert isinstance(
self.topo_path, list
), "If trajectories are supplied as a list the topologies must be too"
assert len(self.traj_path) == len(
self.topo_path
), "there must be as many topologies supplied as trajectories"
multi_traj = []
top0 = None
ucell0 = None
for ct, (trp, top) in enumerate(zip(self.traj_path, self.topo_path)):
if self.verbose:
print(f"\tLoading {os.path.basename(trp)}")
loaded = None
try:
# do not enforce topology file on this formats
trp_ext = os.path.splitext(trp)[-1]
if any([trp_ext == ".pdb", trp_ext == ".h5", trp_ext == ".lh5"]):
loaded = md.load(trp)
else:
loaded = md.load(trp, top)
except OSError:
loaded = self._loading_fallback(trp, top)
# select only protein atoms from the trajectory
loaded = loaded.atom_slice(loaded.top.select("protein"))
# use topology and unit cell from first trajectory to be able to join them
if ct == 0:
top0 = loaded.topology
ucell0 = loaded.unitcell_vectors
else:
if top0.n_atoms != loaded.n_atoms:
raise ValueError(
f"topologies do not match - topology [{ct}] has {loaded.n_atoms} instead of {top0.n_atoms}"
)
loaded.topology = top0
loaded.unitcell_vectors = ucell0
multi_traj.append(loaded)
self.traj = md.join(multi_traj)
# Recenter and apply periodic boundary
if self.image_mol:
try:
if self.verbose:
print("Imaging faild - retrying with supplying anchor molecules")
self.traj.image_molecules(inplace=True)
except ValueError:
try:
self.traj.image_molecules(
inplace=True,
anchor_molecules=[set(self.traj.topology.residue(0).atoms)],
)
except ValueError as e:
print(
f"Unable to image molecule due to '{e}' - will just recenter it"
)
self.traj.superpose(
self.traj[0],
atom_indices=atom_indices,
ref_atom_indices=ref_atom_indices,
)
# maybe not needed after image_molecules
self.traj.center_coordinates()
# converts ELEMENT names from eg "Cd" -> "C" to avoid later complications
topo_table, topo_bonds = self.traj.topology.to_dataframe()
topo_table["element"] = topo_table["element"].apply(
lambda x: x if len(x.strip()) <= 1 else x.strip()[0]
)
if self.verbose:
print("Saving new topology")
self.traj.topology = md.Topology.from_dataframe(topo_table, topo_bonds)
# save new topology
self.traj[0].save_pdb(
os.path.join(self.outpath, f"./{self.traj_name}_NEW_TOPO.pdb")
)
n_frames = self.traj.n_frames
# which index separated indices from training and test dataset
self.test_border = int(n_frames * (1.0 - self.test_size))
# get indices and shuffle them to obtain indices for use as training/validation
# and one as test dataset
self.frame_idx = np.arange(n_frames)
np.random.shuffle(self.frame_idx)
self.train_idx = self.frame_idx[: self.test_border]
train_traj = self.traj[self.train_idx]
# number of frames for training/validation set
n_train_frames = len(self.train_idx)
if self.dist_mat:
# remove H atoms from distance calculation
atom_indices = [
a.index for a in train_traj.topology.atoms if a.element.symbol != "H"
]
if self.verbose:
print("Calculating disance matrix")
# distance matrix between all frames
self.traj_dists = np.empty((n_train_frames, n_train_frames))
for i in range(n_train_frames):
self.traj_dists[i] = md.rmsd(
train_traj, train_traj, i, atom_indices=atom_indices
)
[docs]
def calc_rmsd_f(
self,
) -> tuple[
np.ndarray[tuple[int], np.dtype[np.int_]],
np.ndarray[tuple[int], np.dtype[np.int_]],
np.ndarray[tuple[int], np.dtype[np.int_]],
]:
"""
calculate rmsd and rmsf over the course of the trajectory
:return: tuple[
np.ndarray[tuple[int], np.dtype[np.int_]],
np.ndarray[tuple[int], np.dtype[np.int_]],
np.ndarray[tuple[int], np.dtype[np.int_]],
]
"""
assert hasattr(self, "traj"), "you first need to read in the trajectory"
if self.verbose:
print("Calculating rmsd")
rmsd_analysis = md.rmsd(self.traj, self.traj, 0)
rmsf_analysis = md.rmsf(self.traj, self.traj, 0)
return np.arange(len(rmsd_analysis)), rmsd_analysis, rmsf_analysis
def _find_representatives(self, idx_idx, labels) -> None:
"""
search trough each cluster and find the representative frame
with the highest similarity to all other frames
param: list[int] idx_idx: indices of all frames in the trajectory
param: list[int] labels: cluster label for each frame
"""
if self.verbose:
print("Finding representatives")
self.cluster_idx = []
# iterate over each cluster
for i in np.unique(labels):
# boolean mask for which frame is member of cluster i
label_bool = labels == i
# if cluster has only one memeber
if label_bool.sum() == 1:
iidx = 0
else:
# index the whole distance matrix to get a subdistance matirx
# containing only the distances of cluster members
i_dists = self.traj_dists[np.ix_(label_bool, label_bool)]
# transform distances to similarities and find the frame wit the highest
# similarity to all other cluster members
iidx = np.exp(-1 * i_dists / i_dists.std()).sum(axis=1).argmax()
# add the trajectory frame index of the representative to the list of used frames
self.cluster_idx.append(idx_idx[label_bool][iidx])
[docs]
def distance_cluster(
self,
) -> None:
"""
cluster the trajectory with AgglomerativeClustering based on the rmsd between the frames
"""
assert hasattr(
self, "traj_dists"
), "No pairweise frame distances present - read in trajectory first and make sure `dist_mat=True`"
if self.verbose:
print("Distance clustering")
# replace AgglomerativeClustering with any distance matrix based clustering function
cluster_func = AgglomerativeClustering(
n_clusters=self.n_cluster, metric="precomputed", linkage="average"
)
# cluster the frames with KMeans and find the representative frame for each cluster
cluster_func.fit(self.traj_dists)
idx_idx = np.arange(len(cluster_func.labels_))
self._find_representatives(idx_idx, cluster_func.labels_)
self.cluster_method = "CLUSTER_aggl"
[docs]
def create_dendrogram(self, distance_threshold=50, output_path="dendrogram.png") -> None:
"""
Cluster the trajectory with hierarchical clustering (linkage) based on the RMSD between the frames
and plot a dendrogram.
Group frames that have pairwise distances less than "distance_threshold" in one cluster (default is 50).
"""
assert hasattr(
self, "traj_dists"
), "No pairwise frame distances present - read in trajectory first"
if self.verbose:
print("Hierarchical clustering")
# Perform hierarchical clustering using scipy
self.linkage_matrix = linkage(self.traj_dists, method="ward")
# Plot the dendrogram
plt.figure(figsize=(10, 7))
dendrogram(self.linkage_matrix, no_labels=True, color_threshold=distance_threshold)
plt.title("Dendrogram of Frames")
plt.xlabel("Frame Index")
plt.ylabel("Distance")
plt.savefig('dendrogram.png')
# Define clusters by specifying n_clusters
self.cluster_labels = fcluster(self.linkage_matrix, t=distance_threshold, criterion="distance")
# Store cluster labels and representatives
idx_idx = np.arange(len(self.cluster_labels))
self._find_representatives(idx_idx, self.cluster_labels)
self.cluster_method = "CLUSTER_linkage"
if self.verbose:
print(f"Assigned {self.n_cluster} clusters.")
[docs]
def pca_cluster(self, n: int = 3) -> None:
"""
cluster the trajectory with KMeans based on the first 5 principal components
of a PCA of the trajectory
:param int n: number of principal components (per frame) to use for the clustering
"""
assert hasattr(self, "traj"), "No traj found - read in trajectory first"
if self.verbose:
print("PCA clustering")
train_traj = self.traj[self.train_idx]
# align everything to frame 0 and calculate the
train_traj.superpose(train_traj, 0)
pca = PCA(n_components=n)
reduced_cartesian = pca.fit_transform(
train_traj.xyz.reshape(len(self.train_idx), train_traj.n_atoms * 3)
)
# cluster the frames with KMeans and find the representative frame for each cluster
kmeans = KMeans(n_clusters=self.n_cluster, n_init="auto")
kmeans.fit(reduced_cartesian)
idx_idx = np.arange(len(kmeans.labels_))
self._find_representatives(idx_idx, kmeans.labels_)
self.cluster_method = "CLUSTER_pca"
[docs]
def stride(self) -> None:
"""
reduce the training dataset size to n samples using stride as 'sampling method'
"""
if self.verbose:
print("Reducing size")
n_train_frames = len(self.train_idx)
# indices that create a stride over the remaining training trajectory
stride_idx = np.arange(
0,
n_train_frames,
step=np.floor(n_train_frames / self.n_cluster).astype(int),
)
# train_idx need to be sorted so the strides are again over a normal trajectory
# and not a shuffled one
stride_pre_idx = self.train_idx.copy()
stride_pre_idx.sort()
# true indices of the train_traj
ori_frame_train_idx_stride = stride_pre_idx[stride_idx]
# but we need the indices that index the train_traj indices to be in line with
# clustering methods that return a list of indices for the train_traj indices
_, stride_idx, _ = np.intersect1d(
self.frame_idx, ori_frame_train_idx_stride, return_indices=True
)
# randomly remove frames if stride calculation yielded in to many frames due to np.floor
if len(stride_idx) > self.n_cluster:
stride_idx = np.random.choice(
stride_idx, size=self.n_cluster, replace=False
)
self.cluster_idx = stride_idx
self.cluster_method = f"STRIDE_{self.n_cluster}"
[docs]
def own_idx(self, file_path: str | np.ndarray[tuple[int], np.dtype[np.int64]]):
"""
Provide indices for frames to create a new trajectory.
Useful if trajectory should be sub sampled by some external metric.
:param str | np.ndarray[tuple[int], np.dtype[np.int64]] file_path: path where the file storing the indices is located. Needs to have each index in a separate line. Or can be a numpy array.
"""
if isinstance(file_path, str):
provided_idx = []
with open(file_path, "r") as ifile:
for i in ifile:
provided_idx.append(int(i))
provided_idx = np.asarray(provided_idx)
elif isinstance(file_path, np.ndarray):
provided_idx = file_path
else:
raise ValueError("Provided indices are in an incompatible format")
self.train_idx = provided_idx
self.cluster_idx = np.arange(len(self.train_idx))
self.cluster_method = "PROVIDED"
def _save_idx(
self, filepath: str, idxs: list[int] | np.ndarray[tuple[int], np.dtype[np.int_]]
) -> None:
"""
save frame indices to file (overwrites file if it already exists)
:param str filepath: path where the indices will be stored
:param list[int] idxs: the indices to be stored
"""
if self.verbose:
print("Saving indices")
with open(filepath, "w+") as idx_file:
for i in idxs:
idx_file.write(f"{i}\n")
[docs]
def create_trajectories(
self,
) -> None:
"""
Saves clustered or strided indices for the present training trajectory
and optionally saves them as new trajectory
"""
assert all(
[
hasattr(self, "traj"),
hasattr(self, "traj_name"),
hasattr(self, "train_idx"),
hasattr(self, "cluster_idx"),
]
), "Read in trajectory first"
assert all(
[
hasattr(self, "train_idx"),
hasattr(self, "cluster_idx"),
hasattr(self, "cluster_method"),
]
), "Cluster the trajectory first"
assert hasattr(
self, "cluster_idx"
), "No cluster indices optained by now - use any clustering method or stride to get frames from the trajectory"
ori_frame_train_idx = self.train_idx[self.cluster_idx]
if self.verbose:
print("Creating trajectories")
# create trajectories and the indices of the frames used
self._save_idx(
os.path.join(
self.outpath,
f"./{self.traj_name}_{self.cluster_method}_train_frames.txt",
),
ori_frame_train_idx,
)
self.traj[ori_frame_train_idx].save_dcd(
os.path.join(
self.outpath, f"{self.traj_name}_{self.cluster_method}_train.dcd"
)
)
# only create test trajectory if test size is greater than 0
if self.test_size > 0.0:
self._save_idx(
os.path.join(self.outpath, f"./{self.traj_name}_test_frames.txt"),
self.frame_idx[self.test_border :],
)
self.traj[self.frame_idx[self.test_border :]].save_dcd(
os.path.join(self.outpath, f"./{self.traj_name}_test.dcd")
)
[docs]
def create_trajectories_by_dendrogram(self, test_cluster: int) -> None:
"""
Create test trajectories based on a specific cluster and create the train
trajectories from all the frames excluding the specific cluster.
:param int test_cluster: Cluster to use as the test set.
"""
if self.test_size == 0.0:
raise ValueError("Test set is required to perform this operation.")
if self.cluster_labels is None:
raise ValueError(
"Cluster labels are not initialized. Please run create_dendrogram first."
)
assert all(
[
hasattr(self, "traj"),
hasattr(self, "traj_name"),
hasattr(self, "traj_dists"),
hasattr(self, "train_idx"),
hasattr(self, "frame_idx"),
hasattr(self, "cluster_idx"), ]
), "Ensure trajectory is clustered first"
# Get cluster labels
# cluster_labels = fcluster(linkage(self.traj_dists, method="ward"), t=self.n_cluster, criterion="maxclust")
self.unique_clusters = set(self.cluster_labels)
# Check if the test cluster is in the unique clusters
if test_cluster not in self.unique_clusters:
raise ValueError(f"Cluster {test_cluster} is not in the unique clusters.")
# Separate indices for train and test sets based on the cluster
test_cluster_indices = np.where(self.cluster_labels == test_cluster)[0]
train_cluster_indices = np.where(self.cluster_labels != test_cluster)[0]
if self.verbose:
print(f"Creating train and test trajectories with cluster {test_cluster} being the test set.")
# Save train trajectory
ori_frame_train_idx = self.train_idx[train_cluster_indices]
self._save_idx(
os.path.join(
self.outpath,
f"./{self.traj_name}_train_excluding_cluster_{test_cluster}_frames.txt",
),
ori_frame_train_idx,
)
self.traj[ori_frame_train_idx].save_dcd(
os.path.join(
self.outpath, f"{self.traj_name}_train_excluding_cluster_{test_cluster}.dcd"
)
)
# Save test trajectory
ori_frame_test_idx = self.train_idx[test_cluster_indices]
self._save_idx(
os.path.join(
self.outpath,
f"./{self.traj_name}_test_cluster_{test_cluster}_frames.txt",
),
ori_frame_test_idx,
)
self.traj[ori_frame_test_idx].save_dcd(
os.path.join(
self.outpath, f"{self.traj_name}_test_cluster_{test_cluster}.dcd"
)
)
if self.verbose:
print("Train and test trajectories successfully created.")
if __name__ == "__main__":
pass