20 #include <metaproxy/package.hpp>
21 #include <metaproxy/filter.hpp>
23 #include <metaproxy/util.hpp>
26 #include <boost/thread/mutex.hpp>
28 #include <yaz/diagbib1.h>
42 namespace yf = mp::filter;
53 void process(metaproxy_1::Package & package);
57 void add_dead(
unsigned long session_id);
61 void add_session(metaproxy_1::Package &pkg, std::string target);
66 unsigned int cost(std::string target);
67 unsigned int dead(std::string target);
91 yf::LoadBalance::LoadBalance() : m_p(new
Impl)
95 yf::LoadBalance::~LoadBalance()
99 void yf::LoadBalance::configure(
const xmlNode *xmlnode,
bool test_only,
102 m_p->configure(xmlnode);
105 void yf::LoadBalance::process(mp::Package &package)
const
107 m_p->process(package);
111 yf::LoadBalance::Impl::Impl()
115 yf::LoadBalance::Impl::~Impl()
119 void yf::LoadBalance::Impl::configure(
const xmlNode *xmlnode)
123 void yf::LoadBalance::Impl::process(mp::Package &package)
125 bool is_closed_front =
false;
128 if (package.session().is_closed())
130 is_closed_front =
true;
133 Z_GDU *gdu_req = package.request().get();
136 if (gdu_req && gdu_req->which == Z_GDU_Z3950)
139 if (gdu_req->u.z3950->which == Z_APDU_initRequest)
141 yazpp_1::GDU base_req(gdu_req);
142 Z_APDU *apdu = base_req.get()->u.z3950;
144 Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
145 mp::odr odr_en(ODR_ENCODE);
147 std::list<std::string> vhosts;
148 mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
153 std::list<std::string>::iterator ivh = vhosts.begin();
154 std::list<std::string>::iterator ivh_pick = vhosts.end();
156 Package init_pkg(package.session(), package.origin());
157 init_pkg.copy_filter(package);
159 unsigned int cost_i = std::numeric_limits<unsigned int>::max();
161 boost::mutex::scoped_lock scoped_lock(m_mutex);
163 for (; ivh != vhosts.end(); ivh++)
165 if ((*ivh).size() != 0)
168 = yf::LoadBalance::Impl::cost(*ivh);
170 std::ostringstream os;
174 << "Consider " << *ivh
176 yaz_log(YLOG_LOG,
"%s", os.str().c_str());
185 if (ivh_pick == vhosts.end())
187 std::string target = *ivh_pick;
188 vhosts.erase(ivh_pick);
190 yazpp_1::GDU init_gdu(base_req);
191 Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
193 mp::util::set_vhost_otherinfo(&(init_req->otherInfo),
196 init_pkg.request() = init_gdu;
202 if (!init_pkg.session().is_closed())
205 boost::mutex::scoped_lock scoped_lock(m_mutex);
206 add_session(package, target);
209 package.response() = init_pkg.response();
212 std::ostringstream os;
216 << "Failed " << target;
217 yaz_log(YLOG_LOG,
"%s", os.str().c_str());
220 package.response() = odr.create_initResponse(
221 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
222 "load_balance: no available targets");
223 package.session().close();
227 else if (gdu_req->u.z3950->which == Z_APDU_close)
229 is_closed_front =
true;
230 boost::mutex::scoped_lock scoped_lock(m_mutex);
231 add_package(package.session().id());
236 boost::mutex::scoped_lock scoped_lock(m_mutex);
237 add_package(package.session().id());
244 bool is_closed_back =
false;
247 if (package.session().is_closed())
248 is_closed_back =
true;
250 Z_GDU *gdu_res = package.response().get();
253 if (gdu_res && gdu_res->which == Z_GDU_Z3950)
256 if (gdu_res->u.z3950->which == Z_APDU_close)
258 is_closed_back =
true;
259 boost::mutex::scoped_lock scoped_lock(m_mutex);
260 remove_package(package.session().id());
265 boost::mutex::scoped_lock scoped_lock(m_mutex);
266 remove_package(package.session().id());
271 if (is_closed_back || is_closed_front)
273 boost::mutex::scoped_lock scoped_lock(m_mutex);
276 if (is_closed_front ==
false)
277 add_dead(package.session().id());
279 remove_session(package);
282 package.session().close();
287 void yf::LoadBalance::Impl::add_dead(
unsigned long session_id)
289 std::string target = find_session_target(session_id);
291 if (target.size() != 0)
293 std::map<std::string, TargetStat>::iterator itarg;
294 itarg = m_target_stat.find(target);
295 if (itarg != m_target_stat.end()
296 && itarg->second.deads < std::numeric_limits<unsigned int>::max())
298 itarg->second.deads += 1;
305 void yf::LoadBalance::Impl::add_package(
unsigned long session_id)
307 std::string target = find_session_target(session_id);
309 if (target.size() != 0)
311 std::map<std::string, TargetStat>::iterator itarg;
312 itarg = m_target_stat.find(target);
313 if (itarg != m_target_stat.end()
314 && itarg->second.packages
315 < std::numeric_limits<unsigned int>::max())
317 itarg->second.packages += 1;
322 void yf::LoadBalance::Impl::remove_package(
unsigned long session_id)
324 std::string target = find_session_target(session_id);
326 if (target.size() != 0)
328 std::map<std::string, TargetStat>::iterator itarg;
329 itarg = m_target_stat.find(target);
330 if (itarg != m_target_stat.end()
331 && itarg->second.packages > 0)
333 itarg->second.packages -= 1;
338 void yf::LoadBalance::Impl::add_session(mp::Package &package,
342 unsigned long session_id = package.session().id();
343 std::map<unsigned long, std::string>::iterator isess;
344 isess = m_session_target.find(session_id);
345 if (isess == m_session_target.end())
347 m_session_target.insert(std::make_pair(session_id, target));
349 std::ostringstream os;
352 << "0.000000 Add " << target << " size="
353 << m_session_target.size();
354 yaz_log(YLOG_LOG,
"%s", os.str().c_str());
358 std::map<std::string, TargetStat>::iterator itarg;
359 itarg = m_target_stat.find(target);
360 if (itarg == m_target_stat.end())
366 m_target_stat.insert(std::make_pair(target, stat));
368 else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
370 itarg->second.sessions += 1;
374 void yf::LoadBalance::Impl::remove_session(mp::Package &package)
376 unsigned long session_id = package.session().id();
377 std::map<unsigned long, std::string>::iterator isess;
378 isess = m_session_target.find(session_id);
379 if (isess == m_session_target.end())
382 std::string target = isess->second;
383 m_session_target.erase(isess);
385 std::ostringstream os;
388 << "0.000000 Remove " << target << " size="
389 << m_session_target.size();
390 yaz_log(YLOG_LOG,
"%s", os.str().c_str());
393 std::map<std::string, TargetStat>::iterator itarg;
394 itarg = m_target_stat.find(target);
395 if (itarg == m_target_stat.end())
398 if (itarg->second.sessions > 0)
399 itarg->second.sessions -= 1;
401 if (itarg->second.sessions == 0 || itarg->second.deads == 0)
402 m_target_stat.erase(itarg);
405 std::string yf::LoadBalance::Impl::find_session_target(
unsigned long session_id)
408 std::map<unsigned long, std::string>::iterator isess;
409 isess = m_session_target.find(session_id);
410 if (isess != m_session_target.end())
411 target = isess->second;
417 unsigned int yf::LoadBalance::Impl::cost(std::string target)
419 unsigned int cost = 0;
421 if (target.size() != 0)
423 std::map<std::string, TargetStat>::iterator itarg;
424 itarg = m_target_stat.find(target);
425 if (itarg != m_target_stat.end())
426 cost = itarg->second.cost();
431 unsigned int yf::LoadBalance::Impl::dead(std::string target)
433 unsigned int dead = 0;
435 if (target.size() != 0)
437 std::map<std::string, TargetStat>::iterator itarg;
438 itarg = m_target_stat.find(target);
439 if (itarg != m_target_stat.end())
440 dead = itarg->second.deads;
448 return new mp::filter::LoadBalance;
static mp::filter::Base * filter_creator()
struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance