Louis BECQUEY

Multithreaded load on Desc files (indev)

......@@ -52,7 +52,8 @@
"optional": "cpp",
"string_view": "cpp",
"cstdarg": "cpp",
"iomanip": "cpp"
"iomanip": "cpp",
"string": "cpp"
},
"python.pythonPath": "/usr/bin/python3",
"editor.formatOnSave": true,
......
#include "Motif.h"
#include "Pool.h"
#include <boost/algorithm/string.hpp>
#include <boost/filesystem.hpp>
#include <iostream>
#include <regex>
#include <sstream>
#include <thread>
using namespace boost::filesystem;
using namespace std;
......@@ -31,17 +33,22 @@ Motif::Motif(const vector<Component>& v, string PDB) : comp(v), PDBID(PDB)
source_ = RNA3DMOTIF;
}
vector<Motif> Motif::build_from_desc(const string& descfile, string rna)
void Motif::build_from_desc(args_of_parallel_func arg_struct)
{
std::ifstream motif;
string line;
string seq;
vector<string> component_sequences;
vector<string> bases;
int last;
vector<Motif> results;
char c = 'a';
char* prev = &c;
string descfile = arg_struct.descfile;
string rna = arg_struct.rna;
vector<Motif>& final_results = arg_struct.final_results;
mutex& posInsertionSites_access = arg_struct.posInsertionSites_mutex;
std::ifstream motif;
vector<vector<Component>> vresults;
string line;
string seq;
vector<string> component_sequences;
vector<string> bases;
int last;
char c = 'a';
char* prev = &c;
motif = std::ifstream(descfile);
getline(motif, line); // ignore "id: number"
......@@ -76,14 +83,100 @@ vector<Motif> Motif::build_from_desc(const string& descfile, string rna)
component_sequences.push_back(seq);
// Now component_sequences is a vector of sequences like {AGCGC, CGU..GUUU}
// We need to search for the different positions where to insert the first component
vector<vector<Component>> vresults = find_next_ones_in(rna, 0, component_sequences);
// Components of length 1 or 2 are extended to length 3 before the search. Each short
// component is padded with '.' at every possible offset; the component strings are compiled
// as regular expressions by find_next_ones_in, so '.' stands for any nucleotide.
//   length 1 "N"  ->  "N..", ".N.", "..N"
//   length 2 "XY" ->  "XY.", ".XY"
// Longer components are kept unchanged.
bool has_short_component = false;
for (const string& component : component_sequences) {
    if (component.length() == 1 or component.length() == 2) has_short_component = true;
}

if (has_short_component) {
    // Each entry of motif_variants is one complete list of components. The list of lists is
    // the Cartesian product of the alternatives available for every component, built
    // directly from component_sequences so that no position bookkeeping is needed.
    vector<vector<string>> motif_variants(1); // start from a single empty variant
    for (const string& component : component_sequences) {
        vector<string> alternatives;
        if (component.length() == 1) {
            alternatives.push_back(component + "..");
            alternatives.push_back("." + component + ".");
            alternatives.push_back(".." + component);
        } else if (component.length() == 2) {
            alternatives.push_back(component + ".");
            alternatives.push_back("." + component);
        } else {
            alternatives.push_back(component);
        }

        // Append every alternative to every variant built so far.
        vector<vector<string>> extended;
        extended.reserve(motif_variants.size() * alternatives.size());
        for (const vector<string>& variant : motif_variants) {
            for (const string& alternative : alternatives) {
                extended.push_back(variant);
                extended.back().push_back(alternative);
            }
        }
        motif_variants.swap(extended);
    }

    // Search the insertion sites of every variant and gather them all in vresults.
    for (const vector<string>& variant : motif_variants) {
        vector<vector<Component>> new_results = find_next_ones_in(rna, 0, variant);
        vresults.insert(vresults.end(), new_results.begin(), new_results.end());
    }
} else {
    // No short component: a single search over component_sequences is enough.
    vresults = find_next_ones_in(rna, 0, component_sequences);
}
// Now create proper motifs with Motif class
for (vector<Component>& v : vresults) {
results.push_back(Motif(v, path(descfile).stem().string()));
unique_lock<mutex> lock(posInsertionSites_access);
final_results.push_back(Motif(v, path(descfile).stem().string()));
lock.unlock();
}
return results;
}
void Motif::load_from_csv(string csv_line)
......@@ -123,32 +216,29 @@ vector<vector<Component>> Motif::find_next_ones_in(string rna, uint offset, vect
vector<vector<Component>> results;
vector<vector<Component>> next_ones;
vector<string> next_seqs;
regex c(vc[0]);
regex c(vc[0]);
// cout << "\t\t>Searching " << vc[0] << " in " << rna << endl;
if (vc.size() > 1) {
if (regex_search(rna, c)){
if (regex_search(rna, c)) {
if (vc.size() > 2)
next_seqs = vector<string>(&vc[1], &vc[vc.size() - 1]);
else
next_seqs = vector<string>(1, vc.back());
next_seqs = vector<string>(1, vc.back());
// Pour chacun des matches
for(sregex_iterator i = sregex_iterator(rna.begin(), rna.end(), c); i != sregex_iterator(); ++i )
{
smatch match = *i;
pos.first = match.position() + offset;
pos.second = pos.first + match.length() - 1;
for (sregex_iterator i = sregex_iterator(rna.begin(), rna.end(), c); i != sregex_iterator(); ++i) {
smatch match = *i;
pos.first = match.position() + offset;
pos.second = pos.first + match.length() - 1;
// cout << "\t\t>Inserting " << vc[0] << " in [" << pos.first << ',' << pos.second << "]" << endl;
if (pos.second - offset + 5 >= rna.length())
{
if (pos.second - offset + 5 >= rna.length()) {
// cout << "\t\t... but we cannot place the next components : Ignored." << endl;
continue;
}
next_ones = find_next_ones_in(rna.substr(pos.second - offset + 5), pos.second + 5, next_seqs);
if (!next_ones.size())
{
next_ones = find_next_ones_in(rna.substr(pos.second - offset + 5), pos.second + 5, next_seqs);
if (!next_ones.size()) {
// cout << "\t\t... but we cannot place the next components : Ignored." << endl;
continue;
}
......@@ -164,13 +254,12 @@ vector<vector<Component>> Motif::find_next_ones_in(string rna, uint offset, vect
}
}
} else {
if (regex_search(rna, c)){
if (regex_search(rna, c)) {
// Pour chacun des matches
for(sregex_iterator i = sregex_iterator(rna.begin(), rna.end(), c); i != sregex_iterator(); ++i )
{
for (sregex_iterator i = sregex_iterator(rna.begin(), rna.end(), c); i != sregex_iterator(); ++i) {
smatch match = *i;
pos.first = match.position() + offset;
pos.second = pos.first + match.length() - 1;
pos.first = match.position() + offset;
pos.second = pos.first + match.length() - 1;
// cout << "\t\t>Inserting " << vc[0] << " in [" << pos.first << ',' << pos.second << "]" << endl;
// Combiner le match et la combinaison suivante
vector<Component> r;
......@@ -184,7 +273,7 @@ vector<vector<Component>> Motif::find_next_ones_in(string rna, uint offset, vect
char Motif::is_valid_DESC(const string& descfile)
{
// /!\ returns 0 iff no errors
// /!\ returns 0 iff no errors
std::ifstream motif;
string line;
......@@ -205,35 +294,38 @@ char Motif::is_valid_DESC(const string& descfile)
char nt = b->substr(b->find('_') + 1, 1).back();
int pos = stoi(b->substr(0, b->find('_')));
if (string(1,nt).find_first_not_of("ACGU") != string::npos) return nt;
if (string(1, nt).find_first_not_of("ACGU") != string::npos) return nt;
if (pos <= 0) return '-';
}
while (getline(motif, line))
{
uint slash = line.find('/');
string interaction(line.substr(slash-1, 3));
while (getline(motif, line)) {
uint slash = line.find('/');
string interaction(line.substr(slash - 1, 3));
string b1 = line.substr(line.find('(')+1, 8); // The first base (position_nucleotide)
string temp = line.substr(slash+1);
string b2 = temp.substr(temp.find('(')+1, 8); // The second base (position_nucleotide)
string b1 = line.substr(line.find('(') + 1, 8); // The first base (position_nucleotide)
string temp = line.substr(slash + 1);
string b2 = temp.substr(temp.find('(') + 1, 8); // The second base (position_nucleotide)
b1.erase(remove(b1.begin(), b1.end(), ' '), b1.end());
b2.erase(remove(b2.begin(), b2.end(), ' '), b2.end());
int p1 = stoi(b1.substr(0,b1.find('_')));
int p2 = stoi(b2.substr(0,b2.find('_')));
if ((p2-p1 != 1) and !interaction.compare("C/C")) return 'b';
if ((p2-p1 <4) and (!interaction.compare("+/+") or !interaction.compare("-/-"))) return 'l';
int p1 = stoi(b1.substr(0, b1.find('_')));
int p2 = stoi(b2.substr(0, b2.find('_')));
if ((p2 - p1 != 1) and !interaction.compare("C/C")) return 'b';
if ((p2 - p1 < 4) and (!interaction.compare("+/+") or !interaction.compare("-/-"))) return 'l';
}
return 0;
}
vector<Motif> load_desc_folder(const string& path, const string& rna, bool verbose)
{
vector<Motif> posInsertionSites;
int errors = 0;
int accepted = 0;
int inserted = 0;
vector<Motif> posInsertionSites;
mutex posInsertionSites_access;
Pool pool;
int errors = 0;
int accepted = 0;
int inserted = 0;
int num_threads = thread::hardware_concurrency();
vector<thread> thread_pool;
if (!exists(path)) {
cerr << "Hmh, i can't find that folder: " << path << endl;
......@@ -242,24 +334,18 @@ vector<Motif> load_desc_folder(const string& path, const string& rna, bool verbo
if (verbose) cout << "loading DESC motifs from " << path << "..." << endl;
}
for (int i = 0; i < num_threads; i++) thread_pool.push_back(thread(&Pool::infinite_loop_func, &pool));
char error;
for (auto it : recursive_directory_range(path)) {
for (auto it : recursive_directory_range(path)) { // Add every .desc file to the queue (iff valid)
if ((error = Motif::is_valid_DESC(it.path().string()))) {
if (verbose) {
cerr << "\t>Ignoring motif " << it.path().stem();
switch (error)
{
case '-':
cerr << ", some nucleotides have a negative number...";
break;
case 'l':
cerr << ", hairpin (terminal) loops must be at least of size 3 !";
break;
case 'b':
cerr << ", backbone link between non-consecutive residues ?";
break;
default:
cerr << ", use of an unknown nucleotide " << error;
switch (error) {
case '-': cerr << ", some nucleotides have a negative number..."; break;
case 'l': cerr << ", hairpin (terminal) loops must be at least of size 3 !"; break;
case 'b': cerr << ", backbone link between non-consecutive residues ?"; break;
default: cerr << ", use of an unknown nucleotide " << error;
}
cerr << endl;
}
......@@ -268,13 +354,15 @@ vector<Motif> load_desc_folder(const string& path, const string& rna, bool verbo
}
accepted++;
if (is_desc_insertible(it.path().string(), rna, verbose)) {
args_of_parallel_func args(it.path().string(), rna, posInsertionSites, posInsertionSites_access);
inserted++;
vector<Motif> m = Motif::build_from_desc(it.path().string(), rna);
if (verbose) cout << m.size() << " times" << endl;
for (Motif& mot : m) posInsertionSites.push_back(mot);
pool.push(bind(Motif::build_from_desc, args));
}
}
if (verbose) cout << "Inserted " << inserted << " motifs on " << accepted+errors << " (" << errors << " ignored motifs)" << endl;
pool.done();
for (unsigned int i = 0; i < thread_pool.size(); i++) thread_pool.at(i).join();
if (verbose)
cout << "Inserted " << inserted << " motifs on " << accepted + errors << " (" << errors << " ignored motifs)" << endl;
return posInsertionSites;
}
......
#ifndef MOTIF_H_
#define MOTIF_H_
#include <mutex>
#include <string>
#include <vector>
......@@ -8,6 +9,16 @@ using std::pair;
using std::string;
using std::vector;
class Motif; // forward declaration

// Arguments handed to Motif::build_from_desc when it is queued as a task.
//
// descfile and rna are stored BY VALUE: the task is executed after the code that created
// this object has moved on, so a reference to the caller's (possibly temporary) path string
// would dangle. final_results and posInsertionSites_mutex remain references on purpose:
// they designate the shared output vector and the mutex that protects it, and both must
// outlive every task that uses them.
typedef struct args_ {
    std::string         descfile;                // path of the .desc file (owned copy)
    std::string         rna;                     // RNA sequence to search (owned copy)
    std::vector<Motif>& final_results;           // shared output, guarded by the mutex below
    std::mutex&         posInsertionSites_mutex; // protects final_results

    args_(const std::string& descfile_, std::string rna_, std::vector<Motif>& vector_, std::mutex& mutex_)
        : descfile(descfile_), rna(rna_), final_results(vector_), posInsertionSites_mutex(mutex_)
    {
    }
} args_of_parallel_func;
typedef struct Comp_ {
pair<uint, uint> pos;
size_t k;
......@@ -25,15 +36,15 @@ class Motif
public:
Motif();
Motif(const vector<Component>& v, string PDB);
void load_from_csv(string csv_line);
static vector<Motif> build_from_desc(const string& descfile, string rna);
static char is_valid_DESC(const string& descfile);
string pos_string(void) const;
string get_origin(void) const;
string get_identifier(void) const;
vector<Component> comp;
double score_;
bool reversed_;
void load_from_csv(string csv_line);
static void build_from_desc(args_of_parallel_func arg_struct);
static char is_valid_DESC(const string& descfile);
string pos_string(void) const;
string get_origin(void) const;
string get_identifier(void) const;
vector<Component> comp;
double score_;
bool reversed_;
private:
static vector<vector<Component>> find_next_ones_in(string rna, uint offset, vector<string> vc);
......
#include "Pool.h"
// Creates an empty pool that accepts tasks. The queue, the mutex and the condition
// variable are default-constructed; only the accept flag needs an explicit value.
Pool::Pool() : m_accept_functions(true) {}
// Nothing to release explicitly: the destructor neither signals nor joins any thread.
Pool::~Pool() = default;
void Pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
void Pool::done()
{
std::unique_lock<std::mutex> lock(m_lock);
m_accept_functions = false;
lock.unlock();
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
m_data_condition.notify_all();
//notify all waiting threads.
}
void Pool::infinite_loop_func()
{
std::function<void()> func;
while (true)
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
if (!m_accept_functions && m_function_queue.empty())
{
//lock will be release automatically.
//finish the thread loop and let it join in the main thread.
return;
}
func = m_function_queue.front();
m_function_queue.pop();
//release the lock
}
func();
}
}
\ No newline at end of file
#pragma once
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
// Task queue shared between a producer and worker threads.
//
// Tasks are std::function<void()> objects stored in FIFO order. The class does not create
// threads: a worker is any thread that runs infinite_loop_func() on the pool.
class Pool
{
  public:
    Pool();
    ~Pool();

    // Returns the mutex that protects the task queue.
    std::mutex& get_mutex_reference();

    // Enqueues a task and wakes one waiting worker. The accept flag is not checked here.
    void push(std::function<void()> func);

    // Clears the accept flag and wakes all waiting workers.
    void done();

    // Worker loop: runs queued tasks until the flag is cleared and the queue is empty.
    void infinite_loop_func();

  private:
    std::queue<std::function<void()>> m_function_queue;   // pending tasks, oldest first
    std::mutex                        m_lock;             // guards the queue and the flag
    std::condition_variable           m_data_condition;   // signalled by push() and done()
    std::atomic<bool>                 m_accept_functions; // true until done() is called
};
// Gives direct access to the mutex that protects the task queue.
inline std::mutex& Pool::get_mutex_reference()
{
    return m_lock;
}
// Empty exception type derived from std::exception.
// NOTE(review): no throw or catch site for it is visible alongside this declaration.
class quit_worker_exception : public std::exception
{
};
\ No newline at end of file
......@@ -8,7 +8,6 @@
#include <iostream>
#include <iterator>
#include <string>
#include <thread>
#include <vector>
#include "MOIP.h"
......