Louis BECQUEY

aesthetics, comments, and statistics on the produced data

# execution outputs:
nohup.out
jobstats.csv
log_of_the_run.sh
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
......
#!/usr/bin/python3.8
import numpy as np
import pandas as pd
import concurrent.futures, Bio.PDB.StructureBuilder, gzip, io, itertools, json, os, psutil, re, requests, sqlalchemy, subprocess, sys, time, warnings
import concurrent.futures, Bio.PDB.StructureBuilder, gzip, io, json, os, psutil, re, requests, sqlalchemy, subprocess, sys, time, warnings
from Bio import AlignIO, SeqIO
from Bio.PDB import MMCIFParser, PDBIO
from Bio.PDB import MMCIFParser
from Bio.PDB.mmcifio import MMCIFIO
from Bio.PDB.PDBExceptions import PDBConstructionWarning, PDBConstructionException
from Bio.PDB.PDBExceptions import PDBConstructionWarning
from Bio._py3k import urlretrieve as _urlretrieve
from Bio._py3k import urlcleanup as _urlcleanup
from Bio.Alphabet import generic_rna
......@@ -13,10 +13,9 @@ from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.Align import MultipleSeqAlignment
from collections import OrderedDict
from ftplib import FTP
from functools import partial
from os import path, makedirs
from multiprocessing import Pool, cpu_count, Manager, freeze_support, RLock, current_process
from multiprocessing import Pool, Manager
from time import sleep
from tqdm import tqdm
from tqdm.contrib.concurrent import process_map
......@@ -36,7 +35,7 @@ elif path.isdir("/nhome/siniac/lbecquey"): # this is the office PC
else:
print("I don't know that machine... I'm shy, maybe you should introduce yourself ?")
exit(1)
hydrogen = re.compile("[123 ]*H.*")
m = Manager()
running_stats = m.list()
running_stats.append(0) # n_launched
......@@ -49,41 +48,53 @@ warnsymb = '\U000026A0'
errsymb = '\U0000274C'
class NtPortionSelector(object):
"""Class passed to MMCIFIO to select some chain portions in an MMCIF file.
Validates every chain, residue, nucleotide, to say if it is in the selection or not.
"""
def __init__(self, model_id, chain_id, start, end):
self.chain_id = chain_id
self.start = start
self.end = end
self.pdb_model_id = model_id
self.hydrogen_regex = re.compile("[123 ]*H.*")
def accept_model(self, model):
if model.get_id() == self.pdb_model_id:
return 1
return 0
return int(model.get_id() == self.pdb_model_id)
def accept_chain(self, chain):
if chain.get_id() == self.chain_id:
return 1
return 0
return int(chain.get_id() == self.chain_id)
def accept_residue(self, residue):
hetatm_flag, resseq, icode = residue.get_id()
# Refuse waters and magnesium ions
if hetatm_flag in ["W", "H_MG"]:
return 0 # skip waters and magnesium
return 0
# I don't really know what this is but the doc said:
if icode != " ":
warn(f"icode {icode} at position {resseq}\t\t")
if self.start <= resseq <= self.end:
return 1
return 0
# Accept the residue if it is in the right interval:
return int(self.start <= resseq <= self.end)
def accept_atom(self, atom):
name = atom.get_id()
if hydrogen.match(name):
return 0 # Get rid of hydrogens
# Refuse hydrogens
if self.hydrogen_regex.match(atom.get_id()):
return 0
# Accept all atoms otherwise.
return 1
class Chain:
""" The object which stores all our data and the methods to process it.
Chains accumulate information through this scipt, and are saved to files at the end of major steps."""
def __init__(self, nrlist_code):
nr = nrlist_code.split('|')
self.pdb_id = nr[0].lower() # PDB ID
......@@ -107,59 +118,76 @@ class Chain:
return self.pdb_id + '[' + str(self.pdb_model) + "]-" + self.pdb_chain_id
def download_3D(self):
""" Look for the main CIF file (with all chains) from RCSB
"""
status = f"\t> Download {self.pdb_id}.cif\t\t\t"
url = 'http://files.rcsb.org/download/%s.cif' % (self.pdb_id)
if not os.access(path_to_3D_data+"RNAcifs", os.F_OK):
os.makedirs(path_to_3D_data+"RNAcifs")
final_filepath = path_to_3D_data+"RNAcifs/"+self.pdb_id+".cif"
# Check if file already exists, if yes, abort
if os.path.exists(final_filepath):
print(status + f"\t{validsymb}\t(structure exists)")
self.full_mmCIFpath = final_filepath
return
else:
try:
_urlcleanup()
_urlretrieve(url, final_filepath)
self.full_mmCIFpath = final_filepath
print(status + f"\t{validsymb}")
except IOError:
print(status + f"\tERR \U0000274E\t\033[31mError downloading {url} !\033[0m")
self.delete_me = True
# Attempt to download it
try:
_urlcleanup()
_urlretrieve(url, final_filepath)
self.full_mmCIFpath = final_filepath
print(status + f"\t{validsymb}")
except IOError:
print(status + f"\tERR \U0000274E\t\033[31mError downloading {url} !\033[0m")
self.delete_me = True
def extract_portion(self, filename, pdb_start, pdb_end):
""" Extract the part which is mapped to Rfam from the main CIF file and save it to another file.
"""
status = f"\t> Extract {pdb_start}-{pdb_end} atoms from {self.pdb_id}-{self.pdb_chain_id}\t"
self.file = path_to_3D_data+"rna_mapped_to_Rfam/"+filename+".cif"
# Check if file exists, if yes, abort (do not recompute)
if os.path.exists(self.file):
print(status + f"\t{validsymb}\t(already done)", flush=True)
return
model_idx = self.pdb_model - (self.pdb_model > 0) # arrays start at 0, models start at 1
model_idx = self.pdb_model - (self.pdb_model > 0) # because arrays start at 0, models start at 1
pdb_start = int(pdb_start)
pdb_end = int(pdb_end)
# Load the whole mmCIF into a Biopython structure object:
with warnings.catch_warnings():
warnings.simplefilter('ignore', PDBConstructionWarning)
# TODO: check if this with and warnings catch is still useful since i moved to CIF files
warnings.simplefilter('ignore', PDBConstructionWarning) # ignore the PDB problems
# Check if the whole mmCIF file exists. If not, abort.
if self.full_mmCIFpath == "":
print(status + f"\t\U0000274E\t\033[31mError with CIF file of {self.pdb_id} !\033[0m", flush=True)
self.delete_me = True
return
# Load the whole mmCIF into a Biopython structure object:
s = mmcif_parser.get_structure(self.pdb_id, self.full_mmCIFpath)
c = s[model_idx][self.pdb_chain_id] # the desired chain
first_number = c.child_list[0].get_id()[1] # its first residue is numbered 'first_number'
if pdb_start < pdb_end:
start = pdb_start + first_number - 1 # then our start position should be shifted by 'first_number'
# Extract the desired chain
c = s[model_idx][self.pdb_chain_id]
# Pay attention to residue numbering
first_number = c.child_list[0].get_id()[1] # the chain's first residue is numbered 'first_number'
if pdb_start < pdb_end:
start = pdb_start + first_number - 1 # shift our start_position by 'first_number'
end = pdb_end + first_number - 1 # same for the end position
else:
self.reversed = True
self.reversed = True # the 3D chain is numbered backwards compared to the Rfam family
end = pdb_start + first_number - 1
start = pdb_end + first_number - 1
# Do the extraction of the 3D data
# Define a selection
sel = NtPortionSelector(model_idx, self.pdb_chain_id, start, end)
# Save that selection on the mmCIF object s to file
ioobj = MMCIFIO()
ioobj.set_structure(s)
ioobj.save(self.file, sel)
......@@ -167,44 +195,61 @@ class Chain:
print(status + f"\t{validsymb}")
def set_rfam(self, rfam):
""" Rember the Rfam mapping for this chain.
"""
self.rfam_fam = rfam
print("\t> Associating it to", rfam, f"...\t\t\t{validsymb}")
def extract_3D_data(self):
if not os.access(path_to_3D_data+"pseudotorsions/", os.F_OK):
os.makedirs(path_to_3D_data+"pseudotorsions/")
""" Runs DSSR to annotate the 3D chain and get various information about it.
"""
# Check if the file exists. If no, compute it.
if not os.path.exists(path_to_3D_data+f"pseudotorsions/{self.chain_label}.csv"):
# run DSSR (you need to have it in your $PATH, follow x3dna installation instructions)
output = subprocess.run(
["x3dna-dssr", f"-i={self.file}", "--json", "--auxfile=no"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout = output.stdout.decode('utf-8')
stderr = output.stderr.decode('utf-8')
stdout = output.stdout.decode('utf-8') # this contains the results in JSON format, or is empty if there are errors
stderr = output.stderr.decode('utf-8') # this contains the evenutal errors
try:
if "exception" in stderr:
# DSSR is unable to parse the chain.
warn(f"Exception while running DSSR: {stderr}\n\tIgnoring {self.chain_label}.\t\t\t", error=True)
self.delete_me = True
return
# Get the JSON from DSSR output
json_object = json.loads(stdout)
# Print eventual warnings given by DSSR, and abort if there are some
if "warning" in json_object.keys():
warn(f"Ignoring {self.chain_label} ({json_object['warning']})\t", error=True)
self.delete_me = True
return
# Extract the part about nucleotides from the DSSR annotation
nts = json_object["nts"]
except KeyError as e:
# Mostly, there are no part about nucleotides in the DSSR output. Abort.
warn(f"Error while parsing DSSR's json output:\n{json_object.keys()}\n{json_object}\n\tignoring {self.chain_label}\t\t\t\t", error=True)
self.delete_me = True
return
print("\t> Computing", self.chain_label, f"pseudotorsions...\t\t{validsymb}", flush=True)
# extract angles
# Prepare data structures to extract the pseudotorisions
l = int(nts[-1]["nt_resnum"]) - int(nts[0]["nt_resnum"]) + 1
eta = [np.NaN] * l
theta = [np.NaN] * l
mask = [ 0 ] * l
seq = [ "-" ] * l # nts that are not resolved will be marked "-" in the sequence, and their mask at 0.
mask = [ 0 ] * l # nts that are not resolved will have a mask at 0
seq = [ "-" ] * l # ... and will be marked "-" in the sequence.
# the residue number of the chain start (which has been previously extracted from a whole mmCIF file)
resnum_start = int(nts[0]["nt_resnum"])
for nt in nts:
......@@ -216,23 +261,32 @@ class Chain:
t = np.NaN
else:
t = float(nt["theta_prime"])
p = int(nt["nt_resnum"]) - resnum_start
mask[p] = int(nt["nt_code"] in "ACGU") # U is a U, u is a modified U and should be replaced by consensus ?
seq[p] = nt["nt_code"].upper().replace('?','-').replace('P','U').replace('T','U') # to align the chain with its family. The final nt should be taken from the PSSM.
p = int(nt["nt_resnum"]) - resnum_start # index of the nt in the chain
# All non-regular-ACGU nucleotides (incl. modified & missing) are masked to 0.
# They will be replaced by the consensus in the alignment.
# U is a U, u is a modified U and should be replaced.
mask[p] = int(nt["nt_code"] in "ACGU")
# seq will be later updated with consensus for all positions with mask=0
seq[p] = nt["nt_code"].upper().replace('?','-').replace('P','U').replace('T','U')
# pseudotorsions
eta[p] = e
theta[p] = t
if self.reversed:
warn(f"Has {self.chain_label} been numbered from 3' to 5' ? Inverting angles.")
# the 3D structure is numbered from 3' to 5' instead of standard 5' to 3'
# The 3D structure is numbered from 3' to 5' instead of standard 5' to 3'
# or the sequence that matches the Rfam family is 3' to 5' instead of standard 5' to 3'.
# anyways, you need to invert the angles.
# Anyways, you need to invert the angles.
warn(f"Has {self.chain_label} been numbered from 3' to 5' ? Inverting angles.")
seq = seq[::-1]
mask = mask[::-1]
temp_eta = [ e for e in eta ]
eta = [ theta[n] for n in range(l) ] # eta(n) = theta(l-n+1) forall n in ]1, l]
theta = [ temp_eta[n] for n in range(l) ] # theta(n) = eta(l-n+1) forall n in [1, l[
# Creating a df for easy saving to CSV
pd.DataFrame({"seq": list(seq), "m": list(mask), "eta": list(eta), "theta": list(theta)}
).to_csv(path_to_3D_data+f"pseudotorsions/{self.chain_label}.csv")
print("\t> Saved", self.chain_label, f"pseudotorsions to CSV.\t\t{validsymb}", flush=True)
......@@ -249,6 +303,7 @@ class Chain:
self.thetas = d.theta.values
print(f"\t> Loaded data from CSV\t\t\t\t{validsymb}", flush=True)
# Remove too short chains
if self.length < 5:
warn(f"{self.chain_label} sequence is too short, let's ignore it.\t", error=True)
self.delete_me = True
......@@ -256,25 +311,31 @@ class Chain:
class Job:
""" This class contains information about a task to run later.
This could be a system command or the execution of a Python function.
Time and memory usage of a job can be monitored.
"""
def __init__(self, results="", command=[], function=None, args=[], how_many_in_parallel=0, priority=1, timeout=None, checkFunc=None, checkArgs=[], label=""):
self.cmd_ = command
self.func_ = function
self.args_ = args
self.checkFunc_ = checkFunc
self.checkArgs_ = checkArgs
self.results_file = results
self.priority_ = priority
self.timeout_ = timeout
self.comp_time = -1 # -1 is not executed yet
self.max_mem = -1 # not executed yet
self.label = label
self.cmd_ = command # A system command to run
self.func_ = function # A python function to run
self.args_ = args # The args tuple of the function to run
self.checkFunc_ = checkFunc # A function to check if the Job as already been executed before (and abort execution if yes)
self.checkArgs_ = checkArgs # Arguments for the checkFunc
self.results_file = results # A filename where the job stores its results, to check for existence before execution
self.priority_ = priority # Priority of the job in a list of jobs (Jobs with priority 1 are processed first, then priority 2, etc. Unrelated to processes priority.)
self.timeout_ = timeout # Abort the job if taking too long
self.comp_time = -1 # Time to completion of the job. -1 means 'not executed yet'
self.max_mem = -1 # Peak RAM+Swap usage of the job. -1 means 'not executed yet'
self.label = label # Title
# Deploy the job on a Pool() started using 'how_many_in_parallel' CPUs.
if not how_many_in_parallel:
self.nthreads = read_cpu_number()
elif how_many_in_parallel == -1:
self.nthreads = read_cpu_number() - 1
else:
self.nthreads = how_many_in_parallel
self.useless_bool = False
def __str__(self):
if self.func_ is None:
......@@ -285,6 +346,12 @@ class Job:
class AnnotatedStockholmIterator(AlignIO.StockholmIO.StockholmIterator):
""" A custom Stockholm format MSA parser that returns annotations at the end.
Inherits from Bio.AlignIO and simply overloads the __next__() method to save the
gr, gf and gs dicts at the end.
"""
def __next__(self):
"""Parse the next alignment from the handle."""
handle = self.handle
......@@ -395,10 +462,10 @@ class AnnotatedStockholmIterator(AlignIO.StockholmIO.StockholmIterator):
# assert len(gr) <= len(ids)
self.ids = ids.keys()
self.sequences = seqs
self.seq_annotation = gs
self.seq_col_annotation = gr
self.alignment_annotation = gf
self.sequences = seqs #
self.seq_annotation = gs # This is the new part:
self.seq_col_annotation = gr # Saved for later use.
self.alignment_annotation = gf #
if ids and seqs:
......@@ -450,62 +517,91 @@ class AnnotatedStockholmIterator(AlignIO.StockholmIO.StockholmIterator):
class Monitor:
""" A job that simply watches the memory usage of another process.
Checks the RAM+Swap usage of monitored process and its children every 0.1 sec.
Returns the peak value at the end.
"""
def __init__(self, pid):
self.keep_watching = True
self.target_pid = pid
def check_mem_usage(self):
#print("\t> Monitoring process", self.target_pid, "from process", os.getpid())
# Get the process object
target_process = psutil.Process(self.target_pid)
# Start watching
max_mem = -1
while self.keep_watching:
try:
# read memory usage
info = target_process.memory_full_info()
mem = info.rss + info.swap
# Do the same for every child process
for p in target_process.children(recursive=True):
info = p.memory_full_info()
mem += info.rss + info.swap
except psutil.NoSuchProcess:
print("\t> ERR: monitored process does not exist anymore ! Did something go wrong ?")
# The process that we watch is finished, dead, or killed.
self.keep_watching = False
finally:
# Update the peak value
if mem > max_mem:
max_mem = mem
# Wait 100 ms and loop
sleep(0.1)
# The watch has ended
return max_mem
def read_cpu_number():
# do not use os.cpu_count() on LXC containers
# it reads info from /sys wich is not the VM resources but the host resources.
# Read from /proc/cpuinfo instead.
# As one shall not use os.cpu_count() on LXC containers,
# because it reads info from /sys wich is not the VM resources but the host resources.
# This function reads it from /proc/cpuinfo instead.
p = subprocess.run(['grep', '-Ec', '(Intel|AMD)', '/proc/cpuinfo'], stdout=subprocess.PIPE)
return int(int(p.stdout.decode('utf-8')[:-1])/2)
def warn(message, error=False):
"""Pretty-print warnings and error messages.
"""
if error:
print(f"\t> \033[31mERR: {message}\033[0m{errsymb}", flush=True)
else:
print(f"\t> \033[33mWARN: {message}\033[0m{warnsymb}", flush=True)
def execute_job(j, jobcount):
"""Run a Job object.
"""
# increase the counter of running jobs
running_stats[0] += 1
if len(j.cmd_):
# log the command
# Monitor this process
m = -1
monitor = Monitor(os.getpid())
if len(j.cmd_): # The job is a system command
print(f"[{running_stats[0]+running_stats[2]}/{jobcount}]\t{j.label}")
# Add the command to logfile
logfile = open(runDir + "/log_of_the_run.sh", 'a')
logfile.write(" ".join(j.cmd_))
logfile.write("\n")
logfile.close()
print(f"[{running_stats[0]+running_stats[2]}/{jobcount}]\t{j.label}")
monitor = Monitor(os.getpid())
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# put the monitor in a different thread
assistant_future = executor.submit(monitor.check_mem_usage)
# run the command. subprocess.run will be a child of this process, and stays monitored.
start_time = time.time()
r = subprocess.run(j.cmd_, timeout=j.timeout_, stdout=subprocess.DEVNULL)
end_time = time.time()
# Stop the Monitor, then get its result
monitor.keep_watching = False
m = assistant_future.result()
......@@ -513,28 +609,38 @@ def execute_job(j, jobcount):
print(f"[{running_stats[0]+running_stats[2]}/{jobcount}]\t{j.func_.__name__}({', '.join([str(a) for a in j.args_ if not ((type(a) == list) and len(a)>3)])})")
m = -1
monitor = Monitor(os.getpid())
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# put the monitor in a different thread
assistant_future = executor.submit(monitor.check_mem_usage)
# call the python function (in this process)
start_time = time.time()
r = j.func_(* j.args_)
end_time = time.time()
# Stop the Monitor, then get its result
monitor.keep_watching = False
m = assistant_future.result()
# Job is finished
# increase the counter of finished jobs
running_stats[1] += 1
# return time and memory statistics, plus the job results
t = end_time - start_time
return (t,m,r)
def execute_joblist(fulljoblist, printstats=False):
# reset counters
running_stats[0] = 0
running_stats[1] = 0
running_stats[2] = 0
""" Run a list of job objects.
The jobs in the list can have differente priorities and/or different number of threads.
"""
# sort jobs in a tree structure
# Reset counters
running_stats[0] = 0 # started
running_stats[1] = 0 # finished
running_stats[2] = 0 # failed
# Sort jobs in a tree structure, first by priority, then by CPU numbers
jobs = {}
jobcount = len(fulljoblist)
for job in fulljoblist:
......@@ -543,28 +649,32 @@ def execute_joblist(fulljoblist, printstats=False):
if job.nthreads not in jobs[job.priority_].keys():
jobs[job.priority_][job.nthreads] = []
jobs[job.priority_][job.nthreads].append(job)
# number of different priorities in the list
nprio = max(jobs.keys())
if printstats:
# Write stats in a file
# Write statistics in a file (header here)
f = open("jobstats.csv", "w")
f.write("label,comp_time,max_mem\n")
f.close()
# for each priority level
# Process the jobs from priority 1 to nprio
results = {}
for i in range(1,nprio+1):
if i not in jobs.keys(): continue # ignore this priority level if no job available
different_thread_numbers = [n for n in jobs[i].keys()]
different_thread_numbers.sort()
if i not in jobs.keys(): continue # no job has the priority level i
print("processing jobs of priority", i)
res = []
different_thread_numbers = sorted(jobs[i].keys())
# jobs should be processed 1 by 1, 2 by 2, or n by n depending on their definition
res = []
for n in different_thread_numbers:
# get the bunch of jobs of same priority and thread number
bunch = jobs[i][n]
if not len(bunch): continue # ignore if no jobs should be processed n by n
print("using", n, "processes:")
if not len(bunch): continue # no jobs should be processed n by n
print("using", n, "processes:")
# execute jobs of priority i that should be processed n by n:
p = Pool(processes=n)
raw_results = p.map(partial(execute_job, jobcount=jobcount), bunch)
......@@ -572,9 +682,11 @@ def execute_joblist(fulljoblist, printstats=False):
p.join()
if printstats:
# extract computation times
# Extract computation times
times = [ r[0] for r in raw_results ]
mems = [ r[1] for r in raw_results ]
# Write them to file
f = open("jobstats.csv", "a")
for j, t, m in zip(bunch, times, mems):
j.comp_time = t
......@@ -582,54 +694,84 @@ def execute_joblist(fulljoblist, printstats=False):
print(f"\t> {j.label} finished in {t:.2f} sec with {int(m/1000000):d} MB of memory. \t{validsymb}", flush=True)
f.write(f"{j.label},{t},{m}\n")
f.close()
# Separate the job results in a different list
res += [ r[2] for r in raw_results ]
# Add the results of this tree branch to the main list
results[i] = res
# throw back the money
return results
def download_Rfam_PDB_mappings():
# download PDB mappings to Rfam family
"""Query the Rfam public MySQL database for mappings between their RNA families and PDB structures.
"""
# Download PDB mappings to Rfam family
print("> Fetching latest PDB mappings from Rfam...", end='', flush=True)
try:
db_connection = sqlalchemy.create_engine('mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam')
mappings = pd.read_sql('SELECT rfam_acc, pdb_id, chain, pdb_start, pdb_end, bit_score, evalue_score, cm_start, cm_end, hex_colour FROM pdb_full_region WHERE is_significant=1;', con=db_connection)
mappings.to_csv(path_to_3D_data + 'Rfam-PDB-mappings.csv')
print(f"\t{validsymb}")
except sqlalchemy.exc.OperationalError:
except sqlalchemy.exc.OperationalError: # Cannot connect :'()
print(f"\t{errsymb}")
# Check if a previous run succeeded (if file exists, use it)
if path.isfile(path_to_3D_data + 'Rfam-PDB-mappings.csv'):
print("\t> Using previous version.")
mappings = pd.read_csv(path_to_3D_data + 'Rfam-PDB-mappings.csv')
else:
print("Can't do anything without data. Check mysql-rfam-public.ebi.ac.uk status and try again later. Exiting.")
else: # otherwise, abort.
print("Can't do anything without data. Can't reach mysql-rfam-public.ebi.ac.uk on port 4497. Is it open on your system ? Exiting.")
exit(1)
return mappings
def download_Rfam_seeds():
""" Download the seed sequence alignments from Rfam.
Does not download if already there. It uses their FTP.
"""
# If the seeds are not available, download them
if not path.isfile(path_to_seq_data + "seeds/Rfam.seed.gz"):
_urlcleanup()
_urlretrieve('ftp://ftp.ebi.ac.uk/pub/databases/Rfam/CURRENT/Rfam.seed.gz', path_to_seq_data + "seeds/Rfam.seed.gz")
# Read Rfam seed alignements
# Prepare containers for the data
aligned_records = []
rfam_acc = []
alignment_len = []
alignment_nseq = []
AlignIO._FormatToIterator["stockholm"] = AnnotatedStockholmIterator # Tell biopython to use our overload
# Tell Biopython to use our overload
AlignIO._FormatToIterator["stockholm"] = AnnotatedStockholmIterator
# Read the seeds
with gzip.open(path_to_seq_data + "seeds/Rfam.seed.gz", encoding='latin-1') as gz:
alignments = AlignIO.parse(gz, "stockholm", alphabet=generic_rna)
# Fill the containers
for align in alignments:
aligned_records.append('\n'.join([ str(s.seq) for s in align ]))
rfam_acc.append(align._fileannotations["AC"][0])
alignment_len.append(align.get_alignment_length())
alignment_nseq.append(len(align._records))
# Build a dataframe with the containers
Rfam_seeds = pd.DataFrame()
Rfam_seeds["aligned_records"] = aligned_records
Rfam_seeds["rfam_acc"] = rfam_acc
Rfam_seeds["alignment_len"] = alignment_len
Rfam_seeds["alignment_nseq"] = alignment_nseq
return Rfam_seeds
def download_Rfam_cm():
""" Download the covariance models from Rfam.
Does not download if already there.
"""
print(f"\t> Download Rfam.cm.gz from Rfam...\t", end='', flush=True)
if not path.isfile(path_to_seq_data + "Rfam.cm"):
try:
......@@ -645,27 +787,41 @@ def download_Rfam_cm():
print(f"\t{validsymb}\t(no need)", flush=True)
def download_Rfam_family_stats(list_of_families):
"""Query the Rfam public MySQL database for statistics about their RNA families.
Family ID, number of sequences identified, maximum length of those sequences.
"""
try:
db_connection = sqlalchemy.create_engine('mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam')
# Prepare the SQL query. It computes the length of the chains and gets the maximum length by family.
q = """SELECT fr.rfam_acc, COUNT(DISTINCT fr.rfamseq_acc) AS 'n_seq',
MAX(
(CASE WHEN fr.seq_start > fr.seq_end THEN fr.seq_start
ELSE fr.seq_end
ELSE fr.seq_end
END)
-
(CASE WHEN fr.seq_start > fr.seq_end THEN fr.seq_end
ELSE fr.seq_start
ELSE fr.seq_start
END)
) AS 'maxlength'
FROM full_region fr
GROUP BY fr.rfam_acc"""
# Query the database
d = pd.read_sql(q, con=db_connection)
# filter the results to families we are interested in
return d[ d["rfam_acc"].isin(list_of_families) ]
except sqlalchemy.exc.OperationalError:
warn("Something's wrong with the SQL database. Check mysql-rfam-public.ebi.ac.uk status and try again later. Not printing statistics.")
return {}
def download_Rfam_sequences(rfam_acc):
""" Downloads the unaligned sequences known related to a given RNA family.
Actually gets a FASTA archive from the public Rfam FTP. Does not download if already there."""
print(f"\t\t> Download {rfam_acc}.fa.gz from Rfam...", end='', flush=True)
if not path.isfile(path_to_seq_data + f"rfam_sequences/fasta/{rfam_acc}.fa.gz"):
try:
......@@ -679,8 +835,13 @@ def download_Rfam_sequences(rfam_acc):
print(f"\t{validsymb}\t(already there)", flush=True)
def download_BGSU_NR_list():
# download latest BGSU non-redundant list
""" Downloads a list of RNA 3D structures proposed by Bowling Green State University RNA research group.
Does not remove structural redundancy. Resolution threshold used is 4 Angströms.
"""
print("> Fetching latest NR list from BGSU website...", end='', flush=True)
# Download latest BGSU non-redundant list
try:
s = requests.get("http://rna.bgsu.edu/rna3dhub/nrlist/download/current/4.0A/csv").content
nr = open(path_to_3D_data + "latest_nr_list.csv", 'w')
......@@ -690,7 +851,7 @@ def download_BGSU_NR_list():
except:
warn("Error downloading NR list !\t", error=True)
# try to read previous file
# Try to read previous file
if path.isfile(path_to_3D_data + "latest_nr_list.csv"):
print("\t> Use of the previous version.\t", end = "", flush=True)
else:
......@@ -705,46 +866,68 @@ def download_BGSU_NR_list():
for code in full_structures_list:
codes = code.replace('+',',').split(',')
for c in codes:
# Convert every PDB code into a Chain object
all_chains.append(Chain(c))
# The beginning of an adventure.
return all_chains
def build_chain(c, rfam, pdb_start, pdb_end):
""" Additionally adds all the desired information to a Chain object.
"""
# Download the whole mmCIF file containing the chain we are interested in
c.download_3D()
# If no problems, extract the portion we want
if not c.delete_me:
c.extract_portion(c.chain_label, pdb_start, pdb_end)
# If no problems, map it to an Rfam family, and annotate it with DSSR
if not c.delete_me:
c.set_rfam(rfam)
c.extract_3D_data()
# If there were newly discovered problems, add this chain to the known issues
if c.delete_me and c.chain_label not in known_issues:
warn(f"Adding {c.chain_label} to known issues.\t\t")
f = open(path_to_3D_data + "known_issues.txt", 'a')
f.write(c.chain_label + '\n')
f.close()
# The Chain object is ready
return c
def cm_realign(rfam_acc, chains, label):
""" Runs multiple sequence alignements by RNA family.
It aligns the Rfam hits from a RNA family with the sequences from the list of chains.
Rfam covariance models are used with Infernal tools, except for rRNAs.
cmalign requires too much RAM for them, so we use SINA, a specifically designed tool for rRNAs.
"""
# If the computation was already done before, do not recompute.
if path.isfile(path_to_seq_data + f"realigned/{rfam_acc}++.afa"):
print(f"\t> {label} completed \t{validsymb}\t(already done)", flush=True)
return
# Preparing job folder
if not os.access(path_to_seq_data + "realigned/", os.F_OK):
os.makedirs(path_to_seq_data + "realigned/")
# Reading Gzipped FASTA file and writing DNA FASTA file
if not path.isfile(path_to_seq_data + f"realigned/{rfam_acc}++.fa"):
print("\t> Extracting sequences...", flush=True)
# Prepare a FASTA file containing Rfamseq hits for that family + our chains sequences
f = open(path_to_seq_data + f"realigned/{rfam_acc}++.fa", "w")
# Read the FASTA archive of Rfamseq hits, and add sequences to the file
with gzip.open(path_to_seq_data + f"rfam_sequences/fasta/{rfam_acc}.fa.gz", 'rt') as gz:
ids = []
for record in SeqIO.parse(gz, "fasta"):
if record.id not in ids:
f.write(">"+record.description+'\n'+str(record.seq)+'\n')
ids.append(record.id)
# Add the chains sequences to the file
for c in chains:
seq_str = c.seq.replace('U','T').replace('-','') # We align as DNA
seq_str = seq_str.replace('P','U') # Replace pseudo-uridines by uridines. We lose information here :'(
seq_str = c.seq.replace('U','T').replace('-','').replace('P','U') # We align as DNA, and we replace pseudo-uridines by uridines.
f.write(f"> {str(c)}\n"+seq_str+'\n')
f.close()
......@@ -821,34 +1004,52 @@ def cm_realign(rfam_acc, chains, label):
return 0
def summarize_position(col):
# this function counts the number of nucleotides at a given position, given a "column" from a MSA.
chars = ''.join(set(col))
""" Counts the number of nucleotides at a given position, given a "column" from a MSA.
"""
# Count the different chars in the column
counts = { 'A': col.count('A') + col.count('a'), # caps are nt in conformity with the consensus (?)
'C':col.count('C') + col.count('c'),
'G':col.count('G') + col.count('g'),
'U':col.count('U') + col.count('u'),
'-':col.count('-'), '.':col.count('.')}
# Compute how many are regular ACGU and how many are not
known_chars_count = 0
chars = ''.join(set(col))
for char in chars:
if char in "ACGU":
known_chars_count += counts[char]
elif char not in "-.acgu":
counts[char] = col.count(char)
N = len(col) - counts['-'] - counts['.'] # number of ungapped residues
if N:
if N: # prevent division by zero if the column is only gaps
return ( counts['A']/N, counts['C']/N, counts['G']/N, counts['U']/N, (N - known_chars_count)/N) # other residues, or consensus (N, K, Y...)
else:
return (0, 0, 0, 0, 0)
def alignment_nt_stats(f) :
def alignment_nt_stats(f):
""" Computes Position-Specific-Scoring-Matrices given the multiple sequence alignment of the RNA family.
Also saves every chain of the family to file.
"""
# Get a worker number to position the progress bar
global idxQueue
thr_idx = idxQueue.get()
# get the chains of this family
list_of_chains = rfam_acc_to_download[f]
chains_ids = [ str(c) for c in list_of_chains ]
thr_idx = idxQueue.get()
# Open the alignment
align = AlignIO.read(path_to_seq_data + f"realigned/{f}++.afa", "fasta")
alilen = align.get_alignment_length()
try:
align = AlignIO.read(path_to_seq_data + f"realigned/{f}++.afa", "fasta")
alilen = align.get_alignment_length()
except:
warn(f,"alignment is wrong. Recompute it and retry.", error=True)
exit(1)
# Compute statistics per column
pbar = tqdm(iterable=range(alilen), position=thr_idx+1, desc=f"Worker {thr_idx+1}: {f}", leave=False)
......@@ -856,8 +1057,9 @@ def alignment_nt_stats(f) :
pbar.close()
frequencies = np.array(results).T
# For each sequence, find the right chain and save the PSSMs inside.
for s in align:
if not '[' in s.id: # this is a Rfamseq entry, not PDB
if not '[' in s.id: # this is a Rfamseq entry, not a 3D chain
continue
# get the right 3D chain:
......@@ -869,7 +1071,7 @@ def alignment_nt_stats(f) :
j = 0
#warn_gaps = False
while i<c.full_length and j<alilen:
# here we try to map c.seq (the sequence of the 3D chain, including gaps when residues are missing),
# Here we try to map c.seq (the sequence of the 3D chain, including gaps when residues are missing),
# with s.seq, the sequence aligned in the MSA, containing any of ACGUacgu and two types of gaps, - and .
if c.seq[i] == s.seq[j].upper(): # alignment and sequence correspond (incl. gaps)
......@@ -944,21 +1146,27 @@ def alignment_nt_stats(f) :
file.write(','.join(line)+'\n')
file.close()
idxQueue.put(thr_idx) # replace the thread index
idxQueue.put(thr_idx) # replace the thread index in the queue
return 0
if __name__ == "__main__":
print("Main process running. (PID", os.getpid(), ")")
# temporary, for debugging:
if os.path.exists(path_to_3D_data + "known_issues.txt"):
subprocess.run(["rm", path_to_3D_data + "known_issues.txt"])
# # temporary, for debugging: start from zero knowledge
# if os.path.exists(path_to_3D_data + "known_issues.txt"):
# subprocess.run(["rm", path_to_3D_data + "known_issues.txt"])
# ===========================================================================
# List 3D chains with available Rfam mapping
# ===========================================================================
# List all 3D RNA chains below 4Ang resolution
all_chains = download_BGSU_NR_list()
# Ask Rfam if some are mapped to Rfam families
mappings = download_Rfam_PDB_mappings()
# Filter the chains with mapping
chains_with_mapping = []
for c in all_chains:
mapping = mappings.loc[ (mappings.pdb_id == c.pdb_id) & (mappings.chain == c.pdb_chain_id) ]
......@@ -970,7 +1178,9 @@ if __name__ == "__main__":
# Download 3D structures, extract the desired chain portions,
# and extract their informations
# ===========================================================================
print("> Building download list...", flush=True)
# Check for a list of known problems:
known_issues = []
if path.isfile(path_to_3D_data + "known_issues.txt"):
......@@ -981,33 +1191,45 @@ if __name__ == "__main__":
for x in known_issues:
print("\t ", x)
# Download the corresponding CIF files and extract the mapped portions
mmcif_parser = MMCIFParser()
joblist = []
for c in chains_with_mapping:
# read mappings information
mapping = mappings.loc[ (mappings.pdb_id == c.pdb_id) & (mappings.chain == c.pdb_chain_id) ]
pdb_start = str(mapping.pdb_start.values[0])
pdb_end = str(mapping.pdb_end.values[0])
# build the chain
# Add a job to build the chain to the list
c.chain_label = f"{c.pdb_id}_{str(c.pdb_model)}_{c.pdb_chain_id}_{pdb_start}-{pdb_end}"
if c.chain_label not in known_issues:
joblist.append(Job(function=build_chain, args=[c, mapping.rfam_acc.values[0], pdb_start, pdb_end]))
joblist.append(Job(function=build_chain, # Apply function build_chain to every c.chain_label
args=[c, mapping.rfam_acc.values[0], pdb_start, pdb_end]))
if not path.isdir(path_to_3D_data + "rna_mapped_to_Rfam"):
os.makedirs(path_to_3D_data + "rna_mapped_to_Rfam")
# Prepare the results folders
if not path.isdir(path_to_3D_data + "RNAcifs"):
os.makedirs(path_to_3D_data + "RNAcifs")
os.makedirs(path_to_3D_data + "RNAcifs") # for the whole structures
if not path.isdir(path_to_3D_data + "rna_mapped_to_Rfam"):
os.makedirs(path_to_3D_data + "rna_mapped_to_Rfam") # for the portions mapped to Rfam
if not os.isdir(path_to_3D_data+"pseudotorsions/"):
os.makedirs(path_to_3D_data+"pseudotorsions/") # for the annotations by DSSR
# Run the builds and extractions
results = execute_joblist(joblist)[1]
# Remove the chains whose parsing resulted in errors
loaded_chains = [ c for c in results if not c.delete_me ]
print(f"> Loaded {len(loaded_chains)} RNA chains ({len(chains_with_mapping) - len(loaded_chains)} errors).")
# ===========================================================================
# Download RNA sequences of the corresponding Rfam families
# ===========================================================================
# Preparing a results folder
if not os.access(path_to_seq_data + "realigned/", os.F_OK):
os.makedirs(path_to_seq_data + "realigned/")
# Get the list of Rfam families found
rfam_acc_to_download = {}
for c in loaded_chains:
......@@ -1017,60 +1239,69 @@ if __name__ == "__main__":
rfam_acc_to_download[c.rfam_fam].append(c)
print(f"> Identified {len(rfam_acc_to_download.keys())} families to download and re-align with the crystals' sequences:")
# Download the sequences from these families
# Download the covariance models for all families
download_Rfam_cm()
# Ask the SQL server how much we have to download for each family
fam_stats = download_Rfam_family_stats(rfam_acc_to_download.keys())
if len(fam_stats.keys()):
fam_list = sorted(rfam_acc_to_download.keys())
if len(fam_stats.keys()): # 'if' protected, for the case the server is down, fam_stats is empty
# save the statistics to CSV file
n_pdb = [ len(rfam_acc_to_download[f]) for f in fam_stats["rfam_acc"] ]
fam_stats["n_pdb_seqs"] = n_pdb
fam_stats["total_seqs"] = fam_stats["n_seq"] + fam_stats["n_pdb_seqs"]
fam_stats.to_csv(path_to_seq_data + "realigned/statistics.csv")
for f in sorted(rfam_acc_to_download.keys()):
if len(fam_stats.keys()):
# print the stats
for f in fam_list:
line = fam_stats[fam_stats["rfam_acc"]==f]
print(f"\t> {f}: {line.n_seq.values[0]} Rfam hits + {line.n_pdb_seqs.values[0]} PDB sequences to realign")
# Download the sequences
for f in fam_list:
download_Rfam_sequences(f)
# ==========================================================================================
# Realign sequences from 3D chains to Rfam's identified hits (--> extended full alignement)
# ==========================================================================================
# Build job list
# Prepare the job list
fulljoblist = []
for f in sorted(rfam_acc_to_download.keys()):
for f in fam_list:
label = f"Realign {f} + {len(rfam_acc_to_download[f])} chains"
fulljoblist.append(Job(function=cm_realign, args=[f, rfam_acc_to_download[f], label], how_many_in_parallel=1, priority=1, label=label))
execute_joblist(fulljoblist, printstats=True)
fulljoblist.append( Job( function=cm_realign, args=[f, rfam_acc_to_download[f], label], # Apply cm_realign to each RNA family
how_many_in_parallel=1, label=label)) # the function already uses all CPUs so launch them one by one
# Execute the jobs
execute_joblist(fulljoblist, printstats=True) # printstats=True will show a summary of time/memory usage of the jobs
# ==========================================================================================
# Now compute statistics on base variants at each position of every 3D chain
# ==========================================================================================
print("Computing nucleotide frequencies in alignments...")
# Prepare a results folder
if not path.isdir(path_to_3D_data + "datapoints/"):
os.makedirs(path_to_3D_data + "datapoints/")
print("Computing nucleotide frequencies in alignments...")
families = sorted([f for f in rfam_acc_to_download.keys() ])
# Build job list
thr_idx_mgr = Manager()
idxQueue = thr_idx_mgr.Queue()
for i in range(read_cpu_number()):
idxQueue.put(i)
# Prepare the architecture of a shiny multi-progress-bars design
thr_idx_mgr = Manager() # Push the number of workers to a queue.
idxQueue = thr_idx_mgr.Queue() # ... Then each Pool worker will
for i in range(read_cpu_number()): # ... pick a number from the queue when starting the computation for one family,
idxQueue.put(i) # ... and replace it when the computation has ended so it could be picked up later.
# freeze_support()
# r = process_map(alignment_nt_stats, families, max_workers=read_cpu_number())
# Start a process pool to dispatch the RNA families over multiple CPUs
p = Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),), processes=read_cpu_number())
fam_pbar = tqdm(total=len(families), desc="RNA families", position=0, leave=True)
for i, _ in enumerate(p.imap_unordered(alignment_nt_stats, families)):
fam_pbar.update(1)
fam_pbar = tqdm(total=len(fam_list), desc="RNA families", position=0, leave=True)
for i, _ in enumerate(p.imap_unordered(alignment_nt_stats, fam_list)): # Apply alignment_nt_stats to each RNA family
fam_pbar.update(1) # Everytime the iteration finishes on a family, update the global progress bar over the RNA families
fam_pbar.close()
p.close()
p.join()
# ==========================================================================================
# Do a brief statistics summary of the produced dataset
# ==========================================================================================
#TODO: compute nt frequencies, chain lengths, angle clusters
print("Completed.") # This part of the code is supposed to release some serotonin in the modeller's brain
print("Completed.")
# # so i can sleep for the end of the night
# subprocess.run(["shutdown","now"])
\ No newline at end of file
......
#!/usr/bin/python3.8
import os
import numpy as np
import pandas as pd
import scipy.stats as st
import matplotlib.pyplot as plt
import matplotlib.patches as ptch
from mpl_toolkits.mplot3d import axes3d
from matplotlib import cm
from tqdm import tqdm
if os.path.isdir("/home/ubuntu/"): # this is the IFB-core cloud
path_to_3D_data = "/mnt/Data/RNA/3D/"
path_to_seq_data = "/mnt/Data/RNA/sequences/"
elif os.path.isdir("/home/persalteas"): # this is my personal workstation
path_to_3D_data = "/home/persalteas/Data/RNA/3D/"
path_to_seq_data = "/home/persalteas/Data/RNA/sequences/"
elif os.path.isdir("/home/lbecquey"): # this is the IBISC server
path_to_3D_data = "/home/lbecquey/Data/RNA/3D/"
path_to_seq_data = "/home/lbecquey/Data/RNA/sequences/"
elif os.path.isdir("/nhome/siniac/lbecquey"): # this is the office PC
path_to_3D_data = "/nhome/siniac/lbecquey/Data/RNA/3D/"
path_to_seq_data = "/nhome/siniac/lbecquey/Data/RNA/sequences/"
else:
print("I don't know that machine... I'm shy, maybe you should introduce yourself ?")
exit(1)
if __name__ == "__main__":
#TODO: compute nt frequencies, chain lengths
print("loading CSV files...")
rna_points = []
all_etas = []
all_thetas = []
for csvfile in tqdm(os.listdir(path_to_3D_data + "pseudotorsions")):
df = pd.read_csv(path_to_3D_data + "pseudotorsions/" + csvfile).drop('Unnamed: 0', axis=1)
all_etas += list(df['eta'].values)
all_thetas += list(df['theta'].values)
rna_points.append(df)
print("combining etas and thetas...")
# increase all the angles by 180°
alldata = [ ((e+360)%360-180, (t+360)%360-180)
for e, t in zip(all_etas, all_thetas)
if ('nan' not in str((e,t)))
and not(e<-150 and t<-110) and not (e>160 and t<-110) ]
print(len(alldata), "couples of nts found.")
x = np.array([ p[0] for p in alldata ])
y = np.array([ p[1] for p in alldata ])
xmin, xmax = min(x), max(x)
ymin, ymax = min(y), max(y)
xx, yy = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
positions = np.vstack([xx.ravel(), yy.ravel()])
values = np.vstack([x, y])
kernel = st.gaussian_kde(values)
f = np.reshape(kernel(positions).T, xx.shape)
# histogram :
fig, axs = plt.subplots(1,3, figsize=(18, 6))
ax = fig.add_subplot(131)
plt.axhline(y=0, alpha=0.5, color='black')
plt.axvline(x=0, alpha=0.5, color='black')
plt.scatter(x, y, s=1, alpha=0.1)
plt.contourf(xx, yy, f, cmap=cm.BuPu, alpha=0.5)
ax.set_xlabel("$\\eta'=C_1'^{i-1}-P^i-C_1'^i-P^{i+1}$")
ax.set_ylabel("$\\theta'=P^i-C_1'^i-P^{i+1}-C_1'^{i+1}$")
ax.add_patch(ptch.Rectangle((-20,0),50,70, linewidth=1, edgecolor='r', facecolor='#ff000080'))
ax = fig.add_subplot(132, projection='3d')
ax.plot_surface(xx, yy, f, cmap=cm.coolwarm, linewidth=0, antialiased=False)
ax.set_title("\"Wadley plot\"\n$\\eta'$, $\\theta'$ pseudotorsions in 3D RNA structures\n(Massive peak removed in the red zone, = double helices)")
ax.set_xlabel("$\\eta'=C_1'^{i-1}-P^i-C_1'^i-P^{i+1}$")
ax.set_ylabel("$\\theta'=P^i-C_1'^i-P^{i+1}-C_1'^{i+1}$")
ax = fig.add_subplot(133, projection='3d')
hist, xedges, yedges = np.histogram2d(x, y, bins=300, range=[[xmin, xmax], [ymin, ymax]])
xpos, ypos = np.meshgrid(xedges[:-1], yedges[:-1], indexing="ij")
ax.bar3d(xpos.ravel(), ypos.ravel(), 0, 0.5, 0.5, hist.ravel(), zsort='average')
ax.set_xlabel("$\\eta'=C_1'^{i-1}-P^i-C_1'^i-P^{i+1}$")
ax.set_ylabel("$\\theta'=P^i-C_1'^i-P^{i+1}-C_1'^{i+1}$")
plt.savefig("results/clusters_rot180.png")
plt.show()