import shutil
import pandas as pd
import requests
from biomart import BiomartServer
import os
import argparse
import sys
[docs]def parse_args(args):
parser = get_arg_parser()
return parser.parse_args(args)
[docs]def get_arg_parser():
parser = argparse.ArgumentParser(
description='STEP 10 : Generates PyMOL scripts to visualize protein structures with highlighted s-exons and ASRUs.'
)
parser.add_argument('--gene',
dest='geneName',
type=str,
required=True,
help='name of gene')
parser.add_argument('--dataPATH',
type=str,
required=True,
help='path to dir containing Thoraxe outputs')
return parser
# get the exon coordinates from the PIR sequence file
[docs]def get_sexon_coord(gid, tid, seqFname):
# open, read and close the input file
fseq = open(seqFname)
lines = fseq.readlines()
fseq.close()
# go through the file until finding the tid
i = 0
found = False
while (i < len(lines)) and (not found):
found = lines[i].startswith('>P1;'+gid+' ') and tid in lines[i]
i = i + 1
# if the tid was found
if found:
sex = lines[i][:-1]
# should be a list since they are ordered!
l = []
sexRef = sex[0]
startRef = 0
# go through the whole sequence
for i in range(1, len(sex)):
# if the sexon has just changed
if sex[i] != sexRef:
# append the previous sexon to the list
# +1 to the start to shift
# nothing to the end because we want the one before last
l.append((sexRef, startRef+1, i))
sexRef = sex[i]
startRef = i
# don't forget the last one!
# id only one amino acid, startRef+1 should be the last position
# and i should be equal to len(sex)
l.append((sexRef, startRef+1, i+1))
return l
# get the sexon id from the dictionary file
[docs]def get_sexon_id(dictFname):
# open, read and close the input file
fdic = open(dictFname)
lines = fdic.readlines()
fdic.close()
d = {}
for line in lines:
words = line[:-1].split()
# key: symbol, value: id
d[words[1]] = words[0]
print(d)
return d
[docs]def get_asrus(asruFname):
# res is a dico with as keys the sexons and as values some instance ids
res = {}
# readf the data
fasru = open(asruFname)
lines = fasru.readlines()
fasru.close()
iASRU = 1
iInst = 1
# each line should be an asru
for line in lines[1:]:
words = line[:-1].strip().split('\"')
# get the instances
instances = words[1][1:-1].strip().split(',')
# for each instance
for insta in instances:
# remove extra character
insta = insta.strip().strip("'")
# get a list of s-exons
wds = insta.split(".")
# if there are more than 1 s-exon
if len(wds) > 1:
insta = []
for wd in wds:
res[wd.strip("$")] = 'a'+str(iASRU)+'i'+str(iInst)
else:
res[insta] = 'a'+str(iASRU)+'i'+str(iInst)
iInst = iInst + 1
iASRU = iASRU+1
iInst = 1
return res
[docs]def get_pdb_span(pdb):
fpdb = open(pdb)
lines = fpdb.readlines()
fpdb.close()
span = []
for line in lines:
if line.startswith("ATOM"):
if line[12:16].strip() == "CA":
span.append(int(line[22:26]))
return span
# write out the PML file
# Function to get the UniProt ID from the AlphaFold DB PDB file name
# For example: A0A2C9C333 from AF-A0A2C9C333-F1-model_v4.pdb
[docs]def get_uniprot_id_from_filename(pdb_filename):
# Split the file name at the hyphen and underscore characters
parts = pdb_filename.split("-")
uniprot_id = parts[1] # Extract the UniProt ID
return uniprot_id
[docs]def write_pml(coord, d, asrus, pdb, outname):
fout = open(outname, "w")
# use the PDB file in the current directory to avoid problems when running on Docker
fout.write('load '+pdb.split('/')[-1]+'\n')
fout.write('bg_color white\n')
span = get_pdb_span(pdb)
nameProt = pdb.split('/')[-1][:-4]
fout.write('color white '+nameProt+'\n')
# partnerId = 'p'+pdb[-5]
partnerId = get_uniprot_id_from_filename(nameProt)
mycolsex = ['skyblue', 'yelloworange']
mycolasru = ['firebrick', 'lime']
coli = 0
colj = 1
currentInst = ['', []]
# go over all sexons
print(coord)
for tup in coord:
start = tup[1]
end = tup[2]
# focus on the relevant span
if start >= span[0] or end <= span[-1]:
sex = d[tup[0]]
fout.write('select '+partnerId+'_'+sex+', resid ' +
str(start)+':'+str(end)+' and '+nameProt+'\n')
# if sex is in an instance
if sex in asrus:
print(sex)
# different from the current one
if asrus[sex] != currentInst[0]:
# if it is not the very first one (current is dummy)
if currentInst[0] != '':
# select and color the current one
namSel = partnerId+currentInst[0]+'_'+'+'.join(currentInst[1])
fout.write('select '+namSel+', ' +
' or '.join(currentInst[1])+' and '+nameProt+'\n')
fout.write('color '+mycolasru[colj]+', '+namSel+'\n')
# change the current one
currentInst[0] = asrus[sex]
currentInst[1] = [partnerId+'_'+sex]
# switch instance color
colj = 1 - colj
# same as the current one
else:
# simply append the sexon to the instance definition
currentInst[1].append(partnerId+'_'+sex)
else:
# show s-exon color if it's not part of an instance
fout.write('color '+mycolsex[coli]+', '+partnerId+'_'+d[tup[0]]+'\n')
# in any case we need to switch sexon color
coli = 1 - coli
# need to select and color the very last one (that won't be replaced anyway...)
# I'm not sure that is used at all since we will never have something dummy here...
# maybe if there is only one...?
if currentInst[0] != '':
# name of the current instance (s-repeat)
namSel = partnerId+currentInst[0]+'_'+'+'.join(currentInst[1])
# select the corresponding s-exon combination
fout.write('select '+namSel+', ' +
' or '.join(currentInst[1])+' and '+nameProt+'\n')
# set the color
fout.write('color '+mycolasru[colj]+', '+namSel+'\n')
fout.write('deselect\n')
fout.close()
[docs]def download_alphafold_structure(uniprot_id, output_path='.'):
base_url = 'https://alphafold.ebi.ac.uk/files/'
file_name = f'AF-{uniprot_id}-F1-model_v4.pdb'
url = f'{base_url}{file_name}'
response = requests.get(url)
output_file_path = os.path.join(output_path, file_name)
if response.status_code == 200:
with open(output_file_path, 'wb') as f:
f.write(response.content)
print(f'Successfully downloaded {file_name} to {output_file_path}')
else:
output_file_path = None
print(
f'Error: Could not download {file_name}. HTTP status code: {response.status_code}')
return output_file_path
[docs]def get_transcripts(s_exon_table):
# Read the CSV file
df = pd.read_csv(s_exon_table)
# Initialize an empty list to store the tuples
result = []
# Iterate through each row in the DataFrame
for index, row in df.iterrows():
# Get the species and modify it as required
species_full = row['Species'].split('_')
species = species_full[0][0] + species_full[1]
# Get the gene ID
gene_id = row['GeneID']
# Get the transcript ID cluster and split it by '/'
transcript_id_cluster = row['TranscriptIDCluster'].split('/')
# Iterate through the transcript IDs
for transcript_id in transcript_id_cluster:
# Create a tuple and append it to the result list
result.append((species, gene_id, transcript_id))
# return only the unique tuples
return list(set(result))
[docs]def get_uniprot_ids(species_gene_transcript_ids):
# Create a DataFrame from the output of process_csv
df = pd.DataFrame(species_gene_transcript_ids, columns=[
'species', 'gene_id', 'transcript_id'])
# Split transcripts with '/' into different rows
df = df.assign(transcript_id=df['transcript_id'].str.split(
'/')).explode('transcript_id')
# Keep only unique rows
df = df.drop_duplicates()
# Group by species
grouped = df.groupby('species')
server = BiomartServer("http://www.ensembl.org/biomart")
uniprot_ids = []
data = []
for species, group in grouped:
dataset = server.datasets[f'{species}_gene_ensembl']
gene_ids = group['gene_id'].tolist()
transcript_ids = group['transcript_id'].tolist()
response = dataset.search({
'attributes': ['ensembl_gene_id', 'ensembl_transcript_id', 'uniprotswissprot', 'uniprotsptrembl'],
'filters': {'ensembl_gene_id': gene_ids, 'ensembl_transcript_id': transcript_ids}
})
for line in response.iter_lines():
line = line.decode('utf-8')
fields = line.split('\t')
fetched_gene_id, fetched_transcript_id, uniprotswissprot, uniprotsptrembl = fields
uniprot_ids = [up for up in [uniprotswissprot, uniprotsptrembl]]
for uniprot_id in uniprot_ids:
if fetched_gene_id in gene_ids and fetched_transcript_id in transcript_ids:
data.append((fetched_gene_id, fetched_transcript_id, uniprot_id))
return pd.DataFrame(data, columns=['gene_id', 'transcript_id', 'uniprot_id'])
[docs]def download_pdb_structures(s_exon_table, output_path='.'):
structures = []
species_gene_transcript_ids = get_transcripts(s_exon_table)
df = get_uniprot_ids(species_gene_transcript_ids)
for row in df.itertuples():
gene_id = row.gene_id
transcript_id = row.transcript_id
uniprot_id = row.uniprot_id
try:
if uniprot_id:
pdb_file = download_alphafold_structure(
uniprot_id, output_path=output_path)
if pdb_file is not None:
structures.append((gene_id, transcript_id, pdb_file))
else:
print(
f'Error: Could not find UniProt ID for gene ID {gene_id} and transcript ID {transcript_id}')
except Exception as e:
print(f'Error: {e}')
return structures
[docs]def get_pdb_sequence_length(pdb):
sequence_length = 0
with open(pdb, 'r') as f:
for line in f.readlines():
if line.startswith("ATOM") and line[12:16].strip() == "CA":
sequence_length += 1
return sequence_length
[docs]def get_transcript_sequence_length(gid, tid, seqFname):
with open(seqFname, 'r') as fseq:
lines = fseq.readlines()
i = 0
found = False
while i < len(lines) and not found:
found = lines[i].startswith('>P1;' + gid + ' ') and tid in lines[i]
i += 1
if found:
return len(lines[i].strip())
else:
return None
[docs]def run():
args = parse_args(sys.argv[1:])
gene = args.geneName
path_data = args.dataPATH
seqFname = os.path.join(path_data, gene, 'thoraxe', 'phylosofs', 'transcripts.pir')
dictFname = os.path.join(path_data, gene, 'thoraxe', 'phylosofs', 's_exons.tsv')
s_exon_table = os.path.join(path_data, gene, 'thoraxe', 's_exon_table.csv')
asruFname = os.path.join(path_data, 'data', gene, f'{gene}_ASRUs_table.csv')
if os.path.exists(asruFname):
output_path = os.path.join(path_data, 'data', gene, 'structures')
if os.path.exists(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path)
structures = download_pdb_structures(s_exon_table, output_path=output_path)
for gene_id, transcript_id, pdb_file in structures:
coord = get_sexon_coord(gene_id, transcript_id, seqFname)
if coord is None:
print(f'Error: Could not find {transcript_id} (gene ID: {gene_id})')
continue
pdb_seq_length = get_pdb_sequence_length(pdb_file)
transcript_seq_length = get_transcript_sequence_length(
gene_id, transcript_id, seqFname)
if pdb_seq_length != transcript_seq_length:
continue
d = get_sexon_id(dictFname)
asrus = get_asrus(asruFname)
pdb_code = pdb_file.split("/")[-1].split(".")[0]
script_file = os.path.join(
output_path, f'{gene_id}_{transcript_id}_{pdb_code}.pml')
write_pml(coord, d, asrus, pdb_file, script_file)
else:
print(f'Error: Could not find {asruFname}')
if __name__ == '__main__':
run()