metaproxy 1.22.1
thread_pool_observer.cpp
Go to the documentation of this file.
1/* This file is part of Metaproxy.
2 Copyright (C) Index Data
3
4Metaproxy is free software; you can redistribute it and/or modify it under
5the terms of the GNU General Public License as published by the Free
6Software Foundation; either version 2, or (at your option) any later
7version.
8
9Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10WARRANTY; without even the implied warranty of MERCHANTABILITY or
11FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12for more details.
13
14You should have received a copy of the GNU General Public License
15along with this program; if not, write to the Free Software
16Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17*/
18
19#include "config.hpp"
20
21#if HAVE_UNISTD_H
22#include <unistd.h>
23#endif
24
25#ifdef WIN32
26#include <windows.h>
27#include <winsock.h>
28#endif
29#include <boost/thread/thread.hpp>
30#include <boost/thread/mutex.hpp>
31#include <boost/thread/condition.hpp>
32#include <boost/noncopyable.hpp>
33
34#include <ctype.h>
35#include <stdio.h>
36
37#include <deque>
38#include <sstream>
39
40#include <yazpp/socket-observer.h>
41#include <yaz/log.h>
42
44#include "pipe.hpp"
45
46namespace metaproxy_1 {
55
56 class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
58 public:
59 Rep(yazpp_1::ISocketObservable *obs);
60 ~Rep();
61 private:
62 yazpp_1::ISocketObservable *m_socketObservable;
64 boost::thread_group m_thrds;
65 boost::mutex m_mutex_input_data;
66 boost::condition m_cond_input_data;
67 boost::condition m_cond_input_full;
68 boost::mutex m_mutex_output_data;
69 std::deque<IThreadPoolMsg *> m_input;
70 std::deque<IThreadPoolMsg *> m_output;
72#if BOOST_VERSION >= 105000
73 boost::thread::attributes attrs;
74#endif
75 unsigned m_no_threads;
76 unsigned m_min_threads;
77 unsigned m_max_threads;
79 };
80 const unsigned int queue_size_per_thread = 64;
81}
82
83
84
85using namespace yazpp_1;
86using namespace metaproxy_1;
87
88ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs)
89 : m_socketObservable(obs), m_pipe(9123)
90{
91}
92
96
101
103 yazpp_1::ISocketObservable *obs,
104 unsigned min_threads, unsigned max_threads,
105 unsigned stack_size)
106 : m_p(new Rep(obs))
107{
108 obs->addObserver(m_p->m_pipe.read_fd(), this);
109 obs->maskObserver(this, SOCKET_OBSERVE_READ);
110
111 m_p->m_stop_flag = false;
112 m_p->m_no_threads = 0;
113 m_p->m_min_threads = min_threads;
114 m_p->m_max_threads = max_threads;
115 m_p->m_waiting_threads = 0;
116 unsigned i;
117#if BOOST_VERSION >= 105000
118 if (stack_size > 0)
119 m_p->attrs.set_stack_size(stack_size);
120#else
121 if (stack_size)
122 yaz_log(YLOG_WARN, "stack_size has no effect (Requires Boost 1.50)");
123#endif
124 for (i = 0; i < min_threads; i++)
125 add_worker();
126}
127
129{
130 {
131 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
132 m_p->m_stop_flag = true;
133 m_p->m_cond_input_data.notify_all();
134 }
135 m_p->m_thrds.join_all();
136 m_p->m_socketObservable->deleteObserver(this);
137}
138
140{
141 Worker w(this);
142#if BOOST_VERSION >= 105000
143 boost::thread *x = new boost::thread(m_p->attrs, w);
144#else
145 boost::thread *x = new boost::thread(w);
146#endif
147 m_p->m_no_threads++;
148 m_p->m_thrds.add_thread(x);
149}
150
152{
153 if (event & SOCKET_OBSERVE_READ)
154 {
155 char buf[2];
156#ifdef WIN32
157 recv(m_p->m_pipe.read_fd(), buf, 1, 0);
158#else
159 ssize_t r = read(m_p->m_pipe.read_fd(), buf, 1);
160 if (r != 1)
161 {
162 if (r == (ssize_t) (-1))
163 yaz_log(YLOG_WARN|YLOG_ERRNO,
164 "ThreadPoolSocketObserver::socketNotify. read fail");
165 else
166 yaz_log(YLOG_WARN,
167 "ThreadPoolSocketObserver::socketNotify. read returned 0");
168 }
169#endif
170 while (1)
171 {
172 IThreadPoolMsg *out;
173 size_t output_size = 0;
174 {
175 boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
176 if (m_p->m_output.empty()) {
177 break;
178 }
179 out = m_p->m_output.front();
180 m_p->m_output.pop_front();
181 output_size = m_p->m_output.size();
182 }
183 if (out)
184 {
185 size_t input_size = 0;
186 std::ostringstream os;
187 {
188 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
189 input_size = m_p->m_input.size();
190 }
191 os << "tbusy/total "
192 << m_p->m_no_threads - m_p->m_waiting_threads
193 << "/" << m_p->m_no_threads
194 << " queue in/out " << input_size << "/" << output_size;
195 out->result(os.str().c_str());
196 }
197 }
198 }
199}
200
202{
203 tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
204 total = m_p->m_no_threads;
205}
206
208{
209 while(1)
210 {
211 IThreadPoolMsg *in = 0;
212 {
213 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
214 m_p->m_waiting_threads++;
215 while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
216 m_p->m_cond_input_data.wait(input_lock);
217 m_p->m_waiting_threads--;
218 if (m_p->m_stop_flag)
219 break;
220
221 in = m_p->m_input.front();
222 m_p->m_input.pop_front();
223 m_p->m_cond_input_full.notify_all();
224 }
225 IThreadPoolMsg *out = in->handle();
226 {
227 boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
228 m_p->m_output.push_back(out);
229#ifdef WIN32
230 send(m_p->m_pipe.write_fd(), "", 1, 0);
231#else
232 ssize_t r = write(m_p->m_pipe.write_fd(), "", 1);
233 if (r != 1)
234 {
235 if (r == (ssize_t) (-1))
236 yaz_log(YLOG_WARN|YLOG_ERRNO,
237 "ThreadPoolSocketObserver::run. write fail");
238 else
239 yaz_log(YLOG_WARN,
240 "ThreadPoolSocketObserver::run. write returned 0");
241 }
242#endif
243 }
244 }
245}
246
248{
249 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
250
251 std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
252 while (it != m_p->m_input.end())
253 {
254 if ((*it)->cleanup(info))
255 {
256 delete *it;
257 it = m_p->m_input.erase(it);
258 }
259 else
260 it++;
261 }
262}
263
265{
266 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
267 if (m_p->m_waiting_threads == 0 &&
268 m_p->m_no_threads < m_p->m_max_threads)
269 {
270 add_worker();
271 }
272 while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
273 m_p->m_cond_input_full.wait(input_lock);
274 m_p->m_input.push_back(m);
275 m_p->m_cond_input_data.notify_one();
276}
277
278/*
279 * Local variables:
280 * c-basic-offset: 4
281 * c-file-style: "Stroustrup"
282 * indent-tabs-mode: nil
283 * End:
284 * vim: shiftwidth=4 tabstop=8 expandtab
285 */
286
virtual void result(const char *info)=0
virtual IThreadPoolMsg * handle()=0
ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, unsigned min_threads, unsigned max_threads, unsigned stack_size)
void get_thread_info(int &tbusy, int &total)
void cleanup(IThreadPoolMsg *m, void *info)
const unsigned int queue_size_per_thread