metaproxy 1.22.1
filter_load_balance.cpp
Go to the documentation of this file.
1/* This file is part of Metaproxy.
2 Copyright (C) Index Data
3
4Metaproxy is free software; you can redistribute it and/or modify it under
5the terms of the GNU General Public License as published by the Free
6Software Foundation; either version 2, or (at your option) any later
7version.
8
9Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10WARRANTY; without even the implied warranty of MERCHANTABILITY or
11FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12for more details.
13
14You should have received a copy of the GNU General Public License
15along with this program; if not, write to the Free Software
16Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17*/
18
19#include "config.hpp"
20#include <metaproxy/package.hpp>
21#include <metaproxy/filter.hpp>
23#include <metaproxy/util.hpp>
24
25
26#include <boost/thread/mutex.hpp>
27
28#include <yaz/diagbib1.h>
29#include <yaz/log.h>
30#include <yaz/zgdu.h>
31
32// remove max macro if already defined (defined later in <limits>)
33#ifdef max
34#undef max
35#endif
36
37#include <list>
38#include <map>
39#include <limits>
40
41namespace mp = metaproxy_1;
42namespace yf = mp::filter;
43
44namespace metaproxy_1
45{
46 namespace filter
47 {
49 {
50 public:
51 Impl();
52 ~Impl();
53 void process(metaproxy_1::Package & package);
54 void configure(const xmlNode * ptr);
55 private:
56 // statistic manipulating functions,
57 void add_dead(unsigned long session_id);
58 //void clear_dead(unsigned long session_id);
59 void add_package(unsigned long session_id);
60 void remove_package(unsigned long session_id);
61 void add_session(metaproxy_1::Package &pkg, std::string target);
62 void remove_session(metaproxy_1::Package &pkg);
63 std::string find_session_target(unsigned long session_id);
64
65 // cost functions
66 unsigned int cost(std::string target);
67 unsigned int dead(std::string target);
68
69 // local classes
70 class TargetStat {
71 public:
72 unsigned int sessions;
73 unsigned int packages;
74 unsigned int deads;
75 unsigned int cost() {
76 unsigned int c = sessions + packages + deads;
77 return c;
78 }
79 };
80
81 // local protected databases
82 boost::mutex m_mutex;
83 std::map<std::string, TargetStat> m_target_stat;
84 std::map<unsigned long, std::string> m_session_target;
85 };
86 }
87}
88
89// define Pimpl wrapper forwarding to Impl
90
91yf::LoadBalance::LoadBalance() : m_p(new Impl)
92{
93}
94
95yf::LoadBalance::~LoadBalance()
96{ // must have a destructor because of boost::scoped_ptr
97}
98
99void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only,
100 const char *path)
101{
102 m_p->configure(xmlnode);
103}
104
105void yf::LoadBalance::process(mp::Package &package) const
106{
107 m_p->process(package);
108}
109
110
111yf::LoadBalance::Impl::Impl()
112{
113}
114
115yf::LoadBalance::Impl::~Impl()
116{
117}
118
119void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
120{
121}
122
123void yf::LoadBalance::Impl::process(mp::Package &package)
124{
125 bool is_closed_front = false;
126
127 // checking for closed front end packages
128 if (package.session().is_closed())
129 {
130 is_closed_front = true;
131 }
132
133 Z_GDU *gdu_req = package.request().get();
134
135 // passing anything but z3950 packages
136 if (gdu_req && gdu_req->which == Z_GDU_Z3950)
137 {
138 // target selecting only on Z39.50 init request
139 if (gdu_req->u.z3950->which == Z_APDU_initRequest)
140 {
141 yazpp_1::GDU base_req(gdu_req);
142 Z_APDU *apdu = base_req.get()->u.z3950;
143
144 Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
145 mp::odr odr_en(ODR_ENCODE);
146
147 std::list<std::string> vhosts;
148 mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
149 // get lowest of all vhosts.. Remove them if individually if
150 // they turn out to be bad..
151 while (1)
152 {
153 std::list<std::string>::iterator ivh = vhosts.begin();
154 std::list<std::string>::iterator ivh_pick = vhosts.end();
155
156 Package init_pkg(package.session(), package.origin());
157 init_pkg.copy_filter(package);
158
159 unsigned int cost_i = std::numeric_limits<unsigned int>::max();
160 {
161 boost::mutex::scoped_lock scoped_lock(m_mutex);
162
163 for (; ivh != vhosts.end(); ivh++)
164 {
165 if ((*ivh).size() != 0)
166 {
167 unsigned int cost
168 = yf::LoadBalance::Impl::cost(*ivh);
169
170 std::ostringstream os;
171 os << "LB" << " "
172 << package << " "
173 << "0.000000" << " "
174 << "Consider " << *ivh
175 << " cost=" << cost;
176 yaz_log(YLOG_LOG, "%s", os.str().c_str());
177 if (cost_i > cost)
178 {
179 ivh_pick = ivh;
180 cost_i = cost;
181 }
182 }
183 }
184 }
185 if (ivh_pick == vhosts.end())
186 break;
187 std::string target = *ivh_pick;
188 vhosts.erase(ivh_pick);
189 // copying new target into init package
190 yazpp_1::GDU init_gdu(base_req);
191 Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
192
193 mp::util::set_vhost_otherinfo(&(init_req->otherInfo),
194 odr_en, target, 1);
195
196 init_pkg.request() = init_gdu;
197
198 // moving all package types
199 init_pkg.move();
200
201 // checking for closed back end packages
202 if (!init_pkg.session().is_closed())
203 {
204 {
205 boost::mutex::scoped_lock scoped_lock(m_mutex);
206 add_session(package, target);
207 }
208
209 package.response() = init_pkg.response();
210 return;
211 }
212 std::ostringstream os;
213 os << "LB" << " "
214 << package << " "
215 << "0.000000" << " "
216 << "Failed " << target;
217 yaz_log(YLOG_LOG, "%s", os.str().c_str());
218 }
219 mp::odr odr;
220 package.response() = odr.create_initResponse(
221 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
222 "load_balance: no available targets");
223 package.session().close();
224 return;
225 }
226 // frontend Z39.50 close request is added to statistics and marked
227 else if (gdu_req->u.z3950->which == Z_APDU_close)
228 {
229 is_closed_front = true;
230 boost::mutex::scoped_lock scoped_lock(m_mutex);
231 add_package(package.session().id());
232 }
233 // any other Z39.50 package is added to statistics
234 else
235 {
236 boost::mutex::scoped_lock scoped_lock(m_mutex);
237 add_package(package.session().id());
238 }
239 }
240
241 // moving all package types
242 package.move();
243
244 bool is_closed_back = false;
245
246 // checking for closed back end packages
247 if (package.session().is_closed())
248 is_closed_back = true;
249
250 Z_GDU *gdu_res = package.response().get();
251
252 // passing anything but z3950 packages
253 if (gdu_res && gdu_res->which == Z_GDU_Z3950)
254 {
255 // session closing only on Z39.50 close response
256 if (gdu_res->u.z3950->which == Z_APDU_close)
257 {
258 is_closed_back = true;
259 boost::mutex::scoped_lock scoped_lock(m_mutex);
260 remove_package(package.session().id());
261 }
262 // any other Z39.50 package is removed from statistics
263 else
264 {
265 boost::mutex::scoped_lock scoped_lock(m_mutex);
266 remove_package(package.session().id());
267 }
268 }
269
270 // finally removing sessions and marking deads
271 if (is_closed_back || is_closed_front)
272 {
273 boost::mutex::scoped_lock scoped_lock(m_mutex);
274
275 // marking backend dead if backend closed without fronted close
276 if (is_closed_front == false)
277 add_dead(package.session().id());
278
279 remove_session(package);
280
281 // making sure that package is closed
282 package.session().close();
283 }
284}
285
286// statistic manipulating functions,
287void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
288{
289 std::string target = find_session_target(session_id);
290
291 if (target.size() != 0)
292 {
293 std::map<std::string, TargetStat>::iterator itarg;
294 itarg = m_target_stat.find(target);
295 if (itarg != m_target_stat.end()
296 && itarg->second.deads < std::numeric_limits<unsigned int>::max())
297 {
298 itarg->second.deads += 1;
299 // std:.cout << "add_dead " << session_id << " " << target
300 // << " d:" << itarg->second.deads << "\n";
301 }
302 }
303}
304
305void yf::LoadBalance::Impl::add_package(unsigned long session_id)
306{
307 std::string target = find_session_target(session_id);
308
309 if (target.size() != 0)
310 {
311 std::map<std::string, TargetStat>::iterator itarg;
312 itarg = m_target_stat.find(target);
313 if (itarg != m_target_stat.end()
314 && itarg->second.packages
315 < std::numeric_limits<unsigned int>::max())
316 {
317 itarg->second.packages += 1;
318 }
319 }
320}
321
322void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
323{
324 std::string target = find_session_target(session_id);
325
326 if (target.size() != 0)
327 {
328 std::map<std::string, TargetStat>::iterator itarg;
329 itarg = m_target_stat.find(target);
330 if (itarg != m_target_stat.end()
331 && itarg->second.packages > 0)
332 {
333 itarg->second.packages -= 1;
334 }
335 }
336}
337
338void yf::LoadBalance::Impl::add_session(mp::Package &package,
339 std::string target)
340{
341 // finding and adding session
342 unsigned long session_id = package.session().id();
343 std::map<unsigned long, std::string>::iterator isess;
344 isess = m_session_target.find(session_id);
345 if (isess == m_session_target.end())
346 {
347 m_session_target.insert(std::make_pair(session_id, target));
348
349 std::ostringstream os;
350 os << "LB" << " "
351 << package << " "
352 << "0.000000 Add " << target << " size="
353 << m_session_target.size();
354 yaz_log(YLOG_LOG, "%s", os.str().c_str());
355 }
356
357 // finding and adding target statistics
358 std::map<std::string, TargetStat>::iterator itarg;
359 itarg = m_target_stat.find(target);
360 if (itarg == m_target_stat.end())
361 {
362 TargetStat stat;
363 stat.sessions = 1;
364 stat.packages = 0;
365 stat.deads = 0;
366 m_target_stat.insert(std::make_pair(target, stat));
367 }
368 else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
369 {
370 itarg->second.sessions += 1;
371 }
372}
373
374void yf::LoadBalance::Impl::remove_session(mp::Package &package)
375{
376 unsigned long session_id = package.session().id();
377 std::map<unsigned long, std::string>::iterator isess;
378 isess = m_session_target.find(session_id);
379 if (isess == m_session_target.end())
380 return;
381
382 std::string target = isess->second;
383 m_session_target.erase(isess);
384
385 std::ostringstream os;
386 os << "LB" << " "
387 << package << " "
388 << "0.000000 Remove " << target << " size="
389 << m_session_target.size();
390 yaz_log(YLOG_LOG, "%s", os.str().c_str());
391
392 // finding target statistics
393 std::map<std::string, TargetStat>::iterator itarg;
394 itarg = m_target_stat.find(target);
395 if (itarg == m_target_stat.end())
396 return;
397
398 if (itarg->second.sessions > 0)
399 itarg->second.sessions -= 1;
400
401 if (itarg->second.sessions == 0 || itarg->second.deads == 0)
402 m_target_stat.erase(itarg);
403}
404
405std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
406{
407 std::string target;
408 std::map<unsigned long, std::string>::iterator isess;
409 isess = m_session_target.find(session_id);
410 if (isess != m_session_target.end())
411 target = isess->second;
412 return target;
413}
414
415
416// cost functions
417unsigned int yf::LoadBalance::Impl::cost(std::string target)
418{
419 unsigned int cost = 0;
420
421 if (target.size() != 0)
422 {
423 std::map<std::string, TargetStat>::iterator itarg;
424 itarg = m_target_stat.find(target);
425 if (itarg != m_target_stat.end())
426 cost = itarg->second.cost();
427 }
428 return cost;
429}
430
431unsigned int yf::LoadBalance::Impl::dead(std::string target)
432{
433 unsigned int dead = 0;
434
435 if (target.size() != 0)
436 {
437 std::map<std::string, TargetStat>::iterator itarg;
438 itarg = m_target_stat.find(target);
439 if (itarg != m_target_stat.end())
440 dead = itarg->second.deads;
441 }
442 return dead;
443}
444
445
446static mp::filter::Base* filter_creator()
447{
448 return new mp::filter::LoadBalance;
449}
450
451extern "C" {
452 struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
453 0,
454 "load_balance",
456 };
457}
458
459
460/*
461 * Local variables:
462 * c-basic-offset: 4
463 * c-file-style: "Stroustrup"
464 * indent-tabs-mode: nil
465 * End:
466 * vim: shiftwidth=4 tabstop=8 expandtab
467 */
468
std::string find_session_target(unsigned long session_id)
void add_session(metaproxy_1::Package &pkg, std::string target)
void add_package(unsigned long session_id)
unsigned int dead(std::string target)
void remove_package(unsigned long session_id)
unsigned int cost(std::string target)
std::map< unsigned long, std::string > m_session_target
void remove_session(metaproxy_1::Package &pkg)
std::map< std::string, TargetStat > m_target_stat
void process(metaproxy_1::Package &package)
void add_dead(unsigned long session_id)
static mp::filter::Base * filter_creator()
struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance