metaproxy  1.21.0
filter_multi.cpp
Go to the documentation of this file.
1 /* This file is part of Metaproxy.
2  Copyright (C) Index Data
3 
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8 
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 for more details.
13 
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <yaz/log.h>
20 
21 #include "config.hpp"
22 
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25 
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30 
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33 
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38 #include <yaz/oid_db.h>
39 
40 #include <vector>
41 #include <algorithm>
42 #include <map>
43 #include <iostream>
44 
45 namespace mp = metaproxy_1;
46 namespace yf = mp::filter;
47 
48 namespace metaproxy_1 {
49  namespace filter {
53  };
56  int m_count;
57  bool operator < (const BackendSet &k) const;
58  bool operator == (const BackendSet &k) const;
59  };
61  std::string m_norm_term;
62  std::string m_display_term;
63  int m_count;
64  bool operator < (const ScanTermInfo &) const;
65  bool operator == (const ScanTermInfo &) const;
66  Z_Entry *get_entry(ODR odr);
67  };
69  class PresentJob {
70  public:
72  int m_pos; // position for backend (1=first, 2=second,..
73  int m_start; // present request start
74  PresentJob(BackendPtr ptr, int pos) :
75  m_backend(ptr), m_pos(pos), m_start(0) {};
76  };
77  FrontendSet(std::string setname);
78  FrontendSet();
79  ~FrontendSet();
80 
81  void round_robin(int pos, int number, std::list<PresentJob> &job);
82  void serve_order(int pos, int number, std::list<PresentJob> &job);
83 
84  std::list<BackendSet> m_backend_sets;
85  std::string m_setname;
86  };
87  struct Multi::Backend {
89  std::string m_backend_database;
90  std::string m_vhost;
91  std::string m_route;
92  std::string m_auth;
93  void operator() (void); // thread operation
94  };
95  struct Multi::Frontend {
96  Frontend(Rep *rep);
97  ~Frontend();
98  bool m_is_multi;
99  bool m_in_use;
100  std::list<BackendPtr> m_backend_list;
101  std::map<std::string,Multi::FrontendSet> m_sets;
102 
103  void multi_move(std::list<BackendPtr> &blist);
104  void init(Package &package, Z_GDU *gdu);
105  void close(Package &package);
106  void search(Package &package, Z_APDU *apdu);
107  void present(Package &package, Z_APDU *apdu);
108  void scan(Package &package, Z_APDU *apdu);
109  void relay_apdu(Package &package, Z_APDU *apdu);
110  void record_diagnostics(Z_Records *records,
111  Z_DiagRecs * &z_diag,
112  ODR odr,
113  int &no_successful);
115  };
116  class Multi::Map {
117  std::string m_target_pattern;
118  std::string m_route;
119  std::string m_auth;
120  public:
121  Map(std::string pattern, std::string route, std::string auth) :
122  m_target_pattern(pattern), m_route(route), m_auth(auth) {};
123  bool match(const std::string target, std::string *ret,
124  std::string *auth) const {
125  if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
126  {
127  *ret = m_route;
128  *auth = m_auth;
129  return true;
130  }
131  return false;
132  };
133  };
134  class Multi::Rep {
135  friend class Multi;
136  friend struct Frontend;
137 
138  Rep();
139  FrontendPtr get_frontend(Package &package);
140  void release_frontend(Package &package);
141  private:
142  std::list<Multi::Map> m_route_patterns;
143  boost::mutex m_mutex;
144  boost::condition m_cond_session_ready;
145  std::map<mp::Session, FrontendPtr> m_clients;
149  };
150  }
151 }
152 
153 yf::Multi::Rep::Rep()
154 {
155  m_hide_unavailable = false;
156  m_hide_errors = false;
157  m_merge_type = round_robin;
158 }
159 
160 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
161 {
162  return m_count < k.m_count;
163 }
164 
165 yf::Multi::Frontend::Frontend(Rep *rep)
166 {
167  m_p = rep;
168  m_is_multi = false;
169 }
170 
171 yf::Multi::Frontend::~Frontend()
172 {
173 }
174 
175 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
176 {
177  boost::mutex::scoped_lock lock(m_mutex);
178 
179  std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
180 
181  while(true)
182  {
183  it = m_clients.find(package.session());
184  if (it == m_clients.end())
185  break;
186 
187  if (!it->second->m_in_use)
188  {
189  it->second->m_in_use = true;
190  return it->second;
191  }
192  m_cond_session_ready.wait(lock);
193  }
194  FrontendPtr f(new Frontend(this));
195  m_clients[package.session()] = f;
196  f->m_in_use = true;
197  return f;
198 }
199 
200 void yf::Multi::Rep::release_frontend(mp::Package &package)
201 {
202  boost::mutex::scoped_lock lock(m_mutex);
203  std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
204 
205  it = m_clients.find(package.session());
206  if (it != m_clients.end())
207  {
208  if (package.session().is_closed())
209  {
210  it->second->close(package);
211  m_clients.erase(it);
212  }
213  else
214  {
215  it->second->m_in_use = false;
216  }
217  m_cond_session_ready.notify_all();
218  }
219 }
220 
221 yf::Multi::FrontendSet::FrontendSet(std::string setname)
222  : m_setname(setname)
223 {
224 }
225 
226 
227 yf::Multi::FrontendSet::FrontendSet()
228 {
229 }
230 
231 
232 yf::Multi::FrontendSet::~FrontendSet()
233 {
234 }
235 
236 yf::Multi::Multi() : m_p(new Multi::Rep)
237 {
238 }
239 
240 yf::Multi::~Multi() {
241 }
242 
243 
244 void yf::Multi::Backend::operator() (void)
245 {
246  m_package->move(m_route);
247 }
248 
249 
250 void yf::Multi::Frontend::close(mp::Package &package)
251 {
252  std::list<BackendPtr>::const_iterator bit;
253  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
254  {
255  BackendPtr b = *bit;
256 
257  b->m_package->copy_filter(package);
258  b->m_package->request() = (Z_GDU *) 0;
259  b->m_package->session().close();
260  b->m_package->move(b->m_route);
261  }
262 }
263 
264 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
265 {
266  std::list<BackendPtr>::const_iterator bit;
267  boost::thread_group g;
268  for (bit = blist.begin(); bit != blist.end(); bit++)
269  {
270  g.add_thread(new boost::thread(**bit));
271  }
272  g.join_all();
273 }
274 
275 void yf::Multi::FrontendSet::serve_order(int start, int number,
276  std::list<PresentJob> &jobs)
277 {
278  int i;
279  for (i = 0; i < number; i++)
280  {
281  std::list<BackendSet>::const_iterator bsit;
282  int voffset = 0;
283  int offset = start + i - 1;
284  for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end();
285  bsit++)
286  {
287  if (offset >= voffset && offset < voffset + bsit->m_count)
288  {
289  PresentJob job(bsit->m_backend, offset - voffset + 1);
290  jobs.push_back(job);
291  break;
292  }
293  voffset += bsit->m_count;
294  }
295  }
296 }
297 
298 void yf::Multi::FrontendSet::round_robin(int start, int number,
299  std::list<PresentJob> &jobs)
300 {
301  std::list<int> pos;
302  std::list<BackendSet>::const_iterator bsit;
303  for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
304  {
305  pos.push_back(1);
306  }
307 
308  int p = 1;
309 #if 1
310  // optimization step!
311  int omin = 0;
312  while(true)
313  {
314  int min = 0;
315  int no_left = 0;
316  // find min count for each set which is > omin
317  for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
318  {
319  if (bsit->m_count > omin)
320  {
321  if (no_left == 0 || bsit->m_count < min)
322  min = bsit->m_count;
323  no_left++;
324  }
325  }
326  if (no_left == 0) // if nothing greater than omin, bail out.
327  break;
328  int skip = no_left * min;
329  if (p + skip > start) // step gets us "into" present range?
330  {
331  // Yes. skip until start.. Rounding off is deliberate!
332  min = (start-p) / no_left;
333  p += no_left * min;
334 
335  // update positions in each set..
336  std::list<int>::iterator psit = pos.begin();
337  for (psit = pos.begin(); psit != pos.end(); psit++)
338  *psit += min;
339  break;
340  }
341  // skip on each set.. before "present range"..
342  p = p + skip;
343 
344  std::list<int>::iterator psit = pos.begin();
345  for (psit = pos.begin(); psit != pos.end(); psit++)
346  *psit += min;
347 
348  omin = min; // update so we consider next class (with higher count)
349  }
350 #endif
351  int fetched = 0;
352  bool more = true;
353  while (more)
354  {
355  more = false;
356  std::list<int>::iterator psit = pos.begin();
357  bsit = m_backend_sets.begin();
358 
359  for (; bsit != m_backend_sets.end(); psit++,bsit++)
360  {
361  if (fetched >= number)
362  {
363  more = false;
364  break;
365  }
366  if (*psit <= bsit->m_count)
367  {
368  if (p >= start)
369  {
370  PresentJob job(bsit->m_backend, *psit);
371  jobs.push_back(job);
372  fetched++;
373  }
374  (*psit)++;
375  p++;
376  more = true;
377  }
378  }
379  }
380 }
381 
382 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
383 {
384  Z_InitRequest *req = gdu->u.z3950->u.initRequest;
385 
386  std::list<std::string> targets;
387 
388  mp::util::get_vhost_otherinfo(req->otherInfo, targets);
389 
390  if (targets.size() < 1)
391  {
392  package.move();
393  return;
394  }
395 
396  std::list<std::string>::const_iterator t_it = targets.begin();
397  for (; t_it != targets.end(); t_it++)
398  {
399  Session s;
400  Backend *b = new Backend;
401  b->m_vhost = *t_it;
402 
403  std::list<Multi::Map>::const_iterator it =
404  m_p->m_route_patterns.begin();
405  while (it != m_p->m_route_patterns.end()) {
406  if (it->match(*t_it, &b->m_route, &b->m_auth))
407  break;
408  it++;
409  }
410  // b->m_route = m_p->m_target_route[*t_it];
411  // b->m_route unset
412  b->m_package = PackagePtr(new Package(s, package.origin()));
413 
414  m_backend_list.push_back(BackendPtr(b));
415  }
416  m_is_multi = true;
417 
418  // create init request
419  std::list<BackendPtr>::iterator bit;
420  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
421  {
422  mp::odr odr;
423  BackendPtr b = *bit;
424  Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
425 
426  std::list<std::string>vhost_one;
427  vhost_one.push_back(b->m_vhost);
428  mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
429  odr, vhost_one);
430 
431 
432  Z_InitRequest *breq = init_apdu->u.initRequest;
433 
434  if (b->m_auth.length())
435  {
436  breq->idAuthentication =
437  (Z_IdAuthentication *)
438  odr_malloc(odr, sizeof(*breq->idAuthentication));
439  breq->idAuthentication->which = Z_IdAuthentication_open;
440  breq->idAuthentication->u.open = odr_strdup(odr, b->m_auth.c_str());
441  }
442  else
443  breq->idAuthentication = req->idAuthentication;
444 
445  *breq->preferredMessageSize = *req->preferredMessageSize;
446  *breq->maximumRecordSize = *req->maximumRecordSize;
447 
448 
449  const char *peer_name = yaz_oi_get_string_oid(
450  &req->otherInfo, yaz_oid_userinfo_client_ip, 1, 0);
451  if (peer_name)
452  yaz_oi_set_string_oid(&breq->otherInfo, odr,
453  yaz_oid_userinfo_client_ip, 1, peer_name);
454 
455  ODR_MASK_SET(breq->options, Z_Options_search);
456  ODR_MASK_SET(breq->options, Z_Options_present);
457  ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
458  ODR_MASK_SET(breq->options, Z_Options_scan);
459 
460  ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
461  ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
462  ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
463 
464  b->m_package->request() = init_apdu;
465 
466  b->m_package->copy_filter(package);
467  }
468  multi_move(m_backend_list);
469 
470  // create the frontend init response based on each backend init response
471  mp::odr odr;
472 
473  Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
474  Z_InitResponse *f_resp = f_apdu->u.initResponse;
475 
476  ODR_MASK_SET(f_resp->options, Z_Options_search);
477  ODR_MASK_SET(f_resp->options, Z_Options_present);
478  ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
479  ODR_MASK_SET(f_resp->options, Z_Options_scan);
480 
481  ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
482  ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
483  ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
484 
485  int no_failed = 0;
486  int no_succeeded = 0;
487 
488  Odr_int preferredMessageSize = *req->preferredMessageSize;
489  Odr_int maximumRecordSize = *req->maximumRecordSize;
490  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
491  {
492  PackagePtr p = (*bit)->m_package;
493 
494  if (p->session().is_closed())
495  {
496  // failed. Remove from list and increment number of failed
497  no_failed++;
498  bit = m_backend_list.erase(bit);
499  continue;
500  }
501  Z_GDU *gdu = p->response().get();
502  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
503  Z_APDU_initResponse)
504  {
505  int i;
506  Z_APDU *b_apdu = gdu->u.z3950;
507  Z_InitResponse *b_resp = b_apdu->u.initResponse;
508 
509  // common options for all backends
510  for (i = 0; i <= Z_Options_stringSchema; i++)
511  {
512  if (!ODR_MASK_GET(b_resp->options, i))
513  ODR_MASK_CLEAR(f_resp->options, i);
514  }
515  // common protocol version
516  for (i = 0; i <= Z_ProtocolVersion_3; i++)
517  if (!ODR_MASK_GET(b_resp->protocolVersion, i))
518  ODR_MASK_CLEAR(f_resp->protocolVersion, i);
519  if (*b_resp->result)
520  {
521  no_succeeded++;
522  if (preferredMessageSize > *b_resp->preferredMessageSize)
523  preferredMessageSize = *b_resp->preferredMessageSize;
524  if (maximumRecordSize > *b_resp->maximumRecordSize)
525  maximumRecordSize = *b_resp->maximumRecordSize;
526  }
527  else
528  {
529  if (!f_resp->userInformationField
530  && b_resp->userInformationField)
531  f_resp->userInformationField = b_resp->userInformationField;
532  no_failed++;
533  }
534  }
535  else
536  no_failed++;
537  bit++;
538  }
539  *f_resp->preferredMessageSize = preferredMessageSize;
540  *f_resp->maximumRecordSize = maximumRecordSize;
541 
542  if (m_p->m_hide_unavailable)
543  {
544  if (no_succeeded == 0)
545  {
546  *f_resp->result = 0;
547  package.session().close();
548  }
549  }
550  else
551  {
552  if (no_failed)
553  {
554  *f_resp->result = 0;
555  package.session().close();
556  }
557  }
558  package.response() = f_apdu;
559 }
560 
561 void yf::Multi::Frontend::record_diagnostics(Z_Records *records,
562  Z_DiagRecs * &z_diag,
563  ODR odr,
564  int &no_successful)
565 {
566  // see we get any errors (AKA diagnstics)
567  if (records)
568  {
569  if (records->which == Z_Records_NSD)
570  {
571  if (!z_diag)
572  {
573  z_diag = (Z_DiagRecs *)
574  odr_malloc(odr, sizeof(*z_diag));
575  z_diag->num_diagRecs = 0;
576  z_diag->diagRecs = (Z_DiagRec**)
577  odr_malloc(odr, sizeof(*z_diag->diagRecs));
578  }
579  else
580  {
581  Z_DiagRec **n = (Z_DiagRec **)
582  odr_malloc(odr,
583  (1 + z_diag->num_diagRecs) * sizeof(*n));
584  memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
585  * sizeof(*n));
586  z_diag->diagRecs = n;
587  }
588  Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr, sizeof(*nr));
589  nr->which = Z_DiagRec_defaultFormat;
590  nr->u.defaultFormat =
591  records->u.nonSurrogateDiagnostic;
592  z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
593  }
594  else if (records->which == Z_Records_multipleNSD)
595  {
596  Z_DiagRecs * dr =records->u.multipleNonSurDiagnostics;
597 
598  if (!z_diag)
599  {
600  z_diag = (Z_DiagRecs *) odr_malloc(odr, sizeof(*z_diag));
601  z_diag->num_diagRecs = 0;
602  z_diag->diagRecs = 0;
603  }
604  Z_DiagRec **n = (Z_DiagRec **)
605  odr_malloc(odr,
606  (dr->num_diagRecs + z_diag->num_diagRecs) *
607  sizeof(*n));
608  if (z_diag->num_diagRecs)
609  memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs * sizeof(*n));
610  memcpy(n + z_diag->num_diagRecs,
611  dr->diagRecs, dr->num_diagRecs * sizeof(*n));
612  z_diag->diagRecs = n;
613  z_diag->num_diagRecs += dr->num_diagRecs;
614  }
615  else
616  no_successful++; // probably piggyback
617  }
618  else
619  no_successful++; // no records and no diagnostics
620 }
621 
622 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
623 {
624  // create search request
625  Z_SearchRequest *req = apdu_req->u.searchRequest;
626 
627  std::list<BackendPtr>::const_iterator bit;
628  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
629  {
630  PackagePtr p = (*bit)->m_package;
631  yazpp_1::GDU gdu1(apdu_req);
632  mp::odr odr;
633  Z_SearchRequest *req1 = gdu1.get()->u.z3950->u.searchRequest;
634 
635  // they are altered now - to disable piggyback
636  *req1->smallSetUpperBound = 0;
637  *req1->largeSetLowerBound = 1;
638  *req1->mediumSetPresentNumber = 0;
639 
640  if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
641  &req1->num_databaseNames,
642  &req1->databaseNames))
643  {
644  req1->num_databaseNames = req->num_databaseNames;
645  req1->databaseNames = req->databaseNames;
646  }
647  p->request() = gdu1;
648  p->copy_filter(package);
649  }
650  multi_move(m_backend_list);
651 
652  // look at each response
653  FrontendSet resultSet(std::string(req->resultSetName));
654 
655  mp::odr odr;
656  Odr_int result_set_size = 0;
657  Z_DiagRecs *z_diag = 0;
658  int no_successful = 0;
659  PackagePtr close_p;
660 
661  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
662  {
663  PackagePtr p = (*bit)->m_package;
664 
665  // save closing package for at least one target
666  if (p->session().is_closed())
667  close_p = p;
668 
669  Z_GDU *gdu = p->response().get();
670  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
671  Z_APDU_searchResponse)
672  {
673  Z_APDU *b_apdu = gdu->u.z3950;
674  Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
675 
676  record_diagnostics(b_resp->records, z_diag, odr, no_successful);
677 
678  BackendSet backendSet;
679  backendSet.m_backend = *bit;
680  backendSet.m_count = *b_resp->resultCount;
681  result_set_size += *b_resp->resultCount;
682  resultSet.m_backend_sets.push_back(backendSet);
683  }
684  }
685 
686  Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
687  Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
688 
689  yaz_log(YLOG_DEBUG, "no_successful=%d is_closed=%s hide_errors=%s",
690  no_successful,
691  close_p ? "true" : "false",
692  m_p->m_hide_errors ? "true" : "false");
693  *f_resp->resultCount = result_set_size;
694  if (close_p && (no_successful == 0 || !m_p->m_hide_unavailable))
695  {
696  package.session().close();
697  package.response() = close_p->response();
698  return;
699  }
700  if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
701  {
702  f_resp->records = (Z_Records *)
703  odr_malloc(odr, sizeof(*f_resp->records));
704  if (z_diag->num_diagRecs > 1)
705  {
706  f_resp->records->which = Z_Records_multipleNSD;
707  f_resp->records->u.multipleNonSurDiagnostics = z_diag;
708  }
709  else
710  {
711  f_resp->records->which = Z_Records_NSD;
712  f_resp->records->u.nonSurrogateDiagnostic =
713  z_diag->diagRecs[0]->u.defaultFormat;
714  }
715  }
716  // assume OK
717  m_sets[resultSet.m_setname] = resultSet;
718 
719  Odr_int number;
720  mp::util::piggyback(*req->smallSetUpperBound,
721  *req->largeSetLowerBound,
722  *req->mediumSetPresentNumber,
723  0, 0,
725  number, 0);
726  Package pp(package.session(), package.origin());
727  if (z_diag == 0 && number > 0)
728  {
729  pp.copy_filter(package);
730  Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
731  Z_PresentRequest *p_req = p_apdu->u.presentRequest;
732  p_req->preferredRecordSyntax = req->preferredRecordSyntax;
733  p_req->resultSetId = req->resultSetName;
734  *p_req->resultSetStartPoint = 1;
735  *p_req->numberOfRecordsRequested = number;
736  pp.request() = p_apdu;
737  present(pp, p_apdu);
738 
739  if (pp.session().is_closed())
740  package.session().close();
741 
742  Z_GDU *gdu = pp.response().get();
743  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
744  Z_APDU_presentResponse)
745  {
746  Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
747  f_resp->records = p_res->records;
748  *f_resp->numberOfRecordsReturned =
749  *p_res->numberOfRecordsReturned;
750  *f_resp->nextResultSetPosition =
751  *p_res->nextResultSetPosition;
752  }
753  else
754  {
755  package.response() = pp.response();
756  return;
757  }
758  }
759  package.response() = f_apdu; // in this scope because of p
760 }
761 
762 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
763 {
764  // create present request
765  Z_PresentRequest *req = apdu_req->u.presentRequest;
766 
767  Sets_it it;
768  it = m_sets.find(std::string(req->resultSetId));
769  if (it == m_sets.end())
770  {
771  mp::odr odr;
772  Z_APDU *apdu =
773  odr.create_presentResponse(
774  apdu_req,
775  YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
776  req->resultSetId);
777  package.response() = apdu;
778  return;
779  }
780  std::list<Multi::FrontendSet::PresentJob> jobs;
781  int start = *req->resultSetStartPoint;
782  int number = *req->numberOfRecordsRequested;
783 
784  if (m_p->m_merge_type == round_robin)
785  it->second.round_robin(start, number, jobs);
786  else if (m_p->m_merge_type == serve_order)
787  it->second.serve_order(start, number, jobs);
788 
789  if (0)
790  {
791  std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
792  for (jit = jobs.begin(); jit != jobs.end(); jit++)
793  {
794  yaz_log(YLOG_DEBUG, "job pos=%d", jit->m_pos);
795  }
796  }
797 
798  std::list<BackendPtr> present_backend_list;
799 
800  std::list<BackendSet>::const_iterator bsit;
801  bsit = it->second.m_backend_sets.begin();
802  for (; bsit != it->second.m_backend_sets.end(); bsit++)
803  {
804  int start = -1;
805  int end = -1;
806  {
807  std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
808  for (jit = jobs.begin(); jit != jobs.end(); jit++)
809  {
810  if (jit->m_backend == bsit->m_backend)
811  {
812  if (start == -1 || jit->m_pos < start)
813  start = jit->m_pos;
814  if (end == -1 || jit->m_pos > end)
815  end = jit->m_pos;
816  }
817  }
818  }
819  if (start != -1)
820  {
821  std::list<Multi::FrontendSet::PresentJob>::iterator jit;
822  for (jit = jobs.begin(); jit != jobs.end(); jit++)
823  {
824  if (jit->m_backend == bsit->m_backend)
825  {
826  if (jit->m_pos >= start && jit->m_pos <= end)
827  jit->m_start = start;
828  }
829  }
830 
831  PackagePtr p = bsit->m_backend->m_package;
832 
833  *req->resultSetStartPoint = start;
834  *req->numberOfRecordsRequested = end - start + 1;
835 
836  p->request() = apdu_req;
837  p->copy_filter(package);
838 
839  present_backend_list.push_back(bsit->m_backend);
840  }
841  }
842  multi_move(present_backend_list);
843 
844  // look at each response
845  Z_DiagRecs *z_diag = 0;
846  int no_successful = 0;
847  mp::odr odr;
848  PackagePtr close_p;
849 
850  std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
851  for (; pbit != present_backend_list.end(); pbit++)
852  {
853  PackagePtr p = (*pbit)->m_package;
854 
855  // save closing package for at least one target
856  if (p->session().is_closed())
857  close_p = p;
858 
859  Z_GDU *gdu = p->response().get();
860  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
861  Z_APDU_presentResponse)
862  {
863  Z_APDU *b_apdu = gdu->u.z3950;
864  Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
865 
866  record_diagnostics(b_resp->records, z_diag, odr, no_successful);
867  }
868  }
869 
870  Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
871  Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
872 
873  if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
874  {
875  package.session().close();
876  package.response() = close_p->response();
877  return;
878  }
879  if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
880  {
881  f_resp->records = (Z_Records *)
882  odr_malloc(odr, sizeof(*f_resp->records));
883  if (z_diag->num_diagRecs > 1)
884  {
885  f_resp->records->which = Z_Records_multipleNSD;
886  f_resp->records->u.multipleNonSurDiagnostics = z_diag;
887  }
888  else
889  {
890  f_resp->records->which = Z_Records_NSD;
891  f_resp->records->u.nonSurrogateDiagnostic =
892  z_diag->diagRecs[0]->u.defaultFormat;
893  }
894  }
895  else if (number < 0 || (size_t) number > jobs.size())
896  {
897  f_apdu =
898  odr.create_presentResponse(
899  apdu_req,
900  YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
901  0);
902  }
903  else
904  {
905  f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
906  Z_Records * records = f_resp->records;
907  records->which = Z_Records_DBOSD;
908  records->u.databaseOrSurDiagnostics =
909  (Z_NamePlusRecordList *)
910  odr_malloc(odr, sizeof(Z_NamePlusRecordList));
911  Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
912  nprl->num_records = jobs.size();
913  nprl->records = (Z_NamePlusRecord**)
914  odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
915  int i = 0;
916  std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
917  for (jit = jobs.begin(); jit != jobs.end(); jit++)
918  {
919  PackagePtr p = jit->m_backend->m_package;
920 
921  Z_GDU *gdu = p->response().get();
922  Z_APDU *b_apdu = gdu->u.z3950;
923  int inside_pos = jit->m_pos - jit->m_start;
924  Z_Records *records = b_apdu->u.presentResponse->records;
925 
926  if (records && records->which == Z_Records_DBOSD
927  && inside_pos <
928  records->u.databaseOrSurDiagnostics->num_records)
929  {
930  nprl->records[i] = (Z_NamePlusRecord*)
931  odr_malloc(odr, sizeof(Z_NamePlusRecord));
932  *nprl->records[i] = *records->
933  u.databaseOrSurDiagnostics->records[inside_pos];
934  nprl->records[i]->databaseName =
935  odr_strdup(odr, jit->m_backend->m_vhost.c_str());
936  i++;
937  }
938  }
939  nprl->num_records = i; // usually same as jobs.size();
940  *f_resp->nextResultSetPosition = start + i;
941  *f_resp->numberOfRecordsReturned = i;
942  }
943  package.response() = f_apdu;
944 }
945 
946 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
947 {
948  return m_norm_term < k.m_norm_term;
949 }
950 
951 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
952 {
953  return m_norm_term == k.m_norm_term;
954 }
955 
956 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
957 {
958  Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
959  e->which = Z_Entry_termInfo;
960  Z_TermInfo *t;
961  t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
962  t->suggestedAttributes = 0;
963  t->displayTerm = 0;
964  t->alternativeTerm = 0;
965  t->byAttributes = 0;
966  t->otherTermInfo = 0;
967  t->globalOccurrences = odr_intdup(odr, m_count);
968  t->term = (Z_Term *) odr_malloc(odr, sizeof(*t->term));
969  t->term->which = Z_Term_general;
970  t->term->u.general = odr_create_Odr_oct(odr,
971  m_norm_term.c_str(), m_norm_term.size());
972  return e;
973 }
974 
975 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
976 {
977  std::list<BackendPtr>::const_iterator bit;
978  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
979  {
980  PackagePtr p = (*bit)->m_package;
981  mp::odr odr;
982 
983  p->request() = apdu_req;
984  p->copy_filter(package);
985  }
986  multi_move(m_backend_list);
987  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
988  {
989  PackagePtr p = (*bit)->m_package;
990 
991  if (p->session().is_closed()) // if any backend closes, close frontend
992  package.session().close();
993 
994  package.response() = p->response();
995  }
996 }
997 
998 
999 void yf::Multi::Frontend::scan(mp::Package &package, Z_APDU *apdu_req)
1000 {
1001  Z_ScanRequest *req = apdu_req->u.scanRequest;
1002 
1003  std::list<BackendPtr>::const_iterator bit;
1004  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1005  {
1006  PackagePtr p = (*bit)->m_package;
1007  yazpp_1::GDU gdu1(apdu_req);
1008  mp::odr odr;
1009  Z_ScanRequest *req1 = gdu1.get()->u.z3950->u.scanRequest;
1010 
1011  if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1012  &req1->num_databaseNames,
1013  &req1->databaseNames))
1014  {
1015  req1->num_databaseNames = req->num_databaseNames;
1016  req1->databaseNames = req->databaseNames;
1017  }
1018  p->request() = gdu1;
1019  p->copy_filter(package);
1020  }
1021  multi_move(m_backend_list);
1022 
1023  ScanTermInfoList entries_before;
1024  ScanTermInfoList entries_after;
1025  int no_before = 0;
1026  int no_after = 0;
1027 
1028  for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1029  {
1030  PackagePtr p = (*bit)->m_package;
1031 
1032  if (p->session().is_closed()) // if any backend closes, close frontend
1033  package.session().close();
1034 
1035  Z_GDU *gdu = p->response().get();
1036  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1037  Z_APDU_scanResponse)
1038  {
1039  Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1040 
1041  if (res->entries && res->entries->nonsurrogateDiagnostics)
1042  {
1043  // failure
1044  mp::odr odr;
1045  Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1046  Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1047 
1048  f_res->entries->nonsurrogateDiagnostics =
1049  res->entries->nonsurrogateDiagnostics;
1050  f_res->entries->num_nonsurrogateDiagnostics =
1051  res->entries->num_nonsurrogateDiagnostics;
1052 
1053  package.response() = f_apdu;
1054  return;
1055  }
1056 
1057  if (res->entries && res->entries->entries)
1058  {
1059  Z_Entry **entries = res->entries->entries;
1060  int num_entries = res->entries->num_entries;
1061  int position = 1;
1062  if (req->preferredPositionInResponse)
1063  position = *req->preferredPositionInResponse;
1064  if (res->positionOfTerm)
1065  position = *res->positionOfTerm;
1066 
1067  // before
1068  int i;
1069  for (i = 0; i<position-1 && i<num_entries; i++)
1070  {
1071  Z_Entry *ent = entries[i];
1072 
1073  if (ent->which == Z_Entry_termInfo)
1074  {
1075  ScanTermInfo my;
1076 
1077  Odr_int *occur = ent->u.termInfo->globalOccurrences;
1078  my.m_count = occur ? *occur : 0;
1079 
1080  if (ent->u.termInfo->term->which == Z_Term_general)
1081  {
1082  my.m_norm_term = std::string(
1083  (const char *)
1084  ent->u.termInfo->term->u.general->buf,
1085  ent->u.termInfo->term->u.general->len);
1086  }
1087  if (my.m_norm_term.length())
1088  {
1089  ScanTermInfoList::iterator it =
1090  entries_before.begin();
1091  while (it != entries_before.end() && my <*it)
1092  it++;
1093  if (it != entries_before.end() && my == *it)
1094  {
1095  it->m_count += my.m_count;
1096  }
1097  else
1098  {
1099  entries_before.insert(it, my);
1100  no_before++;
1101  }
1102  }
1103  }
1104  }
1105  // after
1106  if (position <= 0)
1107  i = 0;
1108  else
1109  i = position-1;
1110  for ( ; i<num_entries; i++)
1111  {
1112  Z_Entry *ent = entries[i];
1113 
1114  if (ent->which == Z_Entry_termInfo)
1115  {
1116  ScanTermInfo my;
1117 
1118  Odr_int *occur = ent->u.termInfo->globalOccurrences;
1119  my.m_count = occur ? *occur : 0;
1120 
1121  if (ent->u.termInfo->term->which == Z_Term_general)
1122  {
1123  my.m_norm_term = std::string(
1124  (const char *)
1125  ent->u.termInfo->term->u.general->buf,
1126  ent->u.termInfo->term->u.general->len);
1127  }
1128  if (my.m_norm_term.length())
1129  {
1130  ScanTermInfoList::iterator it =
1131  entries_after.begin();
1132  while (it != entries_after.end() && *it < my)
1133  it++;
1134  if (it != entries_after.end() && my == *it)
1135  {
1136  it->m_count += my.m_count;
1137  }
1138  else
1139  {
1140  entries_after.insert(it, my);
1141  no_after++;
1142  }
1143  }
1144  }
1145  }
1146 
1147  }
1148  }
1149  else
1150  {
1151  // if any target does not return scan response - return that
1152  package.response() = p->response();
1153  return;
1154  }
1155  }
1156 
1157  if (false)
1158  {
1159  std::cout << "BEFORE\n";
1160  ScanTermInfoList::iterator it = entries_before.begin();
1161  for(; it != entries_before.end(); it++)
1162  {
1163  std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1164  }
1165 
1166  std::cout << "AFTER\n";
1167  it = entries_after.begin();
1168  for(; it != entries_after.end(); it++)
1169  {
1170  std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1171  }
1172  }
1173 
1174  if (false)
1175  {
1176  mp::odr odr;
1177  Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1178  package.response() = f_apdu;
1179  }
1180  else
1181  {
1182  mp::odr odr;
1183  Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1184  Z_ScanResponse *resp = f_apdu->u.scanResponse;
1185 
1186  int number_returned = *req->numberOfTermsRequested;
1187  int position_returned = *req->preferredPositionInResponse;
1188 
1189  resp->entries->num_entries = number_returned;
1190  resp->entries->entries = (Z_Entry**)
1191  odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1192  int i;
1193 
1194  int lbefore = entries_before.size();
1195  if (lbefore < position_returned-1)
1196  position_returned = lbefore+1;
1197 
1198  ScanTermInfoList::iterator it = entries_before.begin();
1199  for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1200  {
1201  resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1202  }
1203 
1204  it = entries_after.begin();
1205 
1206  if (position_returned <= 0)
1207  i = 0;
1208  else
1209  i = position_returned-1;
1210  for (; i<number_returned && it != entries_after.end(); i++, it++)
1211  {
1212  resp->entries->entries[i] = it->get_entry(odr);
1213  }
1214 
1215  number_returned = i;
1216 
1217  resp->positionOfTerm = odr_intdup(odr, position_returned);
1218  resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1219  resp->entries->num_entries = number_returned;
1220 
1221  package.response() = f_apdu;
1222  }
1223 }
1224 
1225 
1226 void yf::Multi::process(mp::Package &package) const
1227 {
1228  FrontendPtr f = m_p->get_frontend(package);
1229 
1230  Z_GDU *gdu = package.request().get();
1231 
1232  if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1233  Z_APDU_initRequest && !f->m_is_multi)
1234  {
1235  f->init(package, gdu);
1236  }
1237  else if (!f->m_is_multi)
1238  package.move();
1239  else if (gdu && gdu->which == Z_GDU_Z3950)
1240  {
1241  Z_APDU *apdu = gdu->u.z3950;
1242  if (apdu->which == Z_APDU_initRequest)
1243  {
1244  mp::odr odr;
1245 
1246  package.response() = odr.create_close(
1247  apdu,
1248  Z_Close_protocolError,
1249  "double init");
1250 
1251  package.session().close();
1252  }
1253  else if (apdu->which == Z_APDU_searchRequest)
1254  {
1255  f->search(package, apdu);
1256  }
1257  else if (apdu->which == Z_APDU_presentRequest)
1258  {
1259  f->present(package, apdu);
1260  }
1261  else if (apdu->which == Z_APDU_scanRequest)
1262  {
1263  f->scan(package, apdu);
1264  }
1265  else if (apdu->which == Z_APDU_close)
1266  {
1267  f->relay_apdu(package, apdu);
1268  }
1269  else
1270  {
1271  mp::odr odr;
1272 
1273  package.response() = odr.create_close(
1274  apdu, Z_Close_protocolError,
1275  "unsupported APDU in filter multi");
1276 
1277  package.session().close();
1278  }
1279  }
1280  m_p->release_frontend(package);
1281 }
1282 
1283 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1284  const char *path)
1285 {
1286  for (ptr = ptr->children; ptr; ptr = ptr->next)
1287  {
1288  if (ptr->type != XML_ELEMENT_NODE)
1289  continue;
1290  if (!strcmp((const char *) ptr->name, "target"))
1291  {
1292  std::string auth;
1293  std::string route = mp::xml::get_route(ptr, auth);
1294  std::string target = mp::xml::get_text(ptr);
1295  if (target.length() == 0)
1296  target = route;
1297  m_p->m_route_patterns.push_back(Multi::Map(target, route, auth));
1298  }
1299  else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1300  {
1301  m_p->m_hide_unavailable = true;
1302  }
1303  else if (!strcmp((const char *) ptr->name, "hideerrors"))
1304  {
1305  m_p->m_hide_errors = true;
1306  }
1307  else if (!strcmp((const char *) ptr->name, "mergetype"))
1308  {
1309  std::string mergetype = mp::xml::get_text(ptr);
1310  if (mergetype == "roundrobin")
1311  m_p->m_merge_type = round_robin;
1312  else if (mergetype == "serveorder")
1313  m_p->m_merge_type = serve_order;
1314  else
1315  throw mp::filter::FilterException
1316  ("Bad mergetype " + mergetype + " in multi filter");
1317 
1318  }
1319  else
1320  {
1321  throw mp::filter::FilterException
1322  ("Bad element "
1323  + std::string((const char *) ptr->name)
1324  + " in multi filter");
1325  }
1326  }
1327 }
1328 
1329 static mp::filter::Base* filter_creator()
1330 {
1331  return new mp::filter::Multi;
1332 }
1333 
1334 extern "C" {
1335  struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1336  0,
1337  "multi",
1339  };
1340 }
1341 
1342 
1343 /*
1344  * Local variables:
1345  * c-basic-offset: 4
1346  * c-file-style: "Stroustrup"
1347  * indent-tabs-mode: nil
1348  * End:
1349  * vim: shiftwidth=4 tabstop=8 expandtab
1350  */
1351 
Map(std::string pattern, std::string route, std::string auth)
bool match(const std::string target, std::string *ret, std::string *auth) const
std::map< mp::Session, FrontendPtr > m_clients
FrontendPtr get_frontend(Package &package)
std::list< Multi::Map > m_route_patterns
boost::condition m_cond_session_ready
void release_frontend(Package &package)
boost::shared_ptr< Frontend > FrontendPtr
boost::shared_ptr< Backend > BackendPtr
std::list< ScanTermInfo > ScanTermInfoList
std::map< std::string, FrontendSet >::iterator Sets_it
boost::scoped_ptr< Rep > m_p
boost::shared_ptr< Package > PackagePtr
static const int result_set_size
struct metaproxy_1_filter_struct metaproxy_1_filter_multi
static mp::filter::Base * filter_creator()
bool operator==(const BackendSet &k) const
bool operator<(const BackendSet &k) const
void serve_order(int pos, int number, std::list< PresentJob > &job)
void round_robin(int pos, int number, std::list< PresentJob > &job)
std::list< BackendSet > m_backend_sets
void init(Package &package, Z_GDU *gdu)
void record_diagnostics(Z_Records *records, Z_DiagRecs *&z_diag, ODR odr, int &no_successful)
void present(Package &package, Z_APDU *apdu)
void relay_apdu(Package &package, Z_APDU *apdu)
void scan(Package &package, Z_APDU *apdu)
void search(Package &package, Z_APDU *apdu)
std::list< BackendPtr > m_backend_list
void multi_move(std::list< BackendPtr > &blist)
std::map< std::string, Multi::FrontendSet > m_sets
bool operator<(const ScanTermInfo &) const
bool operator==(const ScanTermInfo &) const