Louis BECQUEY

Fixes + using esl-alimerge again

......@@ -71,11 +71,6 @@ sqlite3.enable_callback_tracebacks(True)
sqlite3.register_adapter(np.int64, lambda val: int(val)) # Tell Sqlite what to do with <class numpy.int64> objects ---> convert to int
sqlite3.register_adapter(np.float64, lambda val: float(val)) # Tell Sqlite what to do with <class numpy.float64> objects ---> convert to float
# m = Manager()
# running_stats = m.list()
# running_stats.append(0) # n_launched
# running_stats.append(0) # n_finished
# running_stats.append(0) # n_skipped
n_launched = Value('i', 0)
n_finished = Value('i', 0)
n_skipped = Value('i', 0)
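A side note on the pattern above (a minimal sketch, not part of the commit): `Value('i', 0)` allocates a C int in shared memory, and since `+=` is a read-modify-write, each update has to be wrapped in the value's lock:

```python
# Minimal sketch of the shared-counter pattern, using the standard
# multiprocessing module.
from multiprocessing import Value

n_launched = Value('i', 0)      # 'i' = signed C int, lives in shared memory
with n_launched.get_lock():     # += is not atomic across processes
    n_launched.value += 1
```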
......@@ -635,12 +630,24 @@ class Chain:
if nt2 in res_ids:
interacts[nt2_idx] += 1
if paired[nt2_idx] == "":
pair_type_LW[nt2_idx] = lw_pair[0] + lw_pair[2] + lw_pair[1]
pair_type_DSSR[nt2_idx] = dssr_pair[0] + dssr_pair[3] + dssr_pair[2] + dssr_pair[1]
if lw_pair != "--":
pair_type_LW[nt2_idx] = lw_pair[0] + lw_pair[2] + lw_pair[1]
else:
pair_type_LW[nt2_idx] = "--"
if dssr_pair != "--":
pair_type_DSSR[nt2_idx] = dssr_pair[0] + dssr_pair[3] + dssr_pair[2] + dssr_pair[1]
else:
pair_type_DSSR[nt2_idx] = "--"
paired[nt2_idx] = str(nt1_idx + 1)
else:
pair_type_LW[nt2_idx] += ',' + lw_pair[0] + lw_pair[2] + lw_pair[1]
pair_type_DSSR[nt2_idx] += ',' + dssr_pair[0] + dssr_pair[3] + dssr_pair[2] + dssr_pair[1]
if lw_pair != "--":
pair_type_LW[nt2_idx] += ',' + lw_pair[0] + lw_pair[2] + lw_pair[1]
else:
pair_type_LW[nt2_idx] += ",--"
if dssr_pair != "--":
pair_type_DSSR[nt2_idx] += ',' + dssr_pair[0] + dssr_pair[3] + dssr_pair[2] + dssr_pair[1]
else:
pair_type_DSSR[nt2_idx] += ",--"
paired[nt2_idx] += ',' + str(nt1_idx + 1)
# transform nt_id to shorter values
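For the record (an illustrative toy, not from the commit): the index shuffling above mirrors a pair annotation so that nt2 stores it from its own point of view. Assuming three-character Leontis-Westhof codes like `cWH` and four-character DSSR-style codes like `tW-H`, the two edges swap while the cis/trans flag stays first:

```python
# Toy illustration of the edge-swapping done above (assumed example codes).
lw_pair = "cWH"     # cis Watson-Crick/Hoogsteen, as seen from nt1
dssr_pair = "tW-H"  # the same idea in a DSSR-style code

print(lw_pair[0] + lw_pair[2] + lw_pair[1])                       # cHW
print(dssr_pair[0] + dssr_pair[3] + dssr_pair[2] + dssr_pair[1])  # tH-W
```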
......@@ -1083,7 +1090,7 @@ class Pipeline:
self.REUSE_ALL = False
self.REDUNDANT = False
self.ALIGNOPTS = None
self.RRNAALIGNOPTS = "--mxsize 8192 --cpu 10 --maxtau 0.1"
self.RRNAALIGNOPTS = ["--mxsize", "8192", "--cpu", "10", "--maxtau", "0.1"]
self.STATSOPTS = None
self.USESINA = False
self.SELECT_ONLY = None
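The switch from a string to a list above matters because these options end up concatenated into a `subprocess.run` argument list. A hedged sketch of the difference (placeholder paths, assumes `cmalign` is in $PATH):

```python
# Sketch only: list-form argv tokens versus one opaque string.
import subprocess

rrna_opts = ["--mxsize", "8192", "--cpu", "10", "--maxtau", "0.1"]
cmd = ["cmalign"] + rrna_opts + ["-o", "out.stk", "model.cm", "seqs.fa"]
subprocess.run(cmd)  # each list element becomes its own argv entry;
                     # a single "--mxsize 8192 ..." string would reach
                     # cmalign as one unrecognized argument
```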
......@@ -1151,6 +1158,7 @@ class Pipeline:
"\n\t\t\t\t need of RAM. Should be a number between 1 and your number of CPUs. Note that portions"
"\n\t\t\t\t of the pipeline already limit themselves to 50% or 70% of that number by default.")
print("--cmalign-opts=…\t\tA string of additional options to pass to cmalign aligner, e.g. \"--nonbanded --mxsize 2048\"")
print("--cmalign-rrna-opts=…\tLike cmalign-opts, but applied for rRNA (large families, memory-heavy jobs).")
print("--archive\t\t\tCreate tar.gz archives of the datapoints text files and the alignments,"
"\n\t\t\t\t and update the link to the latest archive. ")
print("--no-logs\t\t\tDo not save per-chain logs of the numbering modifications.")
......@@ -1219,7 +1227,7 @@ class Pipeline:
elif opt == "cmalign-opts":
self.ALIGNOPTS = arg
elif opt == "cmalign-rrna-opts":
self.RRNAALIGNOPTS = arg
self.RRNAALIGNOPTS = " ".split(arg)
elif opt == "stats-opts":
self.STATSOPTS = " ".split(arg)
elif opt == "--all":
......@@ -1436,8 +1444,9 @@ class Pipeline:
args=[c, self.EXTRACT_CHAINS, self.KEEP_HETATM, retry, self.SAVELOGS]))
try:
results = execute_joblist(joblist)
except:
print("Exiting", flush=True)
except Exception as e:
warn(str(e), error=True)
print("Exiting", str(e), flush=True)
exit(1)
# If there were newly discovered problems, add this chain to the known issues
......@@ -1550,7 +1559,7 @@ class Pipeline:
align = AlignIO.read(path_to_seq_data + "realigned/" + r[0] + "++.afa", "fasta")
nb_3d_chains = len([1 for r in align if '[' in r.id])
if r[0] in SSU_set: # SSU v138.1 is used
nb_homologs = 2224740 # source: https://www.arb-silva.de/documentation/release-1381/
nb_homologs = 2224740 # source: https://www.arb-silva.de/documentation/release-1381/
nb_total_homol = nb_homologs + nb_3d_chains
elif r[0] in LSU_set: # LSU v138.1 is used
nb_homologs = 227331 # source: https://www.arb-silva.de/documentation/release-1381/
......@@ -1794,9 +1803,9 @@ def init_no_tqdm(arg1, arg2, arg3):
The children's progress is followed using stdout text logs (notify(), warn(), etc.)
"""
global n_launched, n_finished, n_skipped
n_launched = arg1
n_finished = arg2
n_skipped = arg3
n_launched = arg1
n_finished = arg2
n_skipped = arg3
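For context (a hedged usage sketch, not from the commit): `init_no_tqdm` is meant as a `Pool` initializer, re-binding the three shared counters as globals inside every worker so they all update the same shared memory:

```python
# Hypothetical call site, assuming n_launched/n_finished/n_skipped are the
# module-level Value('i', 0) counters defined earlier.
from multiprocessing import Pool

p = Pool(processes=4, initializer=init_no_tqdm,
         initargs=(n_launched, n_finished, n_skipped))
```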
def warn(message, error=False):
"""
......@@ -2147,7 +2156,7 @@ def execute_job(j, jobcount):
# increase the counter of running jobs
with n_launched.get_lock():
n_launched.value += 1
n_launched.value += 1
# Monitor this process
m = -1
......@@ -2208,7 +2217,8 @@ def execute_job(j, jobcount):
m = assistant_future.result()
# increase the counter of finished jobs
running_stats[1] += 1
with n_finished.get_lock():
n_finished.value += 1
# return time and memory statistics, plus the job results
t = end_time - start_time
......@@ -2223,9 +2233,12 @@ def execute_joblist(fulljoblist):
"""
# Reset counters
running_stats[0] = 0 # started
running_stats[1] = 0 # finished
running_stats[2] = 0 # failed
with n_launched.get_lock():
n_launched.value = 0
with n_skipped.get_lock():
n_skipped.value = 0
with n_finished.get_lock():
n_finished.value = 0
# Sort jobs in a tree structure, first by priority, then by CPU numbers
jobs = {}
......@@ -2276,10 +2289,6 @@ def execute_joblist(fulljoblist):
j.comp_time = round(r[0], 2) # seconds
j.max_mem = int(r[1]/1000000) # MB
results.append((j.label, r[2], j.comp_time, j.max_mem))
# Job is finished
with n_finished.get_lock():
n_finished.value += 1
# throw back the money
return results
......@@ -2672,13 +2681,17 @@ def use_infernal(rfam_acc, alignopts):
with open(path_to_seq_data + f"realigned/{rfam_acc}_new.log", 'w') as o:
p1 = subprocess.run(["cmalign", "--ifile", path_to_seq_data + f"realigned/{rfam_acc}.ins",
"--sfile", path_to_seq_data + f"realigned/{rfam_acc}.tsv",
"-o", path_to_seq_data + f"realigned/{rfam_acc}_new.stk",
"-o", new_ali_path,
path_to_seq_data + f"realigned/{rfam_acc}.cm",
path_to_seq_data + f"realigned/{rfam_acc}_new.fa"],
stdout=o, stderr=subprocess.PIPE)
if "--mxsize" in p1.stderr.decode("utf-8"):
# not enough available RAM to allocate the DP matrix
warn(f"Not enough RAM to allocate cmalign DP matrix for family {rfam_acc}. Use --sina or --cmalign-opts.", error=True)
align_errors = p1.stderr.decode("utf-8")
if len(align_errors):
if "--mxsize" in align_errors:
# not enough available RAM to allocate the DP matrix
warn(f"Not enough RAM to allocate cmalign DP matrix for family {rfam_acc}. Use --sina or --cmalign-opts.", error=True)
else:
warn(align_errors, error=True)
notify("Aligned new sequences together")
# Detect doublons and remove them
......@@ -2710,8 +2723,8 @@ def use_infernal(rfam_acc, alignopts):
os.remove(path_to_seq_data + "realigned/toremove.txt")
# And we merge the two alignments
p2 = subprocess.run(["cmalign", "--merge" "-o", path_to_seq_data + f"realigned/{rfam_acc}_merged.stk",
"--rna", path_to_seq_data + f"realigned/{rfam_acc}.cm", existing_ali_path, new_ali_path],
p2 = subprocess.run(["esl-alimerge", "-o", path_to_seq_data + f"realigned/{rfam_acc}_merged.stk",
"--rna", existing_ali_path, new_ali_path],
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
alignErrors = p1.stderr.decode('utf-8')
mergeErrors = p2.stderr.decode('utf-8')
......@@ -2730,7 +2743,7 @@ def use_infernal(rfam_acc, alignopts):
cmd = ["cmalign"]
if alignopts is not None:
cmd += " ".split(alignopts)
cmd += alignopts
cmd += ['-o', path_to_seq_data + f"realigned/{rfam_acc}++.stk",
"--ifile", path_to_seq_data + f"realigned/{rfam_acc}.ins",
"--sfile", path_to_seq_data + f"realigned/{rfam_acc}.tsv",
......@@ -3166,8 +3179,6 @@ if __name__ == "__main__":
for c in pp.loaded_chains:
work_save(c, homology=False)
print("Completed.")
exit(0)
# At this point, structure, chain and nucleotide tables of the database are up to date.
# (Modulo some statistics computed by statistics.py)
......
......@@ -35,11 +35,11 @@ nohup bash -c 'time docker run --rm -v /path/to/3D/data/folder:/3D -v /path/to/s
# Method 2 : Classical command line installation (Linux only)
You need to install the dependencies:
- DSSR, you need to register to the X3DNA forum [here](http://forum.x3dna.org/site-announcements/download-instructions/) and then download the DSSR binary [on that page](http://forum.x3dna.org/downloads/3dna-download/). Make sure to have the `x3dna-dssr` binary in your $PATH variable so that RNANet.py finds it.
- Infernal, to download at [Eddylab](http://eddylab.org/infernal/), several options are available depending on your preferences. Make sure to have the `cmalign`, `cmfetch`, `cmbuild`, `esl-alimanip`, `esl-alipid` and `esl-reformat` binaries in your $PATH variable, so that RNANet.py can find them.
- SINA, follow [these instructions](https://sina.readthedocs.io/en/latest/install.html) for example. Make sure to have the `sina` binary in your $PATH.
- DSSR 1.9.9 or newer: you need to register on the X3DNA forum [here](http://forum.x3dna.org/site-announcements/download-instructions/) and then download the DSSR binary [on that page](http://forum.x3dna.org/downloads/3dna-download/). Make sure the `x3dna-dssr` binary is in your $PATH variable so that RNANet.py finds it.
- Infernal 1.1.4 or newer, available at [Eddylab](http://eddylab.org/infernal/); several install options are available depending on your preferences. Make sure the `cmalign`, `cmfetch`, `cmbuild`, `esl-alimanip`, `esl-alimerge`, `esl-alipid` and `esl-reformat` binaries are in your $PATH variable, so that RNANet.py can find them.
- SINA (if you plan to use it): follow [these instructions](https://sina.readthedocs.io/en/latest/install.html), for example. Make sure to have the `sina` binary in your $PATH.
- Sqlite 3, available under the name *sqlite* in every distro's package manager,
- Python >= 3.8, (Unfortunately, python3.6 is no longer supported, because of changes in the multiprocessing and Threading packages. Untested with Python 3.7.\*)
- Python >= 3.8 (unfortunately, Python 3.6 is no longer supported because of changes in the multiprocessing and threading packages; untested with Python 3.7.\*).
- The following Python packages: `python3.8 -m pip install biopython matplotlib pandas psutil pymysql requests scipy setproctitle sqlalchemy tqdm`.
Then, run it from the command line, preferably using nohup if your shell will be interrupted:
......
......@@ -19,3 +19,6 @@
* Use and save Infernal alignment bounds and truncation information
* Save if a chain is a representative in BGSU list
* Annotate unstructured regions (on a nucleotide basis)
## Technical to-do list
* `cmalign --merge` is now deprecated, so we use `esl-alimerge` instead. However, esl-alimerge is a single-core process: we should run the merges of alignments of different families in parallel to save some time [TODO]; a rough sketch follows below.
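A possible shape for that parallelization (hypothetical sketch, untested; the family list and paths are placeholders):

```python
# Hypothetical sketch for the TODO above: one single-core esl-alimerge job
# per family, several families merged in parallel. Assumes esl-alimerge is
# in $PATH and that the .stk files exist.
import subprocess
from multiprocessing import Pool

def merge_family(rfam_acc):
    p = subprocess.run(["esl-alimerge", "-o", f"realigned/{rfam_acc}_merged.stk",
                        "--rna", f"realigned/{rfam_acc}.stk",
                        f"realigned/{rfam_acc}_new.stk"],
                       stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    return rfam_acc, p.returncode

if __name__ == "__main__":
    families = ["RF00001", "RF00005"]   # placeholder Rfam accessions
    with Pool(processes=4) as pool:     # 4 merges at a time
        for fam, code in pool.map(merge_family, families):
            if code != 0:
                print(f"{fam}: esl-alimerge failed with code {code}")
```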
......
......@@ -26,7 +26,7 @@ from os import path
from tqdm import tqdm
from collections import Counter
from setproctitle import setproctitle
from RNAnet import Job, read_cpu_number, sql_ask_database, sql_execute, warn, notify, init_worker, trace_unhandled_exceptions
from RNAnet import Job, read_cpu_number, sql_ask_database, sql_execute, warn, notify, init_with_tqdm, trace_unhandled_exceptions
from geometric_stats import *
np.set_printoptions(threshold=sys.maxsize, linewidth=np.inf, precision=8)
......@@ -948,7 +948,11 @@ def par_distance_matrix(filelist, f, label, cm_coords, consider_all_atoms, s):
nb_gap += 1
coordinates_with_gaps.append(np.nan)
else:
coordinates_with_gaps.append(coordinates[i - nb_gap])
try:
coordinates_with_gaps.append(coordinates[i - nb_gap])
except IndexError as e:
warn(f"{filename} : {s.seq} at position {i}, we get {e}.", error=True)
exit(0)
# Build the pairwise distances
d = np.zeros((len(s.seq), len(s.seq)), dtype=np.float32)
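To make the indexing explicit (a toy example, not from the file): `i` walks the aligned row while `coordinates` holds one entry per real residue, so `i - nb_gap` maps an alignment column back to a residue index; the `IndexError` guarded above fires when the 3D chain has fewer resolved residues than the ungapped sequence:

```python
# Toy re-run of the loop above with made-up data.
import numpy as np

aligned_seq = "AC-G-U"                   # alignment row, '-' are gaps
coordinates = [10.0, 11.0, 12.0, 13.0]   # one (toy) value per real residue

coordinates_with_gaps, nb_gap = [], 0
for i, letter in enumerate(aligned_seq):
    if letter == '-':
        nb_gap += 1
        coordinates_with_gaps.append(np.nan)
    else:
        coordinates_with_gaps.append(coordinates[i - nb_gap])
# coordinates_with_gaps == [10.0, 11.0, nan, 12.0, nan, 13.0]
```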
......@@ -1055,7 +1059,7 @@ def get_avg_std_distance_matrix(f, consider_all_atoms, multithread=False):
else:
# We split the work for one family on multiple workers.
p = Pool(initializer=init_worker, initargs=(tqdm.get_lock(),), processes=nworkers)
p = Pool(initializer=init_with_tqdm, initargs=(tqdm.get_lock(),), processes=nworkers)
try:
fam_pbar = tqdm(total=len(align), desc=f"{f} {label} pair distances", position=0, unit="chain", leave=True)
# Apply work_pssm_remap to each RNA family
......@@ -1147,8 +1151,11 @@ def nt_3d_centers(cif_file, consider_all_atoms):
Some chains have no C1' (e.g. 4v7f-3); therefore, an empty result is returned.
"""
result =[]
structure = MMCIFParser().get_structure(cif_file, cif_file)
try:
structure = MMCIFParser().get_structure(cif_file, cif_file)
except Exception as e:
warn(f"{cif_file} : {e}", error=True)
return result
for model in structure:
for chain in model:
for residue in chain:
......@@ -1203,7 +1210,7 @@ def process_jobs(joblist):
Starts a Pool to run the Job() objects in joblist.
"""
tmp_nworkers = min(len(joblist), nworkers)
p = Pool(initializer=init_worker, initargs=(tqdm.get_lock(),), processes=tmp_nworkers)
p = Pool(initializer=init_with_tqdm, initargs=(tqdm.get_lock(),), processes=tmp_nworkers)
pbar = tqdm(total=len(joblist), desc="Stat jobs", position=0, unit="job", leave=True)
try:
......@@ -1345,31 +1352,31 @@ if __name__ == "__main__":
# Define the tasks
joblist = []
# # Do eta/theta plots
# if n_unmapped_chains and DO_WADLEY_ANALYSIS:
# joblist.append(Job(function=reproduce_wadley_results, args=(1, False, (1,4), res_thr)))
# joblist.append(Job(function=reproduce_wadley_results, args=(4, False, (1,4), res_thr)))
# # Do distance matrices for each family excl. LSU/SSU (will be processed later)
# if DO_AVG_DISTANCE_MATRIX:
# extracted_chains = []
# for file in os.listdir(path_to_3D_data + "rna_mapped_to_Rfam"):
# if os.path.isfile(os.path.join(path_to_3D_data + "rna_mapped_to_Rfam", file)):
# e1 = file.split('_')[0]
# e2 = file.split('_')[1]
# e3 = file.split('_')[2]
# extracted_chains.append(e1 + '[' + e2 + ']' + '-' + e3)
# for f in [ x for x in famlist if (x not in LSU_set and x not in SSU_set) ]: # Process the rRNAs later only 3 by 3
# joblist.append(Job(function=get_avg_std_distance_matrix, args=(f, True, False)))
# joblist.append(Job(function=get_avg_std_distance_matrix, args=(f, False, False)))
# # Do general family statistics
# joblist.append(Job(function=stats_len)) # Computes figures about chain lengths
# joblist.append(Job(function=stats_freq)) # updates the database (nucleotide frequencies in families)
# for f in famlist:
# joblist.append(Job(function=parallel_stats_pairs, args=(f,))) # updates the database (intra-chain basepair types within a family)
# if f not in ignored:
# joblist.append(Job(function=to_id_matrix, args=(f,))) # updates the database (identity matrices of families)
# Do eta/theta plots
if n_unmapped_chains and DO_WADLEY_ANALYSIS:
joblist.append(Job(function=reproduce_wadley_results, args=(1, False, (1,4), res_thr)))
joblist.append(Job(function=reproduce_wadley_results, args=(4, False, (1,4), res_thr)))
# Do distance matrices for each family excl. LSU/SSU (will be processed later)
if DO_AVG_DISTANCE_MATRIX:
extracted_chains = []
for file in os.listdir(path_to_3D_data + "rna_mapped_to_Rfam"):
if os.path.isfile(os.path.join(path_to_3D_data + "rna_mapped_to_Rfam", file)):
e1 = file.split('_')[0]
e2 = file.split('_')[1]
e3 = file.split('_')[2]
extracted_chains.append(e1 + '[' + e2 + ']' + '-' + e3)
for f in [ x for x in famlist if (x not in LSU_set and x not in SSU_set) ]: # Process the rRNAs later only 3 by 3
joblist.append(Job(function=get_avg_std_distance_matrix, args=(f, True, False)))
joblist.append(Job(function=get_avg_std_distance_matrix, args=(f, False, False)))
# Do general family statistics
joblist.append(Job(function=stats_len)) # Computes figures about chain lengths
joblist.append(Job(function=stats_freq)) # updates the database (nucleotide frequencies in families)
for f in famlist:
joblist.append(Job(function=parallel_stats_pairs, args=(f,))) # updates the database (intra-chain basepair types within a family)
if f not in ignored:
joblist.append(Job(function=to_id_matrix, args=(f,))) # updates the database (identity matrices of families)
# Do geometric measures
......@@ -1382,7 +1389,7 @@ if __name__ == "__main__":
joblist.append(Job(function=measure_from_structure, args=(f,), how_many_in_parallel=nworkers)) # All-atom distances
# process_jobs(joblist)
process_jobs(joblist)
# Now process the memory-heavy tasks family by family
if DO_AVG_DISTANCE_MATRIX:
......@@ -1398,11 +1405,11 @@ if __name__ == "__main__":
# finish the work after the parallel portions
# per_chain_stats() # per chain base frequencies and basepair types
# seq_idty() # identity matrices from pre-computed .npy matrices
# stats_pairs()
per_chain_stats() # per chain base frequencies and basepair types
seq_idty() # identity matrices from pre-computed .npy matrices
stats_pairs()
if n_unmapped_chains:
# general_stats()
general_stats()
os.makedirs(runDir+"/results/figures/GMM/", exist_ok=True)
os.makedirs(runDir+"/results/geometry/json/", exist_ok=True)
concat_dataframes(runDir + '/results/geometry/all-atoms/distances/', 'dist_atoms.csv')
......