Louis BECQUEY

tqdm progress bars

......@@ -18,6 +18,7 @@ from functools import partial
from os import path, makedirs
from multiprocessing import Pool, cpu_count, Manager
from time import sleep
from tqdm import tqdm
if path.isdir("/home/ubuntu/"): # this is the IFB-core cloud
path_to_3D_data = "/mnt/Data/RNA/3D/"
......@@ -267,9 +268,9 @@ class Job:
self.max_mem = -1 # not executed yet
self.label = label
if not how_many_in_parallel:
self.nthreads = cpu_count()
self.nthreads = read_cpu_number()
elif how_many_in_parallel == -1:
self.nthreads = cpu_count() - 1
self.nthreads = read_cpu_number() - 1
else:
self.nthreads = how_many_in_parallel
self.useless_bool = False
......@@ -472,6 +473,13 @@ class Monitor:
sleep(0.1)
return max_mem
def read_cpu_number():
    """Return the number of CPUs available to this (possibly containerized) machine.

    Do not use os.cpu_count() on LXC containers: it reads info from /sys,
    which reflects the host resources, not the container/VM resources.
    Read from /proc/cpuinfo instead.

    Returns:
        int: the number of "processor" entries in /proc/cpuinfo (>= 1 on
        any Linux system).
    """
    # Count "processor" entries rather than grepping for the vendor string
    # "Intel(": the vendor pattern counts 0 CPUs on AMD/ARM machines, which
    # made callers (Job.__init__) build a Pool with 0 or -1 workers.
    p = subprocess.run(['grep', '-c', '^processor', '/proc/cpuinfo'],
                       stdout=subprocess.PIPE)
    # grep -c prints a bare integer followed by a newline; strip() is safer
    # than slicing off the last character.
    return int(p.stdout.decode('utf-8').strip())
def warn(message, error=False):
if error:
print(f"\t> \033[31mERR: {message}\033[0m{errsymb}", flush=True)
......@@ -500,10 +508,9 @@ def execute_job(j, jobcount):
monitor.keep_watching = False
m = assistant_future.result()
elif j.func_ is not None:
print(f"[{running_stats[0]+running_stats[2]}/{jobcount}]\t{j.func_.__name__}({', '.join([str(a) for a in j.args_ if not ((type(a) == list) and len(a)>3)])})")
#print(f"[{running_stats[0]+running_stats[2]}/{jobcount}]\t{j.func_.__name__}({', '.join([str(a) for a in j.args_ if not ((type(a) == list) and len(a)>3)])})")
m = -1
monitor = Monitor(os.getpid())
......@@ -558,7 +565,7 @@ def execute_joblist(fulljoblist, printstats=False):
print("using", n, "processes:")
# execute jobs of priority i that should be processed n by n:
p = Pool(processes=n, maxtasksperchild=10)
p = Pool(processes=n)
raw_results = p.map(partial(execute_job, jobcount=jobcount), bunch)
p.close()
p.join()
......@@ -833,19 +840,22 @@ def summarize_position(col):
else:
return (0, 0, 0, 0, 0)
def alignment_nt_stats(f, list_of_chains):
print("\t>",f,"... ", flush=True)
def alignment_nt_stats(f, list_of_chains) :
global idxQueue
#print("\t>",f,"... ", flush=True)
chains_ids = [ str(c) for c in list_of_chains ]
thr_idx = idxQueue.get()
print(thr_idx, flush=True)
# Open the alignment
align = AlignIO.read(path_to_seq_data + f"realigned/{f}++.afa", "fasta")
alilen = align.get_alignment_length()
print("\t>",f,"... loaded", flush=True)
#print("\t>",f,"... loaded", flush=True)
# Compute statistics per column
results = [ summarize_position(align[:,i]) for i in range(alilen) ]
results = [ summarize_position(align[:,i]) for i in tqdm(range(alilen), position=thr_idx) ]
frequencies = np.array(results).T
print("\t>",f,"... loaded, computed", flush=True)
#print("\t>",f,"... loaded, computed", flush=True)
for s in align:
if not '[' in s.id: # this is a Rfamseq entry, not PDB
......@@ -897,8 +907,8 @@ def alignment_nt_stats(f, list_of_chains):
else:
print(f"You are never supposed to reach this. Comparing {c.chain_label} in {i} ({c.seq[i-1:i+2]}) with seq[{j}] ({s.seq[j-3:j+4]}).\n", c.seq, '\n', s.seq, sep='', flush=True)
exit(1)
if warn_gaps:
warn(f"Some gap(s) in {c.chain_label} were not re-found in the aligned sequence... Ignoring them.")
#if warn_gaps:
#warn(f"Some gap(s) in {c.chain_label} were not re-found in the aligned sequence... Ignoring them.")
# Replace masked positions by the consensus sequence:
c_seq = c.seq.split()
......@@ -934,10 +944,10 @@ def alignment_nt_stats(f, list_of_chains):
line = [str(x) for x in list(point[i,:]) ]
file.write(','.join(line)+'\n')
file.close()
print("\t\tWritten", c.chain_label, f"to file\t{validsymb}", flush=True)
#print("\t\tWritten", c.chain_label, f"to file\t{validsymb}", flush=True)
print("\t>", f, f"... loaded, computed, saved\t{validsymb}", flush=True)
return None
#print("\t>", f, f"... loaded, computed, saved\t{validsymb}", flush=True)
return 0
if __name__ == "__main__":
print("Main process running. (PID", os.getpid(), ")")
......@@ -1041,18 +1051,18 @@ if __name__ == "__main__":
if not path.isdir(path_to_3D_data + "datapoints/"):
os.makedirs(path_to_3D_data + "datapoints/")
print("Computing nucleotide frequencies in alignments...")
families = sorted([f for f in rfam_acc_to_download.keys() if f not in ["RF01960", "RF02540"]])
# pool = Pool(processes=cpu_count(), maxtasksperchild=10)
# results = pool.map(alignment_nt_stats, families)
# pool.close()
# pool.join()
# loaded_chains = list(itertools.chain.from_iterable(results))
families = sorted([f for f in rfam_acc_to_download.keys() ])
# Build job list
thr_idx_mgr = multiprocessing.Manager()
idxQueue = thr_idx_mgr.Queue()
for i in range(10):
idxQueue.put(i)
fulljoblist = []
for f in families:
label = f"Save {f} PSSMs"
list_of_chains = rfam_acc_to_download[f]
fulljoblist.append(Job(function=alignment_nt_stats, args=[f, list_of_chains, label], how_many_in_parallel=10, priority=1, label=label))
fulljoblist.append(Job(function=alignment_nt_stats, args=[f, list_of_chains], how_many_in_parallel=10, priority=1, label=label))
execute_joblist(fulljoblist, printstats=False)
print("Completed.")
......
#!/bin/bash
# Kill every running RNAnet.py process, excluding this pipeline's own grep
# and any "vi RNAnet.py" editor session opened on the file.

# Escape the dot so the pattern matches the literal filename only
# (the unescaped "." previously matched any character).
PROCESS_TO_KILL="RNAnet\.py"

# $(...) instead of legacy backticks; quote expansions so empty or
# whitespace-containing values cannot break word splitting.
PROCESS_LIST=$(ps ax | grep -Ei "${PROCESS_TO_KILL}" | grep -Eiv '(grep|vi RNAnet.py)' | awk ' { print $1;}')

KILLED=
for KILLPID in $PROCESS_LIST; do
    # [ -n "$x" ] is the canonical, quote-safe form of [ ! -z $x ].
    if [ -n "$KILLPID" ]; then
        kill -9 "$KILLPID"
        echo "Killed PID ${KILLPID}"
        KILLED=yes
    fi
done

if [ -z "$KILLED" ]; then
    echo "Didn't kill anything"
fi