#!/usr/bin/env python3
# MIT License
# COPYRIGHT (C) 2024 MERCK SHARP & DOHME CORP. A SUBSIDIARY OF MERCK & CO.,
# INC., RAHWAY, NJ, USA. ALL RIGHTS RESERVED
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import os
import subprocess
from collections import OrderedDict

import freesasa
import MDAnalysis as mda
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from MDAnalysis.analysis import align, rms
from sklearn.linear_model import LinearRegression as reg
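# Fit a linear regression to a list of (x, y) pairs and return the R^2 score.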
def get_corr(data):
lin_reg = reg()
    # scikit-learn expects a 2D feature array, so reshape x into a single column
    x = np.array([xy[0] for xy in data]).reshape(-1, 1)
    y = np.array([xy[1] for xy in data])
    lin_reg.fit(x, y)
return lin_reg.score(x, y)
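# Count the non-gap ('-') characters in an aligned (MSA) sequence string.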
def no_dash(msa):
ctr = 0
for char in msa:
if char != '-':
ctr +=1
return ctr
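# Pad every value list in the dict with None so all entries match the length of the longest one.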
def pad_short_lists(dic):
max_len = max([len(val) for val in dic.values()])
for key, val in dic.items():
leng = len(val)
if leng != max_len:
dic[key] = np.array(list(val) + [None]*(max_len - leng))
return dic
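# For each residue, fit log(1 - sqrt(S2)) against log(T) across the simulated
# temperatures; the slope is reported as lambda and the fit quality as R^2.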
def get_lambda(master_dict, temps = [310,350,373]):
#temps = [int(x) for x in temp.split(",")]
residues = master_dict[temps[0]].keys()
lambda_dict = {}
r_dict = {}
for resid in residues:
log_temps = np.array([np.log(temp) for temp in temps]).reshape(-1,1)
log_s = np.array([np.log(1 - np.sqrt(master_dict[temp][resid])) for temp in temps])
lin_reg = reg()
lin_reg.fit(X = log_temps, y = log_s)
lambda_dict[resid], r_dict[resid] = lin_reg.coef_[0], lin_reg.score(log_temps, log_s)
return lambda_dict, r_dict
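# Build a per-residue DataFrame with one column per temperature (S2 values)
# plus, when available, the fitted lambda and R^2.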
def get_df(master_s2_dict, lambda_dict, r_dict, temps):
residues = master_s2_dict[temps[0]].keys()
df_dict = {resid:{} for resid in residues}
for resid in residues:
for temp in temps:
df_dict[resid][temp] = master_s2_dict[temp][resid]
        if lambda_dict is not None and r_dict is not None:
df_dict[resid]['lamda'] = lambda_dict[resid]
df_dict[resid]['r'] = r_dict[resid]
return pd.DataFrame(df_dict).T
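# Collect the backbone amide N and H coordinates for each residue, keyed by a
# 1-based sequential index; assumes the first two atoms of every residue are
# the backbone N and its amide H, as in typical GROMACS topologies.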
def get_h_n_coords(u):
coords_dict = OrderedDict()
    # skip the first residue and prolines: prolines have no amide H, and the
    # N-terminal residue carries an amine rather than an amide N-H bond
backbone_residues = u.select_atoms('backbone and not resname PRO').residues[1:]
num_res = backbone_residues.n_residues
for resid in range(num_res):
coords_dict[resid + 1] = backbone_residues[resid].atoms[0:2].positions
return coords_dict
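# Turn each residue's N/H coordinate pair into a normalized N-H bond vector.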
def get_vectors(coords_dict):
vector_dict = {}
for resid, coords in coords_dict.items():
vec = (coords[0,0] - coords[1,0], coords[0,1] - coords[1,1], coords[0,2] - coords[1,2])
norm = np.linalg.norm(vec)
vector_dict[resid] = [comp/norm for comp in vec]
return vector_dict
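# All nine pairwise products u_i*u_j of the three vector components.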
def multiply_comps(comps):
products = []
for ui in comps:
for uj in comps:
products.append(ui*uj)
return products
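# Average S2 over residues within each block and return the per-block means as a DataFrame.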
def get_s2_df(s2_blocks_dict):
df_dict = {}
for block, s2_dic in s2_blocks_dict.items():
df_dict[block] = sum([s2 for s2 in s2_dic.values()])/len(s2_dic.values())
return pd.DataFrame([df_dict]).T
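# Per-residue pairwise component products of the unit N-H vectors.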
def get_products(vector_dict):
product_dict = {}
for resid, comps in vector_dict.items():
product_dict[resid] = multiply_comps(comps)
return product_dict
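# Order parameter from the time-averaged component products:
# S2 = 1.5 * sum_ij <u_i u_j>^2 - 0.5, scaled here by an empirical 0.89 factor.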
def get_s2(product_dict):
s2_dict = {}
for resid, products in product_dict.items():
s2 = (1.5*sum([product**2 for product in products]) - 0.5)
s2_dict[resid] = 0.89*s2
return s2_dict
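# Half-open integer range [a, b) returned as a list.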
def get_range(a,b):
return [i for i in range(int(a),int(b))]
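# Split the trajectory frames into blocks of block_length ns (100 frames per ns);
# a trailing remainder shorter than half a block is merged into the final block,
# otherwise it becomes its own shorter block.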
def get_blocks(traj_length, block_length):
block_length *= 100
remainder = traj_length%block_length
num_blocks = int(traj_length // (block_length))
if remainder < 0.5*block_length:
num_blocks = num_blocks - 1
blocks = {block_num: get_range(block_num*block_length, (block_num+1)*block_length) for block_num in range(num_blocks)}
blocks[num_blocks] = get_range(num_blocks*block_length, traj_length)
return blocks
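# Incrementally update a running average of component products: dict1 holds the
# mean over ts frames, dict2 the values for the new frame.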
def update_average(dict1, dict2, ts):
new_dict = {}
if ts == -1:
return dict1
else:
for pair1, pair2 in zip(dict1.items(), dict2.items()):
key1, val1 = pair1
key2, val2 = pair2
new_val = []
for prod1, prod2 in zip(val1, val2):
new_val.append((prod1*ts + prod2)/(ts + 1))
new_dict[key1] = new_val
return new_dict
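# Average each residue's S2 over all blocks.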
def avg_s2_blocks(s2_dic):
avg_dict = {}
blocks = s2_dic.keys()
resids = s2_dic[0].keys()
for resid in resids:
avg_dict[resid] = sum([s2_dic[block][resid] for block in blocks])/len(blocks)
return avg_dict
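# Compute per-block backbone N-H order parameters (S2) for one temperature from
# md_final_<temp>.gro/.xtc, write the block averages to CSV, and return the
# per-block, per-residue S2 values; use_dummy generates synthetic values instead.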
def order_s2(mab='mab01', temp='310', block_length=10, start=20, use_dummy=False):
temp = str(temp)
topology = 'md_final_' + temp + '.gro'
trajectory = 'md_final_' + temp + '.xtc'
# Load universe to detect number of residues
u = mda.Universe(topology, trajectory)
protein = u.select_atoms("protein")
# Get number of residues (excluding first residue and prolines, matching get_h_n_coords logic)
backbone_residues = u.select_atoms('backbone and not resname PRO').residues[1:]
num_res = backbone_residues.n_residues
if use_dummy:
print("Generating dummy S2 values for testing...")
print("Number of residues detected: {}".format(num_res))
print("Temperature: {}K".format(temp))
print("Block length: {} ns".format(block_length))
print("-----------------------------------------")
# Generate 2-4 blocks for dummy data regardless of simulation length
# Use block_length to determine number: smaller blocks -> more blocks, larger blocks -> fewer blocks
if block_length <= 2.5:
num_blocks = 4
elif block_length <= 10:
num_blocks = 3
else:
num_blocks = 2
# Set seed based on temperature for reproducibility
np.random.seed(int(temp))
s2_blocks_dict = {}
for block in range(num_blocks):
# Generate realistic S2 values between 0.6-0.9 for each residue
# Add some variation between blocks
base_s2 = 0.75 + np.random.normal(0, 0.05) # Base around 0.75 with small variation
base_s2 = np.clip(base_s2, 0.6, 0.9) # Keep in realistic range
s2_dict = {}
for resid in range(1, num_res + 1):
# Add residue-specific variation
residue_s2 = base_s2 + np.random.normal(0, 0.08)
residue_s2 = np.clip(residue_s2, 0.6, 0.9)
s2_dict[resid] = residue_s2
s2_blocks_dict[block] = s2_dict
print(" block {} dummy order parameter values generated".format(block))
print("saving dummy order parameter values...")
get_s2_df(s2_blocks_dict).to_csv('order_s2_{}K_{}block_{}start.csv'.format(str(temp),str(block_length),str(start)))
print("dummy order parameter values saved.")
print("")
return s2_blocks_dict
# Original computation code
prealigner = align.AlignTraj(u, u, select="protein and name CA", in_memory=True).run()
ref_coordinates = u.trajectory.timeseries(asel=protein).mean(axis=1)
ref = mda.Merge(protein).load_new(ref_coordinates[:, None, :], order="afc")
aligner = align.AlignTraj(u, ref, select="protein and name CA", in_memory=True).run()
traj_length = len(u.trajectory)
print("starting order parameter calculations...")
print("trajectory length: {} ns".format(int(round(traj_length/100, 0))))
print("start: {} ns".format(start))
print("block length: {} ns".format(block_length))
print("-----------------------------------------")
blocks = get_blocks(traj_length - start*100, block_length)
print("{} blocks for {}K temperature".format(len(blocks.keys()), temp))
s2_blocks_dict = {block: None for block in blocks.keys()}
    for block, timesteps in blocks.items():
        block_product_dict = None
        # iterate over all frames in the block, accumulating a running average
        # of the N-H vector component products across the block
        for i, ts in enumerate(timesteps):
            # jump to the frame (start is given in ns; the trajectory holds 100 frames per ns),
            # then compute the vector products for this frame
            u.trajectory[start * 100 + ts]
            ts_product_dict = get_products(get_vectors(get_h_n_coords(u)))
            # for the first frame in the block the average is just this frame's products;
            # otherwise update the running average, weighted by the number of frames seen so far
            if block_product_dict is not None:
                block_product_dict = update_average(block_product_dict, ts_product_dict, i)
            else:
                block_product_dict = ts_product_dict
        # from the block-averaged vector products, compute the block's s2 values and store them
        if block_product_dict is None:
            continue
        s2_blocks_dict[block] = get_s2(block_product_dict)
        print(" block {} order parameter calculation successful".format(block))
print("saving order parameter values...")
get_s2_df(s2_blocks_dict).to_csv('order_s2_{}K_{}block_{}start.csv'.format(str(temp),str(block_length),str(start)))
print("order parameter values saved.")
print("")
return s2_blocks_dict
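# Fit lambda and R^2 per residue from the temperature-resolved S2 values in
# master_dict and write the combined table to CSV.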
def order_lambda(master_dict=None, mab='mab01', temps=[310, 350, 373], block_length='10', start='20000'):
    temps = [int(x) for x in temps]
    # the fit in get_lambda already spans all temperatures, so it only needs to run once
    lambda_dict, r_dict = get_lambda(master_dict=master_dict, temps=temps)
    df = get_df(master_dict, lambda_dict, r_dict, temps)
    df.to_csv('order_lambda_{}block_{}start.csv'.format(block_length, start))
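# Example of how these pieces might be chained together (a minimal sketch; the
# temperature list and the block/start values below are illustrative assumptions,
# not part of the original interface):
#
#   master_dict = {}
#   for temp in (310, 350, 373):
#       s2_blocks = order_s2(temp=temp, block_length=10, start=20)
#       master_dict[temp] = avg_s2_blocks(s2_blocks)
#   order_lambda(master_dict=master_dict, temps=[310, 350, 373],
#                block_length='10', start='20')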