metaproxy 1.22.1
Classes | Public Member Functions | Private Member Functions | Private Attributes | List of all members
metaproxy_1::ThreadPoolSocketObserver Class Reference

#include <thread_pool_observer.hpp>

Inheritance diagram for metaproxy_1::ThreadPoolSocketObserver:
Inheritance graph
Collaboration diagram for metaproxy_1::ThreadPoolSocketObserver:
Collaboration graph

Classes

class  Rep
 
class  Worker
 

Public Member Functions

 ThreadPoolSocketObserver (yazpp_1::ISocketObservable *obs, unsigned min_threads, unsigned max_threads, unsigned stack_size)
 
virtual ~ThreadPoolSocketObserver ()
 
void put (IThreadPoolMsg *m)
 
void cleanup (IThreadPoolMsg *m, void *info)
 
IThreadPoolMsg * get ()
 
void run (void *p)
 
void get_thread_info (int &tbusy, int &total)
 

Private Member Functions

void add_worker (void)
 
void socketNotify (int event)
 

Private Attributes

boost::scoped_ptr< Rep > m_p
 

Detailed Description

Definition at line 36 of file thread_pool_observer.hpp.

Constructor & Destructor Documentation

◆ ThreadPoolSocketObserver()

ThreadPoolSocketObserver::ThreadPoolSocketObserver ( yazpp_1::ISocketObservable *  obs,
unsigned  min_threads,
unsigned  max_threads,
unsigned  stack_size 
)

Definition at line 102 of file thread_pool_observer.cpp.

106 : m_p(new Rep(obs))
107{
108 obs->addObserver(m_p->m_pipe.read_fd(), this);
109 obs->maskObserver(this, SOCKET_OBSERVE_READ);
110
111 m_p->m_stop_flag = false;
112 m_p->m_no_threads = 0;
113 m_p->m_min_threads = min_threads;
114 m_p->m_max_threads = max_threads;
115 m_p->m_waiting_threads = 0;
116 unsigned i;
117#if BOOST_VERSION >= 105000
118 if (stack_size > 0)
119 m_p->attrs.set_stack_size(stack_size);
120#else
121 if (stack_size)
122 yaz_log(YLOG_WARN, "stack_size has no effect (Requires Boost 1.50)");
123#endif
124 for (i = 0; i < min_threads; i++)
125 add_worker();
126}

References add_worker() and m_p.

Here is the call graph for this function:

◆ ~ThreadPoolSocketObserver()

ThreadPoolSocketObserver::~ThreadPoolSocketObserver ( )
virtual

Definition at line 128 of file thread_pool_observer.cpp.

129{
130 {
131 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
132 m_p->m_stop_flag = true;
133 m_p->m_cond_input_data.notify_all();
134 }
135 m_p->m_thrds.join_all();
136 m_p->m_socketObservable->deleteObserver(this);
137}

References m_p.

Member Function Documentation

◆ add_worker()

void ThreadPoolSocketObserver::add_worker ( void  )
private

Definition at line 139 of file thread_pool_observer.cpp.

140{
141 Worker w(this);
142#if BOOST_VERSION >= 105000
143 boost::thread *x = new boost::thread(m_p->attrs, w);
144#else
145 boost::thread *x = new boost::thread(w);
146#endif
147 m_p->m_no_threads++;
148 m_p->m_thrds.add_thread(x);
149}

References m_p.

Referenced by put(), and ThreadPoolSocketObserver().

◆ cleanup()

void ThreadPoolSocketObserver::cleanup ( IThreadPoolMsg *  m,
void *  info 
)

Definition at line 247 of file thread_pool_observer.cpp.

248{
249 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
250
251 std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
252 while (it != m_p->m_input.end())
253 {
254 if ((*it)->cleanup(info))
255 {
256 delete *it;
257 it = m_p->m_input.erase(it);
258 }
259 else
260 it++;
261 }
262}

References m_p.

◆ get()

IThreadPoolMsg * metaproxy_1::ThreadPoolSocketObserver::get ( )

◆ get_thread_info()

void ThreadPoolSocketObserver::get_thread_info ( int &  tbusy,
int &  total 
)

Definition at line 201 of file thread_pool_observer.cpp.

202{
203 tbusy = m_p->m_no_threads - m_p->m_waiting_threads;
204 total = m_p->m_no_threads;
205}

References m_p.

◆ put()

void ThreadPoolSocketObserver::put ( IThreadPoolMsg *  m )

Definition at line 264 of file thread_pool_observer.cpp.

265{
266 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
267 if (m_p->m_waiting_threads == 0 &&
268 m_p->m_no_threads < m_p->m_max_threads)
269 {
270 add_worker();
271 }
272 while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
273 m_p->m_cond_input_full.wait(input_lock);
274 m_p->m_input.push_back(m);
275 m_p->m_cond_input_data.notify_one();
276}
const unsigned int queue_size_per_thread

References add_worker(), m_p, and metaproxy_1::queue_size_per_thread.

Here is the call graph for this function:

◆ run()

void ThreadPoolSocketObserver::run ( void *  p)

Definition at line 207 of file thread_pool_observer.cpp.

208{
209 while(1)
210 {
211 IThreadPoolMsg *in = 0;
212 {
213 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
214 m_p->m_waiting_threads++;
215 while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
216 m_p->m_cond_input_data.wait(input_lock);
217 m_p->m_waiting_threads--;
218 if (m_p->m_stop_flag)
219 break;
220
221 in = m_p->m_input.front();
222 m_p->m_input.pop_front();
223 m_p->m_cond_input_full.notify_all();
224 }
225 IThreadPoolMsg *out = in->handle();
226 {
227 boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
228 m_p->m_output.push_back(out);
229#ifdef WIN32
230 send(m_p->m_pipe.write_fd(), "", 1, 0);
231#else
232 ssize_t r = write(m_p->m_pipe.write_fd(), "", 1);
233 if (r != 1)
234 {
235 if (r == (ssize_t) (-1))
236 yaz_log(YLOG_WARN|YLOG_ERRNO,
237 "ThreadPoolSocketObserver::run. write fail");
238 else
239 yaz_log(YLOG_WARN,
240 "ThreadPoolSocketObserver::run. write returned 0");
241 }
242#endif
243 }
244 }
245}
virtual IThreadPoolMsg * handle()=0

References metaproxy_1::IThreadPoolMsg::handle() and m_p.

Referenced by metaproxy_1::ThreadPoolSocketObserver::Worker::operator()().

Here is the call graph for this function:

◆ socketNotify()

void ThreadPoolSocketObserver::socketNotify ( int  event)
private

Definition at line 151 of file thread_pool_observer.cpp.

152{
153 if (event & SOCKET_OBSERVE_READ)
154 {
155 char buf[2];
156#ifdef WIN32
157 recv(m_p->m_pipe.read_fd(), buf, 1, 0);
158#else
159 ssize_t r = read(m_p->m_pipe.read_fd(), buf, 1);
160 if (r != 1)
161 {
162 if (r == (ssize_t) (-1))
163 yaz_log(YLOG_WARN|YLOG_ERRNO,
164 "ThreadPoolSocketObserver::socketNotify. read fail");
165 else
166 yaz_log(YLOG_WARN,
167 "ThreadPoolSocketObserver::socketNotify. read returned 0");
168 }
169#endif
170 while (1)
171 {
172 IThreadPoolMsg *out;
173 size_t output_size = 0;
174 {
175 boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
176 if (m_p->m_output.empty()) {
177 break;
178 }
179 out = m_p->m_output.front();
180 m_p->m_output.pop_front();
181 output_size = m_p->m_output.size();
182 }
183 if (out)
184 {
185 size_t input_size = 0;
186 std::ostringstream os;
187 {
188 boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
189 input_size = m_p->m_input.size();
190 }
191 os << "tbusy/total "
192 << m_p->m_no_threads - m_p->m_waiting_threads
193 << "/" << m_p->m_no_threads
194 << " queue in/out " << input_size << "/" << output_size;
195 out->result(os.str().c_str());
196 }
197 }
198 }
199}
virtual void result(const char *info)=0

References m_p and metaproxy_1::IThreadPoolMsg::result().

Here is the call graph for this function:

Member Data Documentation

◆ m_p

boost::scoped_ptr<Rep> metaproxy_1::ThreadPoolSocketObserver::m_p
private

The documentation for this class was generated from the following files: