import html import re import shutil import tempfile import time import uuid import zipfile from copy import deepcopy from typing import Tuple, List from datetime import datetime import gradio as gr import numpy as np import random import pandas as pd import matplotlib.pyplot as plt from io import BytesIO, StringIO import base64 import json import os import requests from drawRNA import generate_rna_structure # from eval_sequence import CodonUsageAnalyzer from files.codon_table import species_dict, parse_text from inference import inference from model.assemble_fragment import assemble_fragments from model.codon_attr import Codon from model.sliding_windows import process_nucleotide_sequences from model.tools import get_pretraining_args from predict import translate # os.environ["NO_PROXY"] = "hf.space" AA_str = 'ACDEFGHIKLMNPQRSTVWY*-'.lower() AA_TO_CODONS = {"F": ["TTT","TTC"], "L": ["TTA", "TTG", "CTT", "CTC", "CTA", "CTG"], "I": ["ATT", "ATC", "ATA"], "M": ["ATG"], "V": ["GTT", "GTC", "GTA", "GTG"], "S": ["TCT", "TCC", "TCA", "TCG", "AGT", "AGC"], "P": ["CCT", "CCC", "CCA", "CCG"], "T": ["ACT", "ACC", "ACA", "ACG"], "A": ["GCT", "GCC", "GCA", "GCG"], "Y": ["TAT", "TAC"], "H": ["CAT", "CAC"], "Q": ["CAA", "CAG"], "N": ["AAT", "AAC"], "K": ["AAA", "AAG"], "D": ["GAT", "GAC"], "E": ["GAA", "GAG"], "C": ["TGT", "TGC"], "W": ["TGG"], "R": ["CGT", "CGC", "CGA", "CGG", "AGA", "AGG"], "G": ["GGT", "GGC", "GGA", "GGG"], "*": ["TAA", "TAG", "TGA"]} last_ping_time = 0 def smart_wake_up(): global last_ping_time current_time = time.time() # 每 40 分钟(2400秒)触发一次,Space 默认通常是 48 小时或更短时间休眠 if current_time - last_ping_time > 2400: wake_up_space_b() last_ping_time = current_time wake_up_space_b() def wake_up_space_b(): try: # 替换为 Space B 的直接访问链接 url = "https://huggingface.co/spaces/julse/RPcontact" # 使用 head 请求或 get 请求,超时时间设短一点以免影响 A 的速度 requests.get(url, timeout=5) print("Successfully pinged Space B") except Exception as e: print(f"Failed to wake up Space B: {e}") def reverse_dictionary(dictionary): """Return dict of {value: key, ->} Input: dictionary: dict of {key: [value, ->], ->} Output: reverse_dictionary: dict of {value: key, ->} """ reverse_dictionary = {} for key, values in dictionary.items(): for value in values: reverse_dictionary[value] = key return reverse_dictionary CODON_TO_AA = reverse_dictionary(AA_TO_CODONS) # 模拟数据 - 实际使用时需要替换为真实数据 species_data = { "Homo_sapiens": {"codon_table": {}, "trna": {}, "codon_usage": {}}, "Mus_musculus": {"codon_table": {}, "trna": {}, "codon_usage": {}}, "Pichia": {"codon_table": {}, "trna": {}, "codon_usage": {}}, "Escherichia_coli": {"codon_table": {}, "trna": {}, "codon_usage": {}}, "Saccharomyces_cerevisiae": {"codon_table": {}, "trna": {}, "codon_usage": {}}, } # ['Homo_sapiens', 'Mus_musculus', 'Pichia', 'Escherichia_coli','Saccharomyces_cerevisiae'] # 示例数据 # # EGFP EXAMPLE_CDS = "Atggtgagcaagggcgaggagctgttcaccggggtggtgcccatcctggtcgagctggacggcgacgtaaacggccacaagttcagcgtgtccggcgagggcgagggcgatgccacctacggcaagctgaccctgaagttcatctgcaccaccggcaagctgcccgtgccctggcccaccctcgtgaccaccctgacctacggcgtgcagtgcttcagccgctaccccgaccacatgaagcagcacgacttcttcaagtccgccatgcccgaaggctacgtccaggagcgcaccatcttcttcaaggacgacggcaactacaagacccgcgccgaggtgaagttcgagggcgacaccctggtgaaccgcatcgagctgaagggcatcgacttcaaggaggacggcaacatcctggggcacaagctggagtacaactacaacagccacaacgtctatatcatggccgacaagcagaagaacggcatcaaggtgaacttcaagatccgccacaacatcgaggacggcagcgtgcagctcgccgaccactaccagcagaacacccccatcggcgacggccccgtgctgctgcccgacaaccactacctgagcacccagtccgccctgagcaaagaccccaacgagaagcgcgatcacatggtcctgctggagttcgtgaccgccgccgggatcactctcggcatggacgagctgtacaagtaa".upper().replace('T', 'U') # hHBB # EXAMPLE_UTR5 = "GAAAAGAGCCCCGGAAAGGAUCUAUCCCUUCCUGUUCUGCUGCACGCAAAAGAACAGCCAAGGGGGAGGCCACC" paper's example: sequence from https://fpegn0vwti.feishu.cn/docx/A7PvdTiqmohpxMxmGEecVMvNnbf#share-SPeTdcingohMH1xN7vHcYW1tnmg EXAMPLE_UTR5 = "AGAACATTTGCTTCTGACACAACTGTGTTCACTAGCAACCTCAAACAGACACC" # zhangqiong's example AGA+hHBB, MK476347.1 # hHBB EXAMPLE_UTR3 = "GCUCGCUUUCUUGCUGUCCAAUUUCUAUUAAAGGUUCCUUUGUUCCCUAAGUCCAACUACUAAACUGGGGGAUAUUAUGAAGGGCCUUGAGCAUCUGGAUUCUGCCUAAUAAAAAACAUUUAUUUUCAUUGCAA" EXAMPLE_MRNA = EXAMPLE_UTR5 + EXAMPLE_CDS + EXAMPLE_UTR3 EXAMPLE_PROTEIN = 'MVSKGEELFTGVVPILVELDGDVNGHKFSVSGEGEGDATYGKLTLKFICTTGKLPVPWPTLVTTLTYGVQCFSRYPDHMKQHDFFKSAMPEGYVQERTIFFKDDGNYKTRAEVKFEGDTLVNRIELKGIDFKEDGNILGHKLEYNYNSHNVYIMADKQKNGIKVNFKIRHNIEDGSVQLADHYQQNTPIGDGPVLLPDNHYLSTQSALSKDPNEKRDHMVLLEFVTAAGITLGMDELYK*' # GFP EXAMPLE_PROTEIN = 'MSRLPVLLLLQLLVRPGLQAPMTQTTPLKTSWVNCSNMIDEIITHLKQPPLPLLDFNNLNGEDQDILMENNLRRPNLEAFNRAVKSLQNASAIESILKNLLPCLPLATAAPTRHPIHIKDGDWNEFRRKLTFYLKTLENAQAQQTTLSLAIF*' # IL3 def check_and_plot_metrics(tmp_df, method="AA2CDS"): """ 检查数据并自动绘制五个指标的柱状图 """ # 确保所有需要的列都存在 required_cols = ['GC', 'GC_head', 'CAI', 'CAI_head', 'ENC'] # 创建只包含所需列的DataFrame display_df = pd.DataFrame() for col in required_cols: if col in tmp_df.columns: display_df[col] = pd.to_numeric(tmp_df[col], errors='coerce') else: print(f"Warning: Column '{col}' not found, using placeholder zeros.") display_df[col] = [0] * len(tmp_df) # 检查是否有有效数据 valid_data = display_df.notna().any().any() if not valid_data: print("Error: No valid numeric data found in the specified columns!") return None, None # 绘制图形 print(f"Plotting optimization metrics for {len(tmp_df)} sequences...") fig, axes = plot_optimization_metrics(display_df, method=method) return fig, axes def generate_seeds_for_variants(num_variants, current_seed_text=""): """ 根据变体数量生成随机seed 参数: num_variants: 需要生成的变体数量 current_seed_text: 当前seed文本框的内容(可选) 返回: str: 逗号分隔的seed字符串 """ # 如果用户已经有自定义的seed,尝试使用它 if current_seed_text and current_seed_text.strip(): try: # 解析现有的seed seeds = [int(s.strip()) for s in current_seed_text.split(',') if s.strip()] if len(seeds) >= num_variants: # 已有足够seed,只取前num_variants个 return ", ".join(map(str, seeds[:num_variants])) else: # 需要补充seed,基于最后一个seed递增 last_seed = seeds[-1] if seeds else random.randint(1000, 9999) additional_needed = num_variants - len(seeds) for i in range(additional_needed): seeds.append(last_seed + i + 1) return ", ".join(map(str, seeds)) except ValueError: # 如果seed格式无效,生成全新的随机seed pass # 生成全新的随机seed seeds = random.sample(range(1000, 10000), num_variants) return ", ".join(map(str, seeds)) def plot_optimization_metrics(display_df, method="AA2CDS", figsize=(8, 6)): """ 专门绘制GC, GC_head, CAI, CAI_head, ENC五个指标的柱状图 参数: display_df: 必须包含 'GC', 'GC_head', 'CAI', 'CAI_head', 'ENC' 列的DataFrame method: 方法名称,用于标题 figsize: 图形大小 """ # 定义要绘制的4个指标及其显示名称 metrics = ['CAI', 'CAI_head','GC', 'GC_head'] metric_titles = { 'GC': 'GC Content', 'GC_head': "5' GC Content", 'CAI': 'CAI', 'CAI_head': "5' CAI", } # 验证数据列是否存在 missing_cols = [col for col in metrics if col not in display_df.columns] if missing_cols: raise ValueError(f"DataFrame missing required columns: {missing_cols}") # 创建2x2的子图布局(最后一个位置留空) fig, axes = plt.subplots(4,1, figsize=figsize) axes = axes.flatten() # 展平为1D数组 # 设置全局样式 plt.rcParams.update({ 'font.size': 12, 'axes.titlesize': 13, 'axes.labelsize': 12, 'xtick.labelsize': 10, 'ytick.labelsize': 10, }) # 为每个指标创建柱状图 for idx, metric in enumerate(metrics): ax = axes[idx] # 获取数据并确保为数值类型 scores = pd.to_numeric(display_df[metric], errors='coerce').tolist() n_sequences = len(scores) if n_sequences == 0: ax.text(0.5, 0.5, f'No data for {metric}', ha='center', va='center', transform=ax.transAxes) continue colors = ['lightblue' if method == 'AA2CDS' else 'gray' for method in display_df['Method']] # 添加图例(重要!) # from matplotlib.patches import Patch # legend_elements = [Patch(facecolor='lightblue', label='Our method (AA2CDS)'), # Patch(facecolor='gray', label='Other methods (GC)')] # ax.legend(handles=legend_elements, loc='upper right') # # 创建柱状图 - 使用彩虹色系 # colors = plt.cm.rainbow(np.linspace(0, 1, n_sequences)) if '_id' in display_df.columns and len(display_df)<20: x_positions = display_df['Name'].to_list() # x_positions = display_df['_id'].to_list() # x_positions = display_df['_id']#+'('+display_df['Name']+')' else: x_positions = np.arange(1,n_sequences+1) # bars = ax.bar(x_positions, scores, color=colors, alpha=0.8, # edgecolor='black', linewidth=0.5) # 绘制柱状图 # fig, ax = plt.subplots(figsize=(12, 6)) bars = ax.bar(x_positions, scores, color=colors, alpha=0.8, edgecolor='black', linewidth=0.5) # 设置标题和标签 ax.set_ylabel('score', fontweight='bold') ax.set_xticklabels('') # 根据指标类型设置y轴范围 # if metric in ['GC', 'GC_head', 'CAI', 'CAI_head']: # ax.set_ylim([0, 1.0]) # ax.set_yticks(np.arange(0, 1.1, 0.2)) # ax.axhline(y=0.5, color='gray', linestyle=':', alpha=0.5) # elif metric == 'ENC': # # ENC的理论范围是20-61 # ax.set_ylim([20, 61]) # ax.set_yticks(np.arange(20, 62, 10)) #添加平均值和标准差线 mean_val = np.mean(scores) std_val = np.std(scores) # 均值线 ax.axhline(y=mean_val, color='red', linestyle='-', linewidth=2, alpha=0.8, label=f'Mean: {mean_val:.3f}') # 标准差带(均值±标准差) ax.axhline(y=mean_val + std_val, color='orange', linestyle='--', linewidth=1.5, alpha=0.6, label=f'±{std_val:.3f} SD') ax.axhline(y=mean_val - std_val, color='orange', linestyle='--', linewidth=1.5, alpha=0.6) # 填充标准差区域 ax.fill_between([-0.5, n_sequences - 0.5], mean_val - std_val, mean_val + std_val, color='orange', alpha=0.1) # 添加统计信息文本框 # stats_text = f'n = {n_sequences}\nμ = {mean_val:.3f}\nσ = {std_val:.3f}' # stats_text = f'mean ± std{mean_val:.3f} ± {std_val:.3f}' # ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, # fontsize=10, verticalalignment='top', # bbox=dict(boxstyle='round', facecolor='white', alpha=0.8)) # 添加图例 到外侧 ax.legend(loc='upper left', bbox_to_anchor=(1.02, 1), fontsize=9,title=metric) # 优化x轴标签(如果序列太多,减少显示) # if n_sequences > 15: # step = max(1, n_sequences // 10) # 最多显示10个标签 # xticks = np.arange(0, n_sequences, step) # # xticks = display_df.index.tolist() # ax.set_xticks(xticks) # ax.set_xticklabels([str(int(i)) for i in xticks]) # 添加网格 ax.grid(True, alpha=0.2, linestyle='-', axis='y') ax.set_title(f"{metric_titles[metric]}", fontweight='bold') ax = axes[-1] ax.set_xlabel("Sequence Name", fontweight='bold') ax.set_xticklabels(display_df['_id'].tolist(), rotation=45, ha='right') # 隐藏最后一个(第6个)子图 # axes[-1].set_visible(False) # # 添加整体标题 # fig.suptitle(f'CDS Optimization Metrics - {method}\n\n', # fontsize=16, fontweight='bold', y=1.02) # 调整布局 plt.tight_layout() # 显示详细统计信息 print(f"\n{'=' * 60}") print(f"DETAILED STATISTICS") print(f"{'=' * 60}") for metric in metrics: if metric in display_df.columns: values = pd.to_numeric(display_df[metric], errors='coerce').dropna() if len(values) > 0: print(f"\n{metric_titles[metric]}:") print(f" Count: {len(values):>4}") print(f" Mean: {values.mean():>8.4f}") print(f" Std Dev: {values.std():>8.4f}") print(f" Min: {values.min():>8.4f}") print(f" 25%: {values.quantile(0.25):>8.4f}") print(f" 50%: {values.median():>8.4f}") print(f" 75%: {values.quantile(0.75):>8.4f}") print(f" Max: {values.max():>8.4f}") return fig, axes # 使用示例 # display_df = tmp_df[['GC', 'GC_head', 'CAI', 'CAI_head', 'ENC']] # 只选需要的列 # fig, axes = plot_optimization_metrics(display_df, method="AA2CDS") # plt.savefig('optimization_metrics.png', dpi=300, bbox_inches='tight') # plt.show() def seq_formatted(seq, width=70, block=10): seq = re.sub(r'[^A-Za-z*]', '', seq).upper() total = len(seq) max_no = ((total - 1) // width) + 1 num_w = len(str(max_no)) + 2 lines = [] for i in range(0, len(seq), width): row = seq[i:i+width] # 每 block 个字符一块,不足右补空格 blocks = [row[j:j+block].ljust(block) for j in range(0, len(row), block)] line = ' '.join(blocks) line_number = f"{i+1:<{num_w}}" lines.append(f"{line_number} {line}") return '\n'.join(lines) def fasta_format_block(seq, width=70, block=10): lines = seq.split('\n') formatted_lines = [] for line in lines : if '>' in line: formatted_lines.append(line) else: formatted_lines.append(seq_formatted(line, width, block)) return '\n'.join(formatted_lines) def find_longest_cds(seq: str): """ 在mRNA序列中查找最长的CDS区域 参数: seq: mRNA序列 返回: (start, end): CDS区域的起始和结束索引 """ seq = seq.upper().replace('U', 'T') best_start = -1 best_end = -1 max_length = 0 # 尝试所有可能的阅读框 cds_collect = [] for frame in range(3): in_orf = False current_start = -1 for pos in range(frame, len(seq) - 2, 3): codon = seq[pos:pos + 3] # 如果是起始密码子 if codon == "ATG" and not in_orf: in_orf = True current_start = pos # 如果是终止密码子 elif in_orf and codon in ["TAA", "TAG", "TGA"]: orf_length = pos - current_start cds_collect.append((current_start, pos + 3,pos+3-current_start)) if orf_length > max_length: max_length = orf_length best_start = current_start best_end = pos + 3 in_orf = False # 处理没有终止密码子的情况 if in_orf: orf_length = len(seq) - current_start if orf_length > max_length: max_length = orf_length best_start = current_start best_end = len(seq) return best_start, best_end,cds_collect def gc_biased_sampling_cds( protein_seq, codon_usage_df, target_gc=0.55, strength=5.0, seed=None ): if seed is not None: random.seed(seed) np.random.seed(int(seed)) aa2codons = {} for _, row in codon_usage_df.iterrows(): gc = (row["triplet"].count("G") + row["triplet"].count("C")) / 3 aa2codons.setdefault(row["amino_acid"], []).append( (row["triplet"], row["fraction"], gc) ) cds = [] for aa in protein_seq: if aa == "*": cds.append("TAA") continue codons, fracs, gcs = zip(*aa2codons[aa]) gcs = np.array(gcs) fracs = np.array(fracs) bias = np.exp(-strength * np.abs(gcs - target_gc)) probs = fracs * bias probs /= probs.sum() cds.append(np.random.choice(codons, p=probs)) return "".join(cds) def cai_sampling_cds(protein_seq, codon_usage_df, seed=None): if seed is not None: random.seed(seed) np.random.seed(int(seed)) aa2codons = {} for _, row in codon_usage_df.iterrows(): aa2codons.setdefault(row["amino_acid"], []).append( (row["triplet"], row["fraction"]) ) cds = [] for aa in protein_seq: if aa == "*": cds.append("TAA") continue codons, weights = zip(*aa2codons[aa]) cds.append(random.choices(codons, weights=weights, k=1)[0]) return "".join(cds) def analysis_sequence(analyzer,seq,_id='_id',head=60,species='species'): result = { '_id': _id,#f'seed_{seed}', 'GC': round((seq.count("G") + seq.count("C")) / len(seq), 4), 'GC_head': round((seq[:head].count("G") + seq[:head].count("C")) / len(seq[:head]), 4), 'CAI': round(analyzer.calculate_CAI(seq), 4), 'CAI_head': round(analyzer.calculate_CAI(seq[:head]), 4), # 'ENC': round(analyzer.calculate_ENC(seq), 4), 'CDS_Full': seq, 'CDS': seq[:head] + "..." if len(seq) > head else seq, 'species': species, } return result def optimize_cds(protein_seq, species, codon_usage_table, method, status_msg,optimize_seed): """ 2. AA2CDS variants (seeds) 3. CAI extreme (CAI_max) 4. CAI sampling variants 5. GC extreme (GC_max / GC_min) 6. GC sampling variants """ head = 30 tmps = [] # seeds = ['1337', '42', '2022', '2023', '2024', '2025'] seeds = [x.strip() for x in optimize_seed.split(',')] def log(msg): return status_msg + f"\n{msg}" if not protein_seq: status_msg = log("❌ Error: Please enter a protein sequence") return None,None, None, status_msg status_msg = log(f'{protein_seq}') status_msg = log("🔹 Step 1/5: Parsing protein sequence") protein_seq = parse_formated_input(protein_seq) status_msg = log(f" • Protein length: {len(protein_seq)} aa") # 校验蛋白序列 protein_seq = protein_seq.upper() if not re.match(r'^[ACDEFGHIKLMNPQRSTVWY*]+$', protein_seq): return None, None, None, log( f"❌ Error: Invalid protein sequence {set(list(protein_seq)) | set(list('ACDEFGHIKLMNPQRSTVWY*'))}") dirout = tempfile.mkdtemp(prefix="aa2cds_") # 默认就是 /tmp dirout = dirout +'/' # timestamp = datetime.now().strftime("%Y%m%d%H%M%S") # dirout = f'tmp/{timestamp}_{species}/' # os.makedirs(dirout, exist_ok=True) task = 'predict_web' os.makedirs(f'{dirout}/{task}/', exist_ok=True) _id = 'AA2CDS' codon_usage_path = f'{dirout}/codon_usage.csv' codon_usage_table.to_csv(codon_usage_path, index=False) codon_usage_table['fraction'] = codon_usage_table['fraction'].astype(float) try: status_msg = log(f"🔹 Step 2/5: Initial CAI-optimal CDS generation") df = pd.DataFrame({'id': [_id], 'RefSeq_aa': [protein_seq]}) df.to_csv(dirout + f'{task}/input.csv', index=False) reverse_mapping = { "Mus_musculus": "mouse", "Escherichia_coli": "Ec", "Saccharomyces_cerevisiae": "Sac", "Pichia": "Pic", "Homo_sapiens": "Human" } species = reverse_mapping[species] df['species'] = species codon_instance = {species: Codon(codon_usage_path, rna=False)} analyzer = codon_instance[species] status_msg = log("🔹 Step 4/5: Multi-seed neural optimization") status_msg = log(f" • Seeds: {', '.join(seeds)}") ''''CAI''' df['cai_best_nn'] = df.apply( lambda x: codon_instance[x['species']].cai_opt_codon(x['RefSeq_aa']), axis=1 ) seq = df['cai_best_nn'].iloc[0] result = analysis_sequence(analyzer, seq, _id=f'CAI_max', head=head, species=species) result['Method'] = 'CAI' result['Seed'] = '-' result['Variant'] = 'CAI:Max' tmps.append(pd.DataFrame({k: [v] for k, v in result.items()})) '''sampling by codon CAI''' weights_df = deepcopy(codon_usage_table[['triplet', 'amino_acid', 'fraction']]) weights_df['amino_acid'] = weights_df['amino_acid'].str.lower() if 'CAI' in method: for seed in seeds: random.seed(seed) np.random.seed(int(seed)) seq = codon_instance[species].random_codon_weight(protein_seq,weights_df=weights_df) result = analysis_sequence(analyzer, seq, _id=f'CAI_seed_{seed}', head=head, species=species) result['Method'] = 'CAI' result['Seed'] = str(seed) result['Variant'] = 'Codon-usage sampling' # Control # CAI: sample (seed=42) tmps.append(pd.DataFrame({k: [v] for k, v in result.items()})) '''sampling by codon GC*usage''' if 'GC' in method: # weights_df['GC'] = weights_df.apply(lambda x:gc_content(x['triplet'])*x['fraction'],axis=1) weights_df['GC'] = weights_df.apply(lambda x:gc_content(x['triplet']),axis=1) weights_df['GC_nega'] = -weights_df['GC'] weights_df = weights_df.sort_values(by=['GC','fraction'],ascending=False) seq = analyzer.random_codon_weight(protein_seq, weights_df=weights_df.drop_duplicates(subset='amino_acid',keep='first')[['triplet', 'amino_acid', 'fraction']]) result = analysis_sequence(analyzer, seq, _id=f'GC_max', head=head, species=species) result['Method'] = 'GC' result['Seed'] = '-' result['Variant'] = 'GC:Max' tmps.append(pd.DataFrame({k: [v] for k, v in result.items()})) weights_df = weights_df.sort_values(by=['GC_nega','fraction'],ascending=False) seq = analyzer.random_codon_weight(protein_seq, weights_df=weights_df.drop_duplicates(subset='amino_acid',keep='first')[['triplet', 'amino_acid', 'fraction']]) result = analysis_sequence(analyzer, seq, _id=f'GC_min', head=head, species=species) result['Method'] = 'GC' result['Seed'] = '-' result['Variant'] = 'GC:Min' tmps.append(pd.DataFrame({k: [v] for k, v in result.items()})) # target_gc = 0.6 # weights_df['GC_distance'] = abs(weights_df['GC'] - target_gc) # weights_df['GC_score'] = np.exp(-weights_df['GC_distance'] / 0.2) # 指数衰减 weights_df = weights_df[['triplet', 'amino_acid', 'GC']].copy() weights_df.columns = ['triplet', 'amino_acid', 'fraction'] for seed in seeds: random.seed(seed) np.random.seed(int(seed)) seq = analyzer.random_codon_weight(protein_seq,weights_df=weights_df) result = analysis_sequence(analyzer, seq, _id=f'GC_seed_{seed}', head=head, species=species) result['Method'] = 'GC' result['Seed'] = seed result['Variant'] = 'GC content sampling' tmps.append(pd.DataFrame({k: [v] for k, v in result.items()})) if 'AA2CDS' in method: status_msg = log("🔹 Step 3/5: Fragmentation & translation consistency check") fragments_list = df.apply( lambda x: process_nucleotide_sequences( x['cai_best_nn'], max_nn_length=1200, step=300, pad_char='_', meta_dict={'_id': x['id'], 'species': x['species']} ), axis=1 ) expanded_data = pd.DataFrame([item for sublist in fragments_list for item in sublist]) expanded_data['truncated_aa'] = expanded_data['truncated_nn'].apply(translate) expanded_data = expanded_data.rename(columns={'truncated_nn': 'cai_best_nn'}) expanded_data.to_csv(dirout + f'{task}/TS.csv', index=False) parser = get_pretraining_args() args = parser.parse_args() args.downstream_data_path = dirout args.task = task args.predict = True args.mlm_pretrained_model_path = 'checkpoint/AA2CDS.pth' df_trun = pd.read_csv(dirout + f'{task}/TS.csv') for i,seed in enumerate(seeds): random.seed(seed) np.random.seed(int(seed)) status_msg = log(f" ⏳ Running inference (seed={seed})") args.seed = seed args.out_dir = f'{dirout}/{seed}' os.makedirs(args.out_dir, exist_ok=True) inference(args) fpred = f'{args.out_dir}/{task}/TS_pred.csv' os.system(f'cat {fpred}') df_pred = pd.read_csv(fpred) df_info = df_pred.merge(df_trun) print(len(df_info), df_info.columns) seq = assemble_fragments(df_info) result = analysis_sequence(analyzer, seq, _id=f'{_id}_seed_{seed}', head=head, species=species) result['Method'] = 'AA2CDS' result['Seed'] = seed result['Variant'] = 'Primary design' tmps.insert(i, pd.DataFrame({k: [v] for k, v in result.items()})) tmp_df = pd.concat(tmps, ignore_index=True) # tmp_df = tmp_df.sort_values(by='CAI', ascending=False) # Reset index without implying any ranking tmp_df.reset_index(inplace=True, drop=True) # Assign technical sequence IDs based on generation order (not ranking) tmp_df['Name'] = [f"s_{i + 1:02d}" for i in range(len(tmp_df))] tmp_df.to_csv(f'{dirout}/results.csv', index=False) status_msg = log("🔹 Step 5/5: Ranking & visualization") display_df = tmp_df[['Name','Method','Variant','Seed', 'CAI', 'CAI_head','GC', 'GC_head', 'CDS_Full']] display_df = display_df.rename(columns={'CDS_full':'CDS'}) # 使用示例 fig_df = tmp_df#[['_id',"Name",'CAI', 'CAI_head','GC', 'GC_head','Method']] # 只选需要的列 fig, axes = plot_optimization_metrics(fig_df, method=method) plt.savefig(f'{dirout}/optimization_metrics.png', dpi=300, bbox_inches='tight') plt.show() # fig, ax = plt.subplots(figsize=(10, 6)) # scores = display_df["GC"].astype(float).tolist() # bars = ax.bar(range(1, len(scores) + 1), scores, alpha=0.7) # ax.set_xlabel("Sequence Rank") # ax.set_ylabel("GC Content") # ax.set_title(f"CDS Optimization Results ({method})") # ax.grid(True, alpha=0.3) # for i in range(min(5, len(bars))): # bars[i].set_color('orange') status_msg = log(f"✅ Successfully generated {len(display_df)} optimized CDS sequences") status_msg = log("🎉 Optimization complete") except Exception as e: status_msg = log(f"❌ Error: {e}") return None, None,None, status_msg # src_dir = "/app/tmp/20251220153157_Mus_musculus/42/predict_web" zip_base = "predict_web_results" # 不要加 .zip zip_path = shutil.make_archive( base_name=zip_base, format="zip", root_dir=dirout ) return display_df, fig,zip_path,status_msg def download_cds_results(results_df): if results_df is None or len(results_df) == 0: return None # 重新添加完整序列用于下载 # 保存为CSV csv_buffer = StringIO() results_df.to_csv(csv_buffer, index=False) csv_content = csv_buffer.getvalue() # 创建临时文件 filename = "cds_optimization_results.csv" with open(filename, 'w') as f: f.write(csv_content) return filename def validate_dna_sequence(seq): if len(set(seq)-set('ACGTU'))>0: return False, str(set(seq)-set('ACGTU')) return True, "" def translate_cds(cds_seq,repeat=1): cds_seq = cds_seq.upper().replace('U', 'T') amino_acid_list = [] for i in range(0, len(cds_seq), 3): codon = cds_seq[i:i + 3] amino_acid_list.append(CODON_TO_AA.get(codon, '-') * repeat) amino_acid_seq = ''.join(amino_acid_list) return amino_acid_seq def parse_formated_input(formated_input): seq = re.sub(r'[^A-Za-z.()<>\[\]{}*]', '', formated_input).upper() return seq def parse_seq_input(mrna_input): lines = mrna_input.split('\n') if '>' in lines[0]: _id = lines[0].strip().split()[0] seq_dbn = ''.join(lines[1:]) else: _id = '' seq_dbn = ''.join(lines) seq = re.sub(r'[^A-Za-z]', '', seq_dbn).upper().replace('T', 'U') # 只保留碱基 structure = re.sub(r'[^.()<>\[\]{}]', '', seq_dbn) # 只保留结构符号 return _id, seq, structure def package_structure_translation_results( annotation_html, mrna_fasta, protein_translation, cds_table, image_paths=None, start=None, stop=None, ): """ Package AA2CDS Structure & Translation module results into a ZIP archive. Returns the path to the generated zip file. """ # 1️⃣ 创建临时工作目录 workdir = tempfile.mkdtemp(prefix="aa2cds_export") root_dir = os.path.join(workdir, "AA2CDS_Structure_Translation") os.makedirs(root_dir, exist_ok=True) # 2️⃣ README readme_text = f"""AA2CDS – Structure & Translation Module This archive contains results generated by the AA2CDS Structure & Translation module. It is intended for verification of CDS boundaries, mRNA structural context, and translation consistency. CDS region: start = {start} stop = {stop} Notes: - Metrics and visualizations are provided for descriptive purposes only. - This module does not predict protein expression levels. - Original user input sequences are not stored. """ with open(os.path.join(root_dir, "README.txt"), "w") as f: f.write(readme_text) # 3️⃣ HTML annotation if annotation_html: with open(os.path.join(root_dir, "mRNA_annotation.html"), "w") as f: f.write(annotation_html) # 4️⃣ FASTA if mrna_fasta: with open(os.path.join(root_dir, "mRNA_sequence.fasta"), "w") as f: f.write(mrna_fasta) # 5️⃣ Protein translation if protein_translation: with open(os.path.join(root_dir, "protein_translation.txt"), "w") as f: f.write( "Protein sequence translated from the selected CDS region:\n\n" + protein_translation ) # 6️⃣ CDS candidates table if cds_table is not None: cds_csv_path = os.path.join(root_dir, "orf_position.csv") cds_table.to_csv(cds_csv_path, index=False) # 7️⃣ Secondary structure files if image_paths: struct_dir = os.path.join(root_dir, "secondary_structure") os.makedirs(struct_dir, exist_ok=True) for p,_ in image_paths: # None值是因为选项不匹配 if p and os.path.exists(p): shutil.copy(p, struct_dir) # 8️⃣ 压缩 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_path = os.path.join( tempfile.gettempdir(), f"AA2CDS_Structure_Translation_{timestamp}.zip" ) with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: for foldername, _, filenames in os.walk(root_dir): for filename in filenames: file_path = os.path.join(foldername, filename) arcname = os.path.relpath(file_path, workdir) zipf.write(file_path, arcname) return zip_path def annotate_sequence(mrna_input, start=-1, end=-1,utr5="",cds_seq="",utr3="", draw_2d="",cds_posi_collect=None): status_msg = '' smart_wake_up() def log(msg): return status_msg + f"\n{msg}" if start =='' or end =='': start = -1 end = -1 start = int(start) end = int(end) # dirout = tempfile.mkdtemp(prefix="aa2cds_rna_outputs") # 默认就是 /tmp if utr5 and cds_seq and utr3: utr5 = parse_seq_input(utr5)[1] cds_seq = parse_seq_input(cds_seq)[1] utr3 = parse_seq_input(utr3)[1] seq = utr5 + cds_seq + utr3 structure = "" start = len(utr5) end = len(seq)-len(utr3) status_msg = log(f"✅ Backbone successfully assembled from custom UTR/CDS segments.") elif mrna_input: _id, seq, structure = parse_seq_input(mrna_input) status_msg = log(f"✅ Backbone incomplete; falling back to input mRNA sequence: {_id}") else: return "
Please enter a sequence or UTRs and CDS
", None, -1, -1,None,None,None,None,None,None, status_msg if not validate_dna_sequence(seq)[0]: return "
Invalid sequence. Only A, C, G, T/U allowed.
", None,-1, -1,None,None,None, None, None,None,status_msg if start == -1 and end == -1: start, end,cds_posi_collect = find_longest_cds(seq) utr5 = seq[:start] utr3 = seq[end:] cds_posi_collect = pd.DataFrame(cds_posi_collect, columns=['start', 'end','orf_length']) cds_posi_collect = cds_posi_collect.sort_values(by='orf_length', ascending=False) # if start == -1 and end == -1: # status_msg = log( # f"❌ Not found CDS in sequence. Please check the input sequence or specify the CDS position manually.") # return "
No CDS found in sequence
", None, -1, -1, None,None,None,None,None,None,status_msg status_msg = log(f"✅ Found longest CDS at position {start} to {end}, totally found {len(cds_posi_collect)} CDS fragments") else: if start == -1 or end ==-1: return "
No CDS found in sequence
", None, -1, -1, None,None,None,None,None,None,status_msg status_msg = log(f"✅ Using user-defined CDS at position {start} to {end}") utr5 = seq[:start] utr3 = seq[end:] # 提取CDS序列 cds_seq = seq[start:end] # 翻译CDS为氨基酸序列 aa_seq = translate_cds(cds_seq) # 创建带颜色的HTML结果 html_result = "
" frame_lenth = 60 # CDS and proten cds_formatted = '\n'.join([cds_seq[i:i + frame_lenth] for i in range(0, len(cds_seq), frame_lenth)]) aa_formatted = '\n'.join([aa_seq[i:i + frame_lenth] for i in range(0, len(aa_seq), frame_lenth)]) html_result += f"{frame_lenth} nt per line\n\nCDS ({len(cds_seq)} bp):\n{cds_formatted}\n\n" html_result += f"Protein ({len(aa_seq)} AA):\n{aa_formatted}\n\n" # 5'UTR部分 - 蓝色 if start > 0: utr5 = html.escape(seq[:start]) # 每50个字符一组显示 utr5_formatted = '\n'.join([utr5[i:i + frame_lenth] for i in range(0, len(utr5), frame_lenth)]) html_result += f"5'UTR ({len(utr5)} bp):\n{utr5_formatted}\n\n" else: html_result += f"5'UTR:\nN/A\n\n" if end - start > 0: # CDS部分 - 绿色 html_result += f"CDS align ({len(cds_seq)} bp):\n" # 格式化显示CDS序列和对应的氨基酸 for i in range(0, len(cds_seq), frame_lenth): # 显示核苷酸序列 nt_chunk = cds_seq[i:i + frame_lenth] nt_formatted = ' '.join([nt_chunk[j:j + 3] for j in range(0, len(nt_chunk), 3)]) html_result += f"{nt_formatted}\n" # 显示对应的氨基酸序列 aa_start = i // 3 aa_end = min(aa_start + frame_lenth // 3, len(aa_seq)) aa_chunk = aa_seq[aa_start:aa_end] aa_formatted = ' '.join(aa_chunk) # 每个氨基酸之间加三个空格 # 添加空格对齐氨基酸和密码子 alignment = ' ' * (len(nt_formatted.split()[0]) // 2) html_result += f"{alignment}{aa_formatted}\n" html_result += "\n" # 3'UTR部分 - 紫色 if end != -1 and end < len(seq): utr3 = html.escape(seq[end:]) # 每50个字符一组显示 utr3_formatted = '\n'.join([utr3[i:i + frame_lenth] for i in range(0, len(utr3), frame_lenth)]) html_result += f"3'UTR ({len(utr3)} bp):\n{utr3_formatted}\n" else: html_result += "3'UTR: N/A" image_path = None if len(draw_2d)>0: status_msg = log(f'start draw_2d {draw_2d}') # timestamp = datetime.now().strftime("%Y%m%d%H%M%S") # uid = uuid.uuid4().hex[:8] # dirout = f'tmp/{timestamp}_{uid}_rna_outputs/' # os.makedirs(dirout, exist_ok=True) image_path, mfe, structure,message = generate_rna_structure(utr5, cds_seq, utr3, structure,draw_2d=draw_2d) status_msg += f'\n{message}' mfe = f'MFE={mfe:.2f} kcal/mol' if mfe else None html_result += f"
\n\ndbn:\n>rna|start={start}|stop={end}|{mfe}\n{seq}\n{structure}\n
" status_msg += f'\n{mfe}\n' if mfe else '' # image_path = '\n'.join([f""" #
# # SVG 加载失败 # #
# """ for svg_path in image_path]) # png need more time to show, but gallery can't show svg well mRNA_header = f'>rna|start={start}|stop={end}|{mfe}\n' else: mRNA_header = f'>rna|start={start}|stop={end}' fasta_txt = f'{mRNA_header}\n{seq}\n{structure}\n' mRNA_fasta = fasta_format_block(fasta_txt) aa_seq_formated = get_AA_from_CDS(cds_seq, width=78, block=6) utr5 = seq_formatted(utr5, width=20, block=10) cds_seq = seq_formatted(cds_seq, width=15, block=3) utr3 = seq_formatted(utr3, width=20, block=10) scroll_html = """ """ html_result+=scroll_html return html_result, image_path, mRNA_fasta, start, end,utr5,cds_seq,utr3,cds_posi_collect,aa_seq_formated, status_msg def gc_content(x): return (x.count('G') + x.count('C'))/len(x) def get_AA_from_CDS(cds_seq,width=70,block=10): cds_seq = parse_seq_input(cds_seq)[1] return seq_formatted(translate_cds(cds_seq),width=width,block=block) class MaoTaoWeb: def __init__(self): self.app = self.design_app() # self.tail = self.common_tail() # .app-root-anchor { # min-height: 100vh; # flex-shrink: 0; # } # .page-anchor { # min-height: 120px; # flex-shrink: 0; # } # .cds-table-box .gr-dataframe .wrap { # max-height: 320px; # overflow-y: auto; # } # .gradio-dataframe table { # max-height: 100px !important; # overflow-y: auto !important; # } def design_app(self): IS_HF = os.environ.get("SPACE_ID") is not None HF_css = """ .scroll-table { max-height: 50px; overflow-y: auto; }""" local_css = """ .scroll-table { max-height: 200px; overflow-y: auto; } """ temp_css = HF_css if IS_HF else local_css print('IS_HF',IS_HF) # 创建Gradio界面 with gr.Blocks(title="CDS Designer", theme=gr.themes.Soft(),css=""" @import url('https://fonts.googleapis.com/css2?family=Open+Sans:wght@300;400;500;600&family=Fira+Code:wght@400;500&display=swap'); .gradio-container { font-family: 'Open Sans', sans-serif !important; } /* 作用于所有加了 .mono 的 Textbox */ .mono { font-family: 'Courier New', Courier, monospace !important; white-space: pre !important; } .mono textarea { overflow-y: auto !important; } /* Accordion 外层(details) */ details { background-color: #f8f9ff; border-radius: 1px; padding: 1px; margin-bottom: 1px; } /* Accordion 标题栏 */ details > summary { background-color: #eef0ff; border-radius: 11px; padding: 1px 1px; cursor: pointer; } /* Accordion 内容区域 */ details > div { padding: 1px; } .gradio-tabs button[data-selected="true"] { border-bottom: 1px solid #6C63FF; } html, body { height: auto; overflow-y: auto; } .gradio-container { min-height: auto !important; } """+temp_css, js=""" () => { window.scrollTo({ top: 0, behavior: "instant" }); } """) as app: self.head = self.common_head() # 创建各个标签页 with gr.Tabs(): self.cds_optimization_tab() self.mrna_annotation_tab() self.help_tab() self.resources_tab() return app def common_head(self): gr.Markdown("# 🧬AA2CDS") gr.Markdown(""" A context-aware web server for protein-guided coding sequence (CDS) design across multiple host species. (Free for all academic and commercial use) """) @staticmethod def common_tail(): no_line_number = gr.Textbox( label="No Line Number and Space", placeholder="paste sequence and clean the line number and space", lines=2, max_lines=2, show_copy_button=True, elem_classes=["mono"], ) no_line_number.change(fn=parse_formated_input, inputs=no_line_number, outputs=no_line_number) def mrna_annotation_tab(self): with gr.Tab("🔬 Structure & Translation Check"): # self.common_head() with gr.Row(): with gr.Column(scale=3): gr.Markdown("## Region-aware mRNA validation and secondary structure visualization") gr.Markdown(""" This module validates CDS boundaries, translation frame integrity, and visualizes mRNA secondary structure in 5′UTR–CDS–3′UTR contexts, with a round-trip translation check to ensure amino-acid fidelity. """) with gr.Column(scale=1): # 全局状态显示 self.status_display = gr.Textbox( label="Running Log", value="Ready to start", interactive=False, lines=2, max_lines=2 ) with gr.Row(): with gr.Column(scale=3): with gr.Accordion("🔹 Input CDS and backbone (click to expand/collapse)", open=True,elem_classes=["details"]) as input_div: with gr.Row(): utr5_input = gr.Textbox( label="5'UTR Sequence", placeholder="Enter 5'UTR sequence here...", lines=6, max_lines=6, elem_classes=["mono"], show_copy_button=True, ) CDS_input = gr.Textbox( label="CDS Sequence", placeholder="Enter CDS sequence here...", lines=6, max_lines=6, elem_classes=["mono"], show_copy_button=True, ) utr3_input = gr.Textbox( label="3'UTR Sequence", placeholder="Enter 3'UTR sequence here...", lines=6, max_lines=6, elem_classes=["mono"], show_copy_button=True ) # utr5_input.change(fn=lambda x:seq_formatted(x,width=20,block=10), inputs=utr5_input, outputs=utr5_input,every=2) # CDS_input.change(fn=lambda x:seq_formatted(x,width=18,block=3), inputs=CDS_input, outputs=CDS_input,every=2) # utr3_input.change(fn=lambda x:seq_formatted(x,width=20,block=10), inputs=utr3_input, outputs=utr3_input,every=2) with gr.Column(scale=1): draw_2d = gr.CheckboxGroup( choices=[ "Full mRNA", "5'leader (30 nt)", "5'UTR", "CDS", "3'UTR", ], value=["Full mRNA"], label="Draw 2D Structure", info="Optional: Draw 2D structure of the selected region" ) position_table = gr.Dataframe( value=pd.DataFrame( [[-1, -1, -1]], columns=["start", "end", "cds_len"] ), datatype=["number", "number", "number"], interactive=False, label="Detected ORFs", ) with gr.Accordion("🔹 Or Input full mRNA (click to expand/collapse)", open = False,elem_classes=["details"]) as opt_input_div: with gr.Row(): with gr.Column(scale=3): # input mrna_input = gr.Textbox( label="mRNA Sequence", info='Used only if UTR5/CDS/UTR3 are not provided. Enter full mRNA sequence.', placeholder="Example: AUGCCUACGUAGCUAGCUAGCUA... (A, U, C, G only)", lines=4, max_lines=4, # value = seq_formatted('GCCACCAUGCCAUGAACAGCUACAUGCCAUGAACAGCUACAUGCCAUGAACAGCUAC'), elem_classes=["mono"], show_copy_button=True, # info='Input mRNA sequence and CDS position' ) # gr.Markdown('**Input** mRNA sequence and CDS position, or load example sequence') with gr.Column(scale=1): start_position = gr.Textbox( label="CDS Start", # value="-1", placeholder="Auto", ) stop_position = gr.Textbox( label="CDS End", # value="-1", placeholder="Auto", ) # submit and example button with gr.Row(): example_btn = gr.Button("Load Example", variant="secondary",scale=1) annotate_btn = gr.Button("🚀 Analyze mRNA (optional 2D structure)", variant="primary",scale=2) clean_btn = gr.Button("Clean mRNA", variant="secondary",scale=1) gr.Markdown('🔍 Running log displayed in the upper right corner, find your results at the bottom of the page') AA_input = gr.Textbox( label='Protein (Translated from CDS)', placeholder="Will display the result Transfer CDS sequence to protein sequence", lines=3, max_lines=3, elem_classes=["mono"], show_copy_button=True, interactive=False, # require=True ) # CDS_input.change(fn=lambda x:get_AA_from_CDS(x,width=78,block=6), inputs=CDS_input, outputs=AA_input) with gr.Accordion("More details in sequence regions (click to expand)",open=False,elem_classes=["details"]): annotation_output = gr.HTML( label="Sequence Regions", value="
Results will appear here
" ) output_image = gr.Gallery( label="2D mRNA Secondary Structure", object_fit="contain", interactive=False, ) # output_image = gr.HTML() with gr.Accordion("Color Legend (click to expand)", open=False,elem_classes=["details"]): gr.Markdown(""" | 颜色 | 区域 | |------|------| | red | 5'UTR 区域 | | blue | CDS起始区域 | | pink | CDS终止区域 | | orange | 3'UTR 区域 | | yellow | 起始密码子 (AUG) | | purple | 终止密码子 (UAA, UAG, UGA) | | #bcffdd | start/stop codon context | | ⌒⌒⌒ #6ed86e | base pair between start/stop codon context | """) # mrna_input.change(fn=fasta_format_block, inputs=[mrna_input], outputs=mrna_input) annotate_btn.click( annotate_sequence, inputs=[mrna_input,start_position,stop_position,utr5_input,CDS_input,utr3_input,draw_2d,position_table], outputs=[annotation_output,output_image,mrna_input,start_position,stop_position, utr5_input,CDS_input,utr3_input,position_table,AA_input,self.status_display] ) # CDS_input.change(fn=lambda x:get_AA_from_CDS(x,width=78,block=6), inputs=CDS_input, outputs=AA_input) example_btn.click( lambda: [EXAMPLE_UTR5,EXAMPLE_CDS,EXAMPLE_UTR3], outputs=[utr5_input,CDS_input,utr3_input] ) clean_btn.click( lambda: [None,None,'',-1,-1, pd.DataFrame([[-1,-1,-1]],columns=["start", "end","cds_len"])], outputs=[annotation_output,output_image,mrna_input,start_position,stop_position ,position_table] ) def load_and_run(evt: gr.SelectData,df): # 无法接受其他组件,除非是state # df = evt.value # 当前 DataFrame(pandas) row_idx = evt.index[0] # 被选中的行号 row = df.iloc[row_idx] start,end = row["start"], row["end"] return start,end position_table.select( fn = load_and_run, inputs = [position_table], outputs=[start_position, stop_position], ) download_btn = gr.DownloadButton( label="📦 Download Structure & Translation Results" ) download_btn.click( fn=package_structure_translation_results, inputs=[ annotation_output, # region html mrna_input, AA_input, position_table, output_image, start_position, stop_position, ], outputs=download_btn ) self.common_tail() def cds_optimization_tab(self): with gr.Tab("🧬 CDS Design"): # self.common_head() gr.Markdown("## Coding Sequence (CDS) Design") with gr.Column(): # input sequence and configures with gr.Row(): with gr.Column(scale=2): protein_seq = gr.Textbox( label="Protein Sequence (Amino Acids)", placeholder="Paste or type an amino-acid sequence (single-letter code)...", lines=4, elem_classes=["mono"], ) # protein_seq.change(fn=seq_formatted, inputs=[protein_seq], outputs=[protein_seq]) cds_example_btn = gr.Button("Load Example", variant="secondary") with gr.Column(scale=1): # method = gr.Radio( # choices=["AA2CDS", "CAI","GC"], # label="Optimization Method", # value="AA2CDS", # elem_classes=["vertical","secondary"] # ) # 多选 method = gr.CheckboxGroup( choices=["AA2CDS", "CAI","GC"], label="Optimization Strategy", value="AA2CDS", elem_classes=["vertical","secondary"], ) species = gr.Dropdown( choices=list(species_data.keys()), # value="Homo_sapiens", label="Target Organism", value="Homo_sapiens", ) # clearn button clear_btn = gr.ClearButton( components=[protein_seq], value="🧹 Clear Input" ) with gr.Row(): with gr.Accordion("Codon usage table: input options (click to expand)", open=False): # 留缩进容易变成代码块 gr.Markdown( """ ### Option 1. Upload a custom codon-usage table - **File format**: CSV (comma-separated) - **Required columns (case-sensitive)**: - `triplet` — RNA codon (e.g. `AUG`) - `amino_acid` — single-letter amino acid (e.g. `M`) - `fraction` — relative codon usage **Notes** - ⚠️ Column names must match **exactly** - Uploaded tables will **override the default codon usage** - This option can also be used to **restrict the codon set** --- ### Option 2. Paste codon-usage data from the Kazusa Database You can construct the URL directly using an **NCBI Taxonomy ID**: https://www.kazusa.or.jp/codon/cgi-bin/showcodon.cgi?species=9606&aa=1&style=N Replace `9606` with the taxonomy ID of your target organism. --- ### How to obtain an NCBI Taxonomy ID **A. Search by species name (recommended)** 1. Visit the [Kazusa Codon Usage Database](https://www.kazusa.or.jp/codon/) 2. Enter the Latin name of your organism (e.g. *Escherichia coli*) 3. Copy the entire table and paste it into the text area **B. Search via NCBI Taxonomy (precise)** 1. Go to [NCBI Taxonomy](https://www.ncbi.nlm.nih.gov/taxonomy) 2. Search using `species_name[Organism]` (e.g. `human[Organism]`) 3. Open the correct record 4. Extract the numeric ID from the URL (e.g. *Homo sapiens* → `9606`) Default organisms (NCBI Taxonomy ID): | NCBI Taxonomy ID | Organism name | |-----------------|---------------| | 9606 | *Homo sapiens* | | 10090 | *Mus musculus* | | 316407 | *Escherichia coli* | | 4932 | *Saccharomyces cerevisiae* | | 4922 | *Pichia pastoris* | """) with gr.Accordion("Variant generation: random seed control (click to expand)", open=False): with gr.Row(): # 左侧:Seeds with gr.Column(scale=5): optimize_seed = gr.Textbox( label="Seeds", value="42,1337", lines=2, max_lines=2, interactive=True, info="Comma-separated random seeds. Seeds are automatically adjusted when the number of variants changes." ) # 右侧:控制区 with gr.Column(scale=3): num_variants = gr.Dropdown( choices=[1, 2, 3, 4, 5], value=2, label="Number of Variants", interactive=True, ) refresh_btn = gr.Button( "🔄 Refresh Seeds", variant="secondary", ) usage_log = gr.Markdown() def loadings(file, species): message = "" if not file: message += f"\nUse codon table form {species} as default..." codon_usage_text = species_dict.get(species, 'Unknown') codon_usage_table = parse_text(codon_usage_text, pattern_txt=None)[ ['triplet', 'amino_acid', 'fraction']] else: file_path = file.name message += f"\nLoading codon table from {file_path}" codon_usage_table = pd.read_csv(file_path)[['triplet', 'amino_acid', 'fraction']] return codon_usage_table, message with gr.Row(): codon_table_input = gr.File(label='Upload Codon Usage File',scale=1,file_types=[".csv"]) codon_usage_text = gr.Text(label='Paste Codon Usage Table (Kazusa format supported)',scale=3,lines=8,max_lines=8,value=species_dict[species.value]) with gr.Column(): codon_usage_note = gr.Markdown("Codon Usage Table Preview") codon_usage_table = gr.State(value=parse_text(species_dict[species.value],pattern_txt = None)[['triplet', 'amino_acid','fraction']]) codon_usage_table_df = gr.DataFrame( headers=["triplet", "amino_acid", "fraction"], value=codon_usage_table.value.head(4), # max_rows = 4, # unexpected keyword argument 'max_rows' # row_count=(4, 'fixed'), # 可视区固定 3 行 row_count=4, # height=40 * 4, # ≈ 3 行高度,触发滚动条 wrap=False, scale=2, interactive=False, elem_classes=["scroll-table"], ) codon_table_input.change(fn=loadings,inputs=[codon_table_input,species],outputs=[codon_usage_table,codon_usage_note]) codon_usage_text.change(fn=lambda x: parse_text(x,pattern_txt = None)[['triplet', 'amino_acid','fraction']],inputs=codon_usage_text,outputs=codon_usage_table) codon_usage_table.change(fn=lambda x: x.head(4),inputs=codon_usage_table,outputs=codon_usage_table_df) species.change( fn=lambda s: (species_dict.get(s, "Unknown"), f"✅ Codon Usage Table for {s}"), inputs=species, # ✅ 必须指定 inputs outputs=[codon_usage_text, usage_log] ) # def listen_to_method(method,codon_usage_table): # species = None # if method == "AA2CDS": # species = gr.update(visible=method == "AA2CDS") # elif method == "CAI": # codon_usage_table.sort_values(by=["amino_acid", "fraction"], ascending=False, inplace=True) # # codon_usage_table.drop_duplicates(subset="amino_acid", inplace=True) # elif method == "MFE": # pass # elif method == "GC": # codon_usage_table['GC'] = codon_usage_table['triplet'].apply(lambda x: gc_content(x)) # codon_usage_table.sort_values(by=["amino_acid", "GC"], ascending=False, inplace=True) # # codon_usage_table.drop_duplicates(subset="amino_acid", inplace=True) # else: # pass # return species,codon_usage_table # method.change(fn=listen_to_method, inputs=[method,codon_usage_table], outputs=[species,codon_usage_table]) optimize_btn = gr.Button("🚀 Optimize CDS", variant="primary", scale=2) optimize_log = gr.Text(label="Optimization Log",max_lines=6,lines=2,interactive=False,value='ready for optimize') # 绑定事件 # 事件处理函数 def on_num_variants_change(num_variants, current_seed): """当变体数量改变时,自动更新seed""" new_seeds = generate_seeds_for_variants(num_variants, current_seed) return new_seeds def on_refresh_seeds(num_variants): """刷新按钮点击事件 - 生成全新随机seed""" seeds = random.sample(range(1000, 10000), num_variants) return ", ".join(map(str, seeds)) # 变体数量改变时更新seed num_variants.change( fn=on_num_variants_change, inputs=[num_variants, optimize_seed], outputs=[optimize_seed] ) # 刷新按钮点击事件 refresh_btn.click( fn=on_refresh_seeds, inputs=[num_variants], outputs=[optimize_seed] ) gr.Markdown("""No explicit ranking is applied. Metrics are provided for descriptive and comparative purposes only and do not represent direct predictions of protein expression. Designs generated using different optimization strategies are included as reference baselines; observed extreme values reflect the underlying criteria rather than optimality. """) with gr.Row(): results_table = gr.Dataframe( label="Optimization Results", headers=['Name','Method','Variant','Seed', 'CAI', 'CAI_head','GC', 'GC_head', 'CDS'], datatype=['str','str',"str","str","number", "number", "number", "number", "str", "str"], # row_count=(5, "row_count"), wrap=False, ) optimization_plot = gr.Plot(label="Score Distribution") with gr.Row(): # download_cds_btn = gr.Button("📥 Download CDS Results", variant="secondary") # cds_download_file = gr.File(label="Download File", visible=False) download_btn = gr.DownloadButton( label="⬇ Download results (ZIP archive)", value='predict_web_results.zip', ) def optimize_and_update(protein_seq, species, codon_usage_table,method,optimize_seed): status_msg = f"🔄 Optimizing CDS sequence using {method} method ({species})..." # 执行优化 df, plot,zip_path,status_msg = optimize_cds(protein_seq, species,codon_usage_table, method,status_msg,optimize_seed) # 最终状态 # final_status = f"✅ Optimization complete! Generated {len(df)} sequences with {variants:,} potential variants" # self.status_display.update(final_status) return df, plot,zip_path,seq_formatted(protein_seq),status_msg optimize_btn.click( optimize_and_update, # protein_seq, species, codon_usage_table,method inputs=[protein_seq, species,codon_usage_table,method,optimize_seed], outputs=[results_table, optimization_plot,download_btn,protein_seq, optimize_log] ) cds_example_btn.click(lambda: EXAMPLE_PROTEIN, outputs=protein_seq) def resources_tab(self): with gr.Tab("📚 Resources"): # self.common_head() gr.Markdown("## External Resources and References") gr.Markdown(""" The following resources are provided for reference and complementary analysis and are not integrated into the AA2CDS workflow. """) with gr.Row(): with gr.Column(): gr.Markdown(""" ### Databases - **NCBI GenBank** – annotated nucleotide sequence database https://www.ncbi.nlm.nih.gov/genbank/ - **Codon Usage Database (Kazusa)** – species-specific codon usage statistics https://www.kazusa.or.jp/codon/ - **Codon Usage Species Tables** – curated codon usage tables https://www.detaibio.com/tools/codon-usage-table.html - **ViralZone** – viral genome organization and replication https://viralzone.expasy.org/ - **NEB Enzyme Database** – restriction enzymes and recognition sites https://tools.neb.com/ """) with gr.Column(): gr.Markdown(""" ### Sequence Analysis - **BLAST** – sequence similarity search https://blast.ncbi.nlm.nih.gov/Blast.cgi - **ViennaRNA** – RNA secondary structure prediction https://www.tbi.univie.ac.at/RNA/ - **Primer3** – primer design for PCR https://primer3.org/ - **T-Coffee** – multiple sequence alignment https://tcoffee.crg.eu/apps/tcoffee/do:regular """) with gr.Column(): gr.Markdown(""" ### CDS & mRNA Design - **mRNA Designer Platform** – mRNA sequence design https://www.biosino.org/mRNAdesigner/main - **Codon Optimization Tool** – codon usage–based optimization https://www.novoprolabs.com/tools/codon-optimization - **Reverse Translation Tool** – protein back-translation https://www.bioinformatics.org/sms2/rev_trans.html """) with gr.Column(): gr.Markdown(""" ### RNA Regulation & Interaction - **RBPmap** – RNA-binding protein binding site prediction https://rbpmap.technion.ac.il/ - **CISBP-RNA Motif DB** – RNA-binding protein motifs https://cisbp-rna.ccbr.utoronto.ca/TFTools.php - **RPcontact** – RNA–protein interaction contact prediction https://huggingface.co/spaces/julse/RPcontact """) def help_tab(self): with gr.Tab("📖 Help & Docs"): # self.common_head() gr.Markdown("""## AA2CDS documentation (Help & Docs) This page provides step-by-step instructions and detailed explanations of inputs, outputs, and evaluation metrics used in the AA2CDS web server. It is intended to help users correctly interpret results and understand the scope and limitations of the analyses provided. Click a section below to view detailed explanations of each module. """) with gr.Accordion('🔹 CDS Design (click to collapse / expand)',open=False,elem_classes=["details"]): help_input= gr.Markdown("""--- This module generates optimized CDS sequences from protein inputs using different codon optimization strategies. ![AA2CDS Optional](https://huggingface.co/spaces/julse/maotao/resolve/main/images/aa2cds_workflow.jpg) ## Input Requirements ### **1. Input Protein Sequence** Paste the amino acid sequence (single-letter code) into the **Protein Sequence** field. *Note: Only standard protein sequences are supported.* ### **2. Select Optimization Method** Choose one CDS optimization strategy: | Method | Description | |--------|-------------| | **AA2CDS** | Context-aware back-translation using a deep learning model trained on protein–CDS pairs | | **CAI** | Codon adaptation index–based optimization | | **GC** | GC content–controlled optimization | ### **3. Select Target Species** The corresponding codon usage table is loaded automatically for the selected species. ### **4. Optional: Custom Codon Usage Table** Users may: - Upload a custom codon usage table - Paste codon usage data directly - Use the default table for the selected species Parsed tables are shown in the preview panel for verification. ### **5. Generate Optimized CDS** Click **Optimize CDS** to generate CDS variants. - **Variant count**: Controls how many alternative CDS sequences to generate - **Random seeds (optional)**: Ensure reproducibility of stochastic generation ## Optional Configuration ![AA2CDS Optional](https://huggingface.co/spaces/julse/maotao/resolve/main/images/aa2cds_workflow_optional.jpg) **Random Seeds** control stochastic variant generation: - Leave empty for different results each run - Specify a seed value for reproducible outputs - Useful for comparing optimization methods or debugging ## Results Overview ![AA2CDS result](https://huggingface.co/spaces/julse/maotao/resolve/main/images/aa2cds_result.jpg) **Key Features:** - All generated CDS variants encode the **exact same amino acid sequence** as the input protein - Each variant employs different codon combinations according to the selected optimization method - Results include sequence metrics and optimization scores **Output Includes:** - Optimized CDS sequences - Sequence statistics (GC content, length, etc.) - Optimization scores based on selected method - Download options for further analysis *Note:* AA2CDS optimizes coding sequences based on codon usage and sequence context but does **not directly predict protein expression levels**. """) with gr.Accordion('🔹 mRNA Structure & Translation (click to collapse / expand)',open=False,elem_classes=["details"]): help_input = gr.Markdown("""--- This module enables region-aware analysis of full-length mRNA sequences, including **CDS boundary verification**, **mRNA secondary structure visualization**, and **translation consistency checks**. Designed for **inspection and validation**—not sequence optimization. ## Input Options Two alternative input modes are supported: ### **Option 1: Input CDS and Backbone** (Recommended for Design Inspection) ![Input CDS and backbone](https://huggingface.co/spaces/julse/maotao/resolve/main/images/mRNA_input.jpg) **Required Components:** - **5′ UTR sequence** - **CDS sequence** - **3′ UTR sequence** These three components are concatenated to form a full mRNA sequence (5′UTR–CDS–3′UTR). **When to use:** - Inspecting CDS variants generated by AA2CDS or other tools - Inserting alternative CDS designs into a fixed UTR backbone - Comparing structural effects of synonymous codon substitutions ### **Option 2: Input Full mRNA Sequence** ![Input full mRNA](https://huggingface.co/spaces/julse/maotao/resolve/main/images/mRNA_input_optional.jpg) Enter a full-length mRNA sequence directly into the **mRNA Sequence** field. If CDS positions are not provided, the server automatically detects open reading frames (ORFs). Detected ORFs are displayed in the **Detected ORFs** table and can be selected to populate CDS boundaries. **When to use:** - Validating native or synthetic mRNA constructs - Analyzing externally designed sequences - Inspecting vaccine or circular mRNA backbones ## Optional Visualization Controls ### **Draw 2D Structure** Select one or more regions for secondary structure visualization: - Full mRNA - 5′ leader (first 30 nt) - 5′ UTR - CDS - 3′ UTR Multiple regions may be selected simultaneously. ## Results ![mRNA results](https://huggingface.co/spaces/julse/maotao/resolve/main/images/mRNA_result.jpg) **Output includes:** - Region-aware mRNA secondary structure visualizations - Detected ORFs and CDS boundaries - Translated protein sequence - Annotated sequence regions *Note: Input sequences are **not modified** during analysis.* ## Analysis Performed ### **1. CDS Boundary Verification** When CDS boundaries are available (from user input or ORF detection), the coding region is extracted for downstream analysis. ### **2. mRNA Secondary Structure Prediction** RNA secondary structure is predicted for selected regions. Visualization highlights structural features near functionally important sites, particularly the **5′ leader** and **start codon context**. ### **3. Translation Consistency (Round-Trip Check)** The CDS region is translated *in silico* back into a protein sequence to verify: - Reading-frame integrity - Absence of unintended mutations - Preservation of the encoded amino-acid sequence The translated protein is shown in the **Protein (Translated from CDS)** panel. """) with gr.Accordion('🔹 Sequence Identity and Evaluation Metrics (click to collapse / expand)',open=False,elem_classes=["details"]): help_output = gr.Markdown("""--- The following metrics are provided for descriptive and comparative purposes only and do not represent direct predictions of protein expression or functional performance. Not all metrics are displayed in the current web interface. ### Nucleotide and Codon Identity Sequence identity is defined as the proportion of identical residues at corresponding positions. **Nucleotide identity** reflects base-level differences caused by synonymous codon substitutions: `Nucleotide identity = (Total nucleotides − Different positions) / Total nucleotides` **Codon identity** measures whether codons are completely identical: `Codon identity = (Total codons − Different codons) / Total codons` ### Codon Usage–Related Metrics **Relative Synonymous Codon Usage (RSCU)** Ratio of the observed frequency of a codon to the expected frequency assuming equal usage of synonymous codons. **Codon Adaptation Index (CAI)** Measures the similarity between a gene’s codon usage and that of highly expressed genes in the target species. Values range from 0 to 1, with higher values indicating better adaptation. **Effective Number of Codons (ENC / Nc)** Represents the number of effectively used synonymous codons among 61 possible codons. Lower Nc values indicate stronger codon bias and are often associated with higher expression. ### GC Content GC content is calculated as: `GC = (G + C) / (A + U + G + C) × 100%` GC content influences mRNA secondary structure and translational efficiency. Excessively high GC content (for example, above 70%) may reduce protein expression and often requires sequence rebalancing using synonymous codon substitutions. """) with gr.Accordion('🔹 Typical use case (click to collapse / expand)',open=True,elem_classes=["details"]): gr.Markdown("""--- This section summarizes common scenarios combining **CDS design** and **post-design validation** workflows supported by AA2CDS. ### CDS Design - Designing coding sequences from protein inputs using **AA2CDS** with context-aware codon modeling - Generating CDS variants optimized by **CAI** or **GC content** as baseline or control designs - Adapting CDS designs for expression in **different host species** using custom codon usage tables ### Structure & Translation Validation - Verifying CDS variants produced by AA2CDS or other tools - Inspecting structural effects of synonymous codon substitutions, particularly near the 5′ leader region - Validating CDS insertion into predefined UTR backbones - Checking amino-acid fidelity of synthetic or vaccine mRNA designs """) with gr.Accordion('🔹 Data & Privacy (click to collapse / expand)',open=False,elem_classes=["details"]): gr.Markdown("""--- AA2CDS does not store user-submitted sequences or generated files. All intermediate files are created in temporary directories and removed after the session. """) gr.Markdown(""" ### Contact and Support For questions or feedback related to AA2CDS, please contact: Email: jiang_jiuhong@gzlab.ac.cn """) if __name__ == "__main__": # 实例化并启动应用 mtao_web = MaoTaoWeb() mtao_web.app.launch(server_name="0.0.0.0", server_port=7860, debug=True,share=False)