29 #include <boost/thread/thread.hpp>
30 #include <boost/thread/mutex.hpp>
31 #include <boost/thread/condition.hpp>
32 #include <boost/noncopyable.hpp>
40 #include <yazpp/socket-observer.h>
59 Rep(yazpp_1::ISocketObservable *obs);
72 #if BOOST_VERSION >= 105000
73 boost::thread::attributes attrs;
85 using namespace yazpp_1;
88 ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs)
89 : m_socketObservable(obs), m_pipe(9123)
103 yazpp_1::ISocketObservable *obs,
104 unsigned min_threads,
unsigned max_threads,
108 obs->addObserver(
m_p->m_pipe.read_fd(),
this);
109 obs->maskObserver(
this, SOCKET_OBSERVE_READ);
111 m_p->m_stop_flag =
false;
112 m_p->m_no_threads = 0;
113 m_p->m_min_threads = min_threads;
114 m_p->m_max_threads = max_threads;
115 m_p->m_waiting_threads = 0;
117 #if BOOST_VERSION >= 105000
119 m_p->attrs.set_stack_size(stack_size);
122 yaz_log(YLOG_WARN,
"stack_size has no effect (Requires Boost 1.50)");
124 for (i = 0; i < min_threads; i++)
131 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
132 m_p->m_stop_flag =
true;
133 m_p->m_cond_input_data.notify_all();
135 m_p->m_thrds.join_all();
136 m_p->m_socketObservable->deleteObserver(
this);
142 #if BOOST_VERSION >= 105000
143 boost::thread *x =
new boost::thread(
m_p->attrs, w);
145 boost::thread *x =
new boost::thread(w);
148 m_p->m_thrds.add_thread(x);
153 if (event & SOCKET_OBSERVE_READ)
157 recv(
m_p->m_pipe.read_fd(), buf, 1, 0);
159 ssize_t r = read(
m_p->m_pipe.read_fd(), buf, 1);
162 if (r == (ssize_t) (-1))
163 yaz_log(YLOG_WARN|YLOG_ERRNO,
164 "ThreadPoolSocketObserver::socketNotify. read fail");
167 "ThreadPoolSocketObserver::socketNotify. read returned 0");
173 size_t output_size = 0;
175 boost::mutex::scoped_lock output_lock(
m_p->m_mutex_output_data);
176 if (
m_p->m_output.empty()) {
179 out =
m_p->m_output.front();
180 m_p->m_output.pop_front();
181 output_size =
m_p->m_output.size();
185 size_t input_size = 0;
186 std::ostringstream os;
188 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
189 input_size =
m_p->m_input.size();
192 <<
m_p->m_no_threads -
m_p->m_waiting_threads
193 <<
"/" <<
m_p->m_no_threads
194 <<
" queue in/out " << input_size <<
"/" << output_size;
195 out->
result(os.str().c_str());
203 tbusy =
m_p->m_no_threads -
m_p->m_waiting_threads;
204 total =
m_p->m_no_threads;
213 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
214 m_p->m_waiting_threads++;
215 while (!
m_p->m_stop_flag &&
m_p->m_input.size() == 0)
216 m_p->m_cond_input_data.wait(input_lock);
217 m_p->m_waiting_threads--;
218 if (
m_p->m_stop_flag)
221 in =
m_p->m_input.front();
222 m_p->m_input.pop_front();
223 m_p->m_cond_input_full.notify_all();
227 boost::mutex::scoped_lock output_lock(
m_p->m_mutex_output_data);
228 m_p->m_output.push_back(out);
230 send(
m_p->m_pipe.write_fd(),
"", 1, 0);
232 ssize_t r = write(
m_p->m_pipe.write_fd(),
"", 1);
235 if (r == (ssize_t) (-1))
236 yaz_log(YLOG_WARN|YLOG_ERRNO,
237 "ThreadPoolSocketObserver::run. write fail");
240 "ThreadPoolSocketObserver::run. write returned 0");
249 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
251 std::deque<IThreadPoolMsg *>::iterator it =
m_p->m_input.begin();
252 while (it !=
m_p->m_input.end())
254 if ((*it)->cleanup(info))
257 it =
m_p->m_input.erase(it);
266 boost::mutex::scoped_lock input_lock(
m_p->m_mutex_input_data);
267 if (
m_p->m_waiting_threads == 0 &&
268 m_p->m_no_threads <
m_p->m_max_threads)
273 m_p->m_cond_input_full.wait(input_lock);
274 m_p->m_input.push_back(m);
275 m_p->m_cond_input_data.notify_one();