metaproxy  1.21.0
thread_pool_observer.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include "config.hpp"
20 
21 #if HAVE_UNISTD_H
22 #include <unistd.h>
23 #endif
24 
25 #ifdef WIN32
26 #include <windows.h>
27 #include <winsock.h>
28 #endif
29 #include <boost/thread/thread.hpp>
30 #include <boost/thread/mutex.hpp>
31 #include <boost/thread/condition.hpp>
32 #include <boost/noncopyable.hpp>
33 
34 #include <ctype.h>
35 #include <stdio.h>
36 
37 #include <deque>
38 #include <sstream>
39 
40 #include <yazpp/socket-observer.h>
41 #include <yaz/log.h>
42 
43 #include "thread_pool_observer.hpp"
44 #include "pipe.hpp"
45 
namespace metaproxy_1 {
    // NOTE(review): the functor's class header, constructor and member
    // declaration are missing from this view (Doxygen dropped the lines);
    // from the body, m_s is presumably a ThreadPoolSocketObserver*
    // captured at construction -- confirm against the full source.
    public:
        // Thread entry point handed to boost::thread: each pool thread
        // simply executes the observer's blocking work loop.
        void operator() (void) {
            m_s->run(0);
        }
    };
55 
    // Private implementation (pimpl) of ThreadPoolSocketObserver.
    //
    // NOTE(review): several member declarations are missing from this
    // view (Doxygen dropped lines 57, 63, 71, 78) -- at least m_pipe,
    // m_stop_flag and m_waiting_threads, which the member functions
    // below reference, plus presumably a friend declaration that lets
    // the outer class reach these private members.  Confirm against the
    // full source.
    class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
    public:
        Rep(yazpp_1::ISocketObservable *obs);
        ~Rep();
    private:
        // Event loop that delivers completed results (via socketNotify).
        yazpp_1::ISocketObservable *m_socketObservable;
        // Worker threads owned by the pool; joined on shutdown.
        boost::thread_group m_thrds;
        // Guards m_input and the counters touched in run()/put().
        boost::mutex m_mutex_input_data;
        // Signalled when new work arrives (or the pool is stopping).
        boost::condition m_cond_input_data;
        // Signalled when a slot frees up in the bounded input queue.
        boost::condition m_cond_input_full;
        // Guards m_output only.
        boost::mutex m_mutex_output_data;
        std::deque<IThreadPoolMsg *> m_input;   // pending requests
        std::deque<IThreadPoolMsg *> m_output;  // completed results
#if BOOST_VERSION >= 105000
        // Carries the optional per-thread stack size (Boost >= 1.50).
        boost::thread::attributes attrs;
#endif
        unsigned m_no_threads;    // number of worker threads created
        unsigned m_min_threads;
        unsigned m_max_threads;
    };
    // Bound on queued requests per worker: put() blocks while m_input
    // holds m_no_threads * queue_size_per_thread entries or more.
    const unsigned int queue_size_per_thread = 64;
}
82 
83 
84 
85 using namespace yazpp_1;
86 using namespace metaproxy_1;
87 
// Rep constructor: remember the owning event loop and create the
// internal wakeup pipe.
// NOTE(review): 9123 is the argument given to the Pipe helper --
// presumably a base port used when a real pipe/socketpair is
// unavailable (see pipe.hpp); confirm and consider naming the constant.
ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs)
    : m_socketObservable(obs), m_pipe(9123)
{
}
92 
94 {
95 }
96 
98 {
99 
100 }
101 
// Constructor (NOTE(review): the opening signature line is missing from
// this view; per the tooltip it is
// ThreadPoolSocketObserver(obs, min_threads, max_threads, stack_size)).
// Registers the wakeup pipe with the event loop and starts the minimum
// number of worker threads.
    yazpp_1::ISocketObservable *obs,
    unsigned min_threads, unsigned max_threads,
    unsigned stack_size)
    : m_p(new Rep(obs))
{
    // Watch the read end of the wakeup pipe: socketNotify() fires when
    // a worker writes a byte after completing a request.
    obs->addObserver(m_p->m_pipe.read_fd(), this);
    obs->maskObserver(this, SOCKET_OBSERVE_READ);

    m_p->m_stop_flag = false;
    m_p->m_no_threads = 0;
    m_p->m_min_threads = min_threads;
    m_p->m_max_threads = max_threads;
    m_p->m_waiting_threads = 0;
    unsigned i;
#if BOOST_VERSION >= 105000
    // Boost >= 1.50 supports a per-thread stack size via attributes.
    if (stack_size > 0)
        m_p->attrs.set_stack_size(stack_size);
#else
    if (stack_size)
        yaz_log(YLOG_WARN, "stack_size has no effect (Requires Boost 1.50)");
#endif
    // Start with min_threads workers; put() may add more up to
    // max_threads when all workers are busy.
    for (i = 0; i < min_threads; i++)
        add_worker();
}
127 
// Destructor body (NOTE(review): the signature line is missing from
// this view: ~ThreadPoolSocketObserver()).  Orderly shutdown: flag
// stop, wake every worker, join them all, then unregister from the
// event loop.
{
    {
        // Set the flag and broadcast under the lock so no worker can
        // miss the wakeup while between its flag check and wait().
        boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
        m_p->m_stop_flag = true;
        m_p->m_cond_input_data.notify_all();
    }
    // Wait for all workers to exit before tearing down state they use.
    m_p->m_thrds.join_all();
    m_p->m_socketObservable->deleteObserver(this);
}
138 
// Body of add_worker() (NOTE(review): the signature line is missing
// from this view).  Spawns one more worker thread running the Worker
// functor and hands it to the thread group, which takes ownership.
{
    Worker w(this);
#if BOOST_VERSION >= 105000
    // attrs carries the optional stack size configured in the ctor.
    boost::thread *x = new boost::thread(m_p->attrs, w);
#else
    boost::thread *x = new boost::thread(w);
#endif
    m_p->m_no_threads++;
    // thread_group assumes ownership of x (deleted when the group is).
    m_p->m_thrds.add_thread(x);
}
150 
// Body of socketNotify(int event) (NOTE(review): the signature line is
// missing from this view).  Runs in the event-loop thread when the
// wakeup pipe becomes readable: consumes one wakeup byte, then drains
// the entire output queue, delivering each completed message through
// IThreadPoolMsg::result() together with a small status string.
{
    if (event & SOCKET_OBSERVE_READ)
    {
        char buf[2];
        // Consume exactly one byte written by a worker in run().
#ifdef WIN32
        recv(m_p->m_pipe.read_fd(), buf, 1, 0);
#else
        ssize_t r = read(m_p->m_pipe.read_fd(), buf, 1);
        if (r != 1)
        {
            if (r == (ssize_t) (-1))
                yaz_log(YLOG_WARN|YLOG_ERRNO,
                        "ThreadPoolSocketObserver::socketNotify. read fail");
            else
                yaz_log(YLOG_WARN,
                        "ThreadPoolSocketObserver::socketNotify. read returned 0");
        }
#endif
        // Drain ALL pending results, not just one per wakeup byte:
        // several workers may have queued output before we ran.
        while (1)
        {
            IThreadPoolMsg *out;
            size_t output_size = 0;
            {
                // Hold the output lock only while touching the deque.
                boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
                if (m_p->m_output.empty()) {
                    break;
                }
                out = m_p->m_output.front();
                m_p->m_output.pop_front();
                output_size = m_p->m_output.size();
            }
            if (out)
            {
                size_t input_size = 0;
                std::ostringstream os;
                {
                    boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
                    input_size = m_p->m_input.size();
                }
                // Status string, e.g. "tbusy/total 2/4 queue in/out 1/0".
                // NOTE(review): m_no_threads and m_waiting_threads are
                // read here without m_mutex_input_data held, although
                // workers modify them under that lock.
                os << "tbusy/total "
                   << m_p->m_no_threads - m_p->m_waiting_threads
                   << "/" << m_p->m_no_threads
                   << " queue in/out " << input_size << "/" << output_size;
                out->result(os.str().c_str());
            }
        }
    }
}
200 
201 void ThreadPoolSocketObserver::get_thread_info(int &tbusy, int &total)
202 {
203  tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
204  total = m_p->m_no_threads;
205 }
206 
// Body of run(void *) (NOTE(review): the signature line is missing from
// this view; the argument does not appear to be used).  Worker-thread
// main loop: wait for a request, process it via IThreadPoolMsg::handle()
// outside any lock, queue the result, and wake the event loop through
// the pipe.  Returns when m_stop_flag is observed.
{
    while(1)
    {
        IThreadPoolMsg *in = 0;
        {
            boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
            // Count ourselves as idle while blocked waiting for work.
            m_p->m_waiting_threads++;
            while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
                m_p->m_cond_input_data.wait(input_lock);
            m_p->m_waiting_threads--;
            if (m_p->m_stop_flag)
                break;

            in = m_p->m_input.front();
            m_p->m_input.pop_front();
            // A queue slot freed up: unblock producers waiting in put().
            m_p->m_cond_input_full.notify_all();
        }
        // Do the actual (potentially long-running) work unlocked.
        IThreadPoolMsg *out = in->handle();
        {
            boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
            m_p->m_output.push_back(out);
            // One wakeup byte per result; socketNotify() drains the
            // whole output queue regardless of how many bytes arrive.
#ifdef WIN32
            send(m_p->m_pipe.write_fd(), "", 1, 0);
#else
            ssize_t r = write(m_p->m_pipe.write_fd(), "", 1);
            if (r != 1)
            {
                if (r == (ssize_t) (-1))
                    yaz_log(YLOG_WARN|YLOG_ERRNO,
                            "ThreadPoolSocketObserver::run. write fail");
                else
                    yaz_log(YLOG_WARN,
                            "ThreadPoolSocketObserver::run. write returned 0");
            }
#endif
        }
    }
}
246 
// Body of cleanup(IThreadPoolMsg *m, void *info) (NOTE(review): the
// signature line is missing from this view; parameter m is not used in
// the visible body).  Removes and deletes every still-queued request
// whose cleanup(info) returns true -- requests already handed to a
// worker are unaffected.
{
    boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);

    std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
    while (it != m_p->m_input.end())
    {
        if ((*it)->cleanup(info))
        {
            delete *it;
            // erase() returns the iterator past the removed element,
            // keeping the traversal valid.
            it = m_p->m_input.erase(it);
        }
        else
            it++;
    }
}
263 
// Body of put(IThreadPoolMsg *m) (NOTE(review): the signature line is
// missing from this view).  Queues a request for the pool, growing the
// pool and applying back-pressure as needed.
{
    boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
    // Grow the pool when no worker is idle and we are below the cap.
    if (m_p->m_waiting_threads == 0 &&
        m_p->m_no_threads < m_p->m_max_threads)
    {
        add_worker();
    }
    // Back-pressure: block the caller while the input queue is at its
    // bound (queue_size_per_thread entries per worker); workers signal
    // m_cond_input_full as they take items off the queue.
    while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
        m_p->m_cond_input_full.wait(input_lock);
    m_p->m_input.push_back(m);
    m_p->m_cond_input_data.notify_one();
}
277 
278 /*
279  * Local variables:
280  * c-basic-offset: 4
281  * c-file-style: "Stroustrup"
282  * indent-tabs-mode: nil
283  * End:
284  * vim: shiftwidth=4 tabstop=8 expandtab
285  */
286 
virtual void result(const char *info)=0
virtual IThreadPoolMsg * handle()=0
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, unsigned min_threads, unsigned max_threads, unsigned stack_size)
void get_thread_info(int &tbusy, int &total)
void cleanup(IThreadPoolMsg *m, void *info)
const unsigned int queue_size_per_thread