Pteros  2.0
Molecular modeling library for human beings!
message_channel.h
1 
2 /*
3  *
4  * This source code is part of
5  * ******************
6  * *** Pteros ***
7  * ******************
8  * molecular modeling library
9  *
10  * Copyright (c) 2009-2013, Semen Yesylevskyy
11  *
12  * This program is free software; you can redistribute it and/or
13  * modify it under the terms of Artistic License:
14  *
15  * Please note, that Artistic License is slightly more restrictive
16  * then GPL license in terms of distributing the modified versions
17  * of this software (they should be approved first).
18  * Read http://www.opensource.org/licenses/artistic-license-2.0.php
19  * for details. Such license fits scientific software better then
20  * GPL because it prevents the distribution of bugged derivatives.
21  *
22 */
23 
24 #ifndef MESSAGE_CHANNEL_H
25 #define MESSAGE_CHANNEL_H
26 
27 #include <thread>
28 #include <mutex>
29 #include <condition_variable>
30 #include <queue>
31 #include <functional>
32 
33 template<class T>
34 class Message_channel {
35 public:
36  Message_channel(){
37  buffer_size = 10;
38  stop_requested = false;
39  }
40 
41  Message_channel(int sz){
42  buffer_size = sz;
43  stop_requested = false;
44  }
45 
46  void set_buffer_size(int sz){
47  std::lock_guard<std::mutex> lock(mutex);
48  buffer_size = sz;
49  cond.notify_all();
50  }
51 
52  void send_stop(){
53  std::lock_guard<std::mutex> lock(mutex);
54  stop_requested = true;
55  cond.notify_all();
56  }
57 
58  bool empty(){
59  std::lock_guard<std::mutex> lock(mutex);
60  return queue.empty();
61  }
62 
63  bool send(T const& data){
64  std::unique_lock<std::mutex> lock(mutex);
65 
66  // Wait until buffer will clear a bit or until stop is requested
67  cond.wait(lock, [this]{return (queue.size()<buffer_size || stop_requested);} );
68 
69  // If stop requested just do nothing
70  if(stop_requested) return false;
71 
72  queue.push(data);
73  cond.notify_all();
74  return true;
75  }
76 
77  bool recieve(T& popped_value){
78  std::unique_lock<std::mutex> lock(mutex);
79 
80  // Wait until something appears in the queue or until stop requested
81  cond.wait(lock, [this]{return (!queue.empty() || stop_requested);} );
82 
83  // If stop requested see if there is something in, if not return false
84  if(stop_requested && queue.empty()) return false;
85 
86  popped_value=queue.front();
87  queue.pop();
88  cond.notify_all();
89  return true;
90  }
91 
92  void recieve_each(const std::function<void(T&)>& callback){
93  T data;
94  while(recieve(data)){
95  callback(data);
96  }
97  while(!empty()){
98  recieve(data);
99  callback(data);
100  }
101  }
102 
103 private:
104  int buffer_size;
105  std::condition_variable cond;
106  std::mutex mutex;
107  std::queue<T> queue;
108  bool stop_requested;
109 };
110 
111 
112 #endif // MESSAGE_CHANNEL_H