metaproxy 1.22.1
filter_multi.cpp
Go to the documentation of this file.
1/* This file is part of Metaproxy.
2 Copyright (C) Index Data
3
4Metaproxy is free software; you can redistribute it and/or modify it under
5the terms of the GNU General Public License as published by the Free
6Software Foundation; either version 2, or (at your option) any later
7version.
8
9Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10WARRANTY; without even the implied warranty of MERCHANTABILITY or
11FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12for more details.
13
14You should have received a copy of the GNU General Public License
15along with this program; if not, write to the Free Software
16Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17*/
18
19#include <yaz/log.h>
20
21#include "config.hpp"
22
23#include <metaproxy/filter.hpp>
24#include <metaproxy/package.hpp>
25
26#include <boost/thread/thread.hpp>
27#include <boost/thread/mutex.hpp>
28#include <boost/thread/condition.hpp>
29#include <boost/shared_ptr.hpp>
30
31#include <metaproxy/util.hpp>
32#include "filter_multi.hpp"
33
34#include <yaz/zgdu.h>
35#include <yaz/otherinfo.h>
36#include <yaz/diagbib1.h>
37#include <yaz/match_glob.h>
38#include <yaz/oid_db.h>
39
40#include <vector>
41#include <algorithm>
42#include <map>
43#include <iostream>
44
45namespace mp = metaproxy_1;
46namespace yf = mp::filter;
47
48namespace metaproxy_1 {
49 namespace filter {
57 bool operator < (const BackendSet &k) const;
58 bool operator == (const BackendSet &k) const;
59 };
61 std::string m_norm_term;
62 std::string m_display_term;
64 bool operator < (const ScanTermInfo &) const;
65 bool operator == (const ScanTermInfo &) const;
66 Z_Entry *get_entry(ODR odr);
67 };
69 class PresentJob {
70 public:
72 int m_pos; // position for backend (1=first, 2=second,..
73 int m_start; // present request start
74 PresentJob(BackendPtr ptr, int pos) :
75 m_backend(ptr), m_pos(pos), m_start(0) {};
76 };
77 FrontendSet(std::string setname);
80
81 void round_robin(int pos, int number, std::list<PresentJob> &job);
82 void serve_order(int pos, int number, std::list<PresentJob> &job);
83
84 std::list<BackendSet> m_backend_sets;
85 std::string m_setname;
86 };
89 std::string m_backend_database;
90 std::string m_vhost;
91 std::string m_route;
92 std::string m_auth;
93 void operator() (void); // thread operation
94 };
96 Frontend(Rep *rep);
97 ~Frontend();
100 std::list<BackendPtr> m_backend_list;
101 std::map<std::string,Multi::FrontendSet> m_sets;
102
103 void multi_move(std::list<BackendPtr> &blist);
104 void init(Package &package, Z_GDU *gdu);
105 void close(Package &package);
106 void search(Package &package, Z_APDU *apdu);
107 void present(Package &package, Z_APDU *apdu);
108 void scan(Package &package, Z_APDU *apdu);
109 void relay_apdu(Package &package, Z_APDU *apdu);
110 void record_diagnostics(Z_Records *records,
111 Z_DiagRecs * &z_diag,
112 ODR odr,
113 int &no_successful);
115 };
117 std::string m_target_pattern;
118 std::string m_route;
119 std::string m_auth;
120 public:
121 Map(std::string pattern, std::string route, std::string auth) :
122 m_target_pattern(pattern), m_route(route), m_auth(auth) {};
123 bool match(const std::string target, std::string *ret,
124 std::string *auth) const {
125 if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
126 {
127 *ret = m_route;
128 *auth = m_auth;
129 return true;
130 }
131 return false;
132 };
133 };
135 friend class Multi;
136 friend struct Frontend;
137
138 Rep();
139 FrontendPtr get_frontend(Package &package);
140 void release_frontend(Package &package);
141 private:
142 std::list<Multi::Map> m_route_patterns;
143 boost::mutex m_mutex;
144 boost::condition m_cond_session_ready;
145 std::map<mp::Session, FrontendPtr> m_clients;
149 };
150 }
151}
152
153yf::Multi::Rep::Rep()
154{
155 m_hide_unavailable = false;
156 m_hide_errors = false;
157 m_merge_type = round_robin;
158}
159
160bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
161{
162 return m_count < k.m_count;
163}
164
165yf::Multi::Frontend::Frontend(Rep *rep)
166{
167 m_p = rep;
168 m_is_multi = false;
169}
170
171yf::Multi::Frontend::~Frontend()
172{
173}
174
175yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
176{
177 boost::mutex::scoped_lock lock(m_mutex);
178
179 std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
180
181 while(true)
182 {
183 it = m_clients.find(package.session());
184 if (it == m_clients.end())
185 break;
186
187 if (!it->second->m_in_use)
188 {
189 it->second->m_in_use = true;
190 return it->second;
191 }
192 m_cond_session_ready.wait(lock);
193 }
194 FrontendPtr f(new Frontend(this));
195 m_clients[package.session()] = f;
196 f->m_in_use = true;
197 return f;
198}
199
200void yf::Multi::Rep::release_frontend(mp::Package &package)
201{
202 boost::mutex::scoped_lock lock(m_mutex);
203 std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
204
205 it = m_clients.find(package.session());
206 if (it != m_clients.end())
207 {
208 if (package.session().is_closed())
209 {
210 it->second->close(package);
211 m_clients.erase(it);
212 }
213 else
214 {
215 it->second->m_in_use = false;
216 }
217 m_cond_session_ready.notify_all();
218 }
219}
220
221yf::Multi::FrontendSet::FrontendSet(std::string setname)
222 : m_setname(setname)
223{
224}
225
226
227yf::Multi::FrontendSet::FrontendSet()
228{
229}
230
231
232yf::Multi::FrontendSet::~FrontendSet()
233{
234}
235
236yf::Multi::Multi() : m_p(new Multi::Rep)
237{
238}
239
240yf::Multi::~Multi() {
241}
242
243
244void yf::Multi::Backend::operator() (void)
245{
246 m_package->move(m_route);
247}
248
249
250void yf::Multi::Frontend::close(mp::Package &package)
251{
252 std::list<BackendPtr>::const_iterator bit;
253 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
254 {
255 BackendPtr b = *bit;
256
257 b->m_package->copy_filter(package);
258 b->m_package->request() = (Z_GDU *) 0;
259 b->m_package->session().close();
260 b->m_package->move(b->m_route);
261 }
262}
263
264void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
265{
266 std::list<BackendPtr>::const_iterator bit;
267 boost::thread_group g;
268 for (bit = blist.begin(); bit != blist.end(); bit++)
269 {
270 g.add_thread(new boost::thread(**bit));
271 }
272 g.join_all();
273}
274
275void yf::Multi::FrontendSet::serve_order(int start, int number,
276 std::list<PresentJob> &jobs)
277{
278 int i;
279 for (i = 0; i < number; i++)
280 {
281 std::list<BackendSet>::const_iterator bsit;
282 int voffset = 0;
283 int offset = start + i - 1;
284 for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end();
285 bsit++)
286 {
287 if (offset >= voffset && offset < voffset + bsit->m_count)
288 {
289 PresentJob job(bsit->m_backend, offset - voffset + 1);
290 jobs.push_back(job);
291 break;
292 }
293 voffset += bsit->m_count;
294 }
295 }
296}
297
298void yf::Multi::FrontendSet::round_robin(int start, int number,
299 std::list<PresentJob> &jobs)
300{
301 std::list<int> pos;
302 std::list<BackendSet>::const_iterator bsit;
303 for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
304 {
305 pos.push_back(1);
306 }
307
308 int p = 1;
309#if 1
310 // optimization step!
311 int omin = 0;
312 while(true)
313 {
314 int min = 0;
315 int no_left = 0;
316 // find min count for each set which is > omin
317 for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
318 {
319 if (bsit->m_count > omin)
320 {
321 if (no_left == 0 || bsit->m_count < min)
322 min = bsit->m_count;
323 no_left++;
324 }
325 }
326 if (no_left == 0) // if nothing greater than omin, bail out.
327 break;
328 int skip = no_left * min;
329 if (p + skip > start) // step gets us "into" present range?
330 {
331 // Yes. skip until start.. Rounding off is deliberate!
332 min = (start-p) / no_left;
333 p += no_left * min;
334
335 // update positions in each set..
336 std::list<int>::iterator psit = pos.begin();
337 for (psit = pos.begin(); psit != pos.end(); psit++)
338 *psit += min;
339 break;
340 }
341 // skip on each set.. before "present range"..
342 p = p + skip;
343
344 std::list<int>::iterator psit = pos.begin();
345 for (psit = pos.begin(); psit != pos.end(); psit++)
346 *psit += min;
347
348 omin = min; // update so we consider next class (with higher count)
349 }
350#endif
351 int fetched = 0;
352 bool more = true;
353 while (more)
354 {
355 more = false;
356 std::list<int>::iterator psit = pos.begin();
357 bsit = m_backend_sets.begin();
358
359 for (; bsit != m_backend_sets.end(); psit++,bsit++)
360 {
361 if (fetched >= number)
362 {
363 more = false;
364 break;
365 }
366 if (*psit <= bsit->m_count)
367 {
368 if (p >= start)
369 {
370 PresentJob job(bsit->m_backend, *psit);
371 jobs.push_back(job);
372 fetched++;
373 }
374 (*psit)++;
375 p++;
376 more = true;
377 }
378 }
379 }
380}
381
382void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
383{
384 Z_InitRequest *req = gdu->u.z3950->u.initRequest;
385
386 std::list<std::string> targets;
387
388 mp::util::get_vhost_otherinfo(req->otherInfo, targets);
389
390 if (targets.size() < 1)
391 {
392 package.move();
393 return;
394 }
395
396 std::list<std::string>::const_iterator t_it = targets.begin();
397 for (; t_it != targets.end(); t_it++)
398 {
399 Session s;
400 Backend *b = new Backend;
401 b->m_vhost = *t_it;
402
403 std::list<Multi::Map>::const_iterator it =
404 m_p->m_route_patterns.begin();
405 while (it != m_p->m_route_patterns.end()) {
406 if (it->match(*t_it, &b->m_route, &b->m_auth))
407 break;
408 it++;
409 }
410 // b->m_route = m_p->m_target_route[*t_it];
411 // b->m_route unset
412 b->m_package = PackagePtr(new Package(s, package.origin()));
413
414 m_backend_list.push_back(BackendPtr(b));
415 }
416 m_is_multi = true;
417
418 // create init request
419 std::list<BackendPtr>::iterator bit;
420 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
421 {
422 mp::odr odr;
423 BackendPtr b = *bit;
424 Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
425
426 std::list<std::string>vhost_one;
427 vhost_one.push_back(b->m_vhost);
428 mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
429 odr, vhost_one);
430
431
432 Z_InitRequest *breq = init_apdu->u.initRequest;
433
434 if (b->m_auth.length())
435 {
436 breq->idAuthentication =
437 (Z_IdAuthentication *)
438 odr_malloc(odr, sizeof(*breq->idAuthentication));
439 breq->idAuthentication->which = Z_IdAuthentication_open;
440 breq->idAuthentication->u.open = odr_strdup(odr, b->m_auth.c_str());
441 }
442 else
443 breq->idAuthentication = req->idAuthentication;
444
445 *breq->preferredMessageSize = *req->preferredMessageSize;
446 *breq->maximumRecordSize = *req->maximumRecordSize;
447
448
449 const char *peer_name = yaz_oi_get_string_oid(
450 &req->otherInfo, yaz_oid_userinfo_client_ip, 1, 0);
451 if (peer_name)
452 yaz_oi_set_string_oid(&breq->otherInfo, odr,
453 yaz_oid_userinfo_client_ip, 1, peer_name);
454
455 ODR_MASK_SET(breq->options, Z_Options_search);
456 ODR_MASK_SET(breq->options, Z_Options_present);
457 ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
458 ODR_MASK_SET(breq->options, Z_Options_scan);
459
460 ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
461 ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
462 ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
463
464 b->m_package->request() = init_apdu;
465
466 b->m_package->copy_filter(package);
467 }
468 multi_move(m_backend_list);
469
470 // create the frontend init response based on each backend init response
471 mp::odr odr;
472
473 Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
474 Z_InitResponse *f_resp = f_apdu->u.initResponse;
475
476 ODR_MASK_SET(f_resp->options, Z_Options_search);
477 ODR_MASK_SET(f_resp->options, Z_Options_present);
478 ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
479 ODR_MASK_SET(f_resp->options, Z_Options_scan);
480
481 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
482 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
483 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
484
485 int no_failed = 0;
486 int no_succeeded = 0;
487
488 Odr_int preferredMessageSize = *req->preferredMessageSize;
489 Odr_int maximumRecordSize = *req->maximumRecordSize;
490 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
491 {
492 PackagePtr p = (*bit)->m_package;
493
494 if (p->session().is_closed())
495 {
496 // failed. Remove from list and increment number of failed
497 no_failed++;
498 bit = m_backend_list.erase(bit);
499 continue;
500 }
501 Z_GDU *gdu = p->response().get();
502 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
503 Z_APDU_initResponse)
504 {
505 int i;
506 Z_APDU *b_apdu = gdu->u.z3950;
507 Z_InitResponse *b_resp = b_apdu->u.initResponse;
508
509 // common options for all backends
510 for (i = 0; i <= Z_Options_stringSchema; i++)
511 {
512 if (!ODR_MASK_GET(b_resp->options, i))
513 ODR_MASK_CLEAR(f_resp->options, i);
514 }
515 // common protocol version
516 for (i = 0; i <= Z_ProtocolVersion_3; i++)
517 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
518 ODR_MASK_CLEAR(f_resp->protocolVersion, i);
519 if (*b_resp->result)
520 {
521 no_succeeded++;
522 if (preferredMessageSize > *b_resp->preferredMessageSize)
523 preferredMessageSize = *b_resp->preferredMessageSize;
524 if (maximumRecordSize > *b_resp->maximumRecordSize)
525 maximumRecordSize = *b_resp->maximumRecordSize;
526 }
527 else
528 {
529 if (!f_resp->userInformationField
530 && b_resp->userInformationField)
531 f_resp->userInformationField = b_resp->userInformationField;
532 no_failed++;
533 }
534 }
535 else
536 no_failed++;
537 bit++;
538 }
539 *f_resp->preferredMessageSize = preferredMessageSize;
540 *f_resp->maximumRecordSize = maximumRecordSize;
541
542 if (m_p->m_hide_unavailable)
543 {
544 if (no_succeeded == 0)
545 {
546 *f_resp->result = 0;
547 package.session().close();
548 }
549 }
550 else
551 {
552 if (no_failed)
553 {
554 *f_resp->result = 0;
555 package.session().close();
556 }
557 }
558 package.response() = f_apdu;
559}
560
561void yf::Multi::Frontend::record_diagnostics(Z_Records *records,
562 Z_DiagRecs * &z_diag,
563 ODR odr,
564 int &no_successful)
565{
566 // see we get any errors (AKA diagnstics)
567 if (records)
568 {
569 if (records->which == Z_Records_NSD)
570 {
571 if (!z_diag)
572 {
573 z_diag = (Z_DiagRecs *)
574 odr_malloc(odr, sizeof(*z_diag));
575 z_diag->num_diagRecs = 0;
576 z_diag->diagRecs = (Z_DiagRec**)
577 odr_malloc(odr, sizeof(*z_diag->diagRecs));
578 }
579 else
580 {
581 Z_DiagRec **n = (Z_DiagRec **)
582 odr_malloc(odr,
583 (1 + z_diag->num_diagRecs) * sizeof(*n));
584 memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
585 * sizeof(*n));
586 z_diag->diagRecs = n;
587 }
588 Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr, sizeof(*nr));
589 nr->which = Z_DiagRec_defaultFormat;
590 nr->u.defaultFormat =
591 records->u.nonSurrogateDiagnostic;
592 z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
593 }
594 else if (records->which == Z_Records_multipleNSD)
595 {
596 Z_DiagRecs * dr =records->u.multipleNonSurDiagnostics;
597
598 if (!z_diag)
599 {
600 z_diag = (Z_DiagRecs *) odr_malloc(odr, sizeof(*z_diag));
601 z_diag->num_diagRecs = 0;
602 z_diag->diagRecs = 0;
603 }
604 Z_DiagRec **n = (Z_DiagRec **)
605 odr_malloc(odr,
606 (dr->num_diagRecs + z_diag->num_diagRecs) *
607 sizeof(*n));
608 if (z_diag->num_diagRecs)
609 memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs * sizeof(*n));
610 memcpy(n + z_diag->num_diagRecs,
611 dr->diagRecs, dr->num_diagRecs * sizeof(*n));
612 z_diag->diagRecs = n;
613 z_diag->num_diagRecs += dr->num_diagRecs;
614 }
615 else
616 no_successful++; // probably piggyback
617 }
618 else
619 no_successful++; // no records and no diagnostics
620}
621
622void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
623{
624 // create search request
625 Z_SearchRequest *req = apdu_req->u.searchRequest;
626
627 std::list<BackendPtr>::const_iterator bit;
628 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
629 {
630 PackagePtr p = (*bit)->m_package;
631 yazpp_1::GDU gdu1(apdu_req);
632 mp::odr odr;
633 Z_SearchRequest *req1 = gdu1.get()->u.z3950->u.searchRequest;
634
635 // they are altered now - to disable piggyback
636 *req1->smallSetUpperBound = 0;
637 *req1->largeSetLowerBound = 1;
638 *req1->mediumSetPresentNumber = 0;
639
640 if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
641 &req1->num_databaseNames,
642 &req1->databaseNames))
643 {
644 req1->num_databaseNames = req->num_databaseNames;
645 req1->databaseNames = req->databaseNames;
646 }
647 p->request() = gdu1;
648 p->copy_filter(package);
649 }
650 multi_move(m_backend_list);
651
652 // look at each response
653 FrontendSet resultSet(std::string(req->resultSetName));
654
655 mp::odr odr;
656 Odr_int result_set_size = 0;
657 Z_DiagRecs *z_diag = 0;
658 int no_successful = 0;
659 PackagePtr close_p;
660
661 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
662 {
663 PackagePtr p = (*bit)->m_package;
664
665 // save closing package for at least one target
666 if (p->session().is_closed())
667 close_p = p;
668
669 Z_GDU *gdu = p->response().get();
670 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
671 Z_APDU_searchResponse)
672 {
673 Z_APDU *b_apdu = gdu->u.z3950;
674 Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
675
676 record_diagnostics(b_resp->records, z_diag, odr, no_successful);
677
678 BackendSet backendSet;
679 backendSet.m_backend = *bit;
680 backendSet.m_count = *b_resp->resultCount;
681 result_set_size += *b_resp->resultCount;
682 resultSet.m_backend_sets.push_back(backendSet);
683 }
684 }
685
686 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
687 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
688
689 yaz_log(YLOG_DEBUG, "no_successful=%d is_closed=%s hide_errors=%s",
690 no_successful,
691 close_p ? "true" : "false",
692 m_p->m_hide_errors ? "true" : "false");
693 *f_resp->resultCount = result_set_size;
694 if (close_p && (no_successful == 0 || !m_p->m_hide_unavailable))
695 {
696 package.session().close();
697 package.response() = close_p->response();
698 return;
699 }
700 if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
701 {
702 f_resp->records = (Z_Records *)
703 odr_malloc(odr, sizeof(*f_resp->records));
704 if (z_diag->num_diagRecs > 1)
705 {
706 f_resp->records->which = Z_Records_multipleNSD;
707 f_resp->records->u.multipleNonSurDiagnostics = z_diag;
708 }
709 else
710 {
711 f_resp->records->which = Z_Records_NSD;
712 f_resp->records->u.nonSurrogateDiagnostic =
713 z_diag->diagRecs[0]->u.defaultFormat;
714 }
715 }
716 // assume OK
717 m_sets[resultSet.m_setname] = resultSet;
718
719 Odr_int number;
720 mp::util::piggyback(*req->smallSetUpperBound,
721 *req->largeSetLowerBound,
722 *req->mediumSetPresentNumber,
723 0, 0,
725 number, 0);
726 Package pp(package.session(), package.origin());
727 if (z_diag == 0 && number > 0)
728 {
729 pp.copy_filter(package);
730 Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
731 Z_PresentRequest *p_req = p_apdu->u.presentRequest;
732 p_req->preferredRecordSyntax = req->preferredRecordSyntax;
733 p_req->resultSetId = req->resultSetName;
734 *p_req->resultSetStartPoint = 1;
735 *p_req->numberOfRecordsRequested = number;
736 pp.request() = p_apdu;
737 present(pp, p_apdu);
738
739 if (pp.session().is_closed())
740 package.session().close();
741
742 Z_GDU *gdu = pp.response().get();
743 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
744 Z_APDU_presentResponse)
745 {
746 Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
747 f_resp->records = p_res->records;
748 *f_resp->numberOfRecordsReturned =
749 *p_res->numberOfRecordsReturned;
750 *f_resp->nextResultSetPosition =
751 *p_res->nextResultSetPosition;
752 }
753 else
754 {
755 package.response() = pp.response();
756 return;
757 }
758 }
759 package.response() = f_apdu; // in this scope because of p
760}
761
762void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
763{
764 // create present request
765 Z_PresentRequest *req = apdu_req->u.presentRequest;
766
767 Sets_it it;
768 it = m_sets.find(std::string(req->resultSetId));
769 if (it == m_sets.end())
770 {
771 mp::odr odr;
772 Z_APDU *apdu =
773 odr.create_presentResponse(
774 apdu_req,
775 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
776 req->resultSetId);
777 package.response() = apdu;
778 return;
779 }
780 std::list<Multi::FrontendSet::PresentJob> jobs;
781 int start = *req->resultSetStartPoint;
782 int number = *req->numberOfRecordsRequested;
783
784 if (m_p->m_merge_type == round_robin)
785 it->second.round_robin(start, number, jobs);
786 else if (m_p->m_merge_type == serve_order)
787 it->second.serve_order(start, number, jobs);
788
789 if (0)
790 {
791 std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
792 for (jit = jobs.begin(); jit != jobs.end(); jit++)
793 {
794 yaz_log(YLOG_DEBUG, "job pos=%d", jit->m_pos);
795 }
796 }
797
798 std::list<BackendPtr> present_backend_list;
799
800 std::list<BackendSet>::const_iterator bsit;
801 bsit = it->second.m_backend_sets.begin();
802 for (; bsit != it->second.m_backend_sets.end(); bsit++)
803 {
804 int start = -1;
805 int end = -1;
806 {
807 std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
808 for (jit = jobs.begin(); jit != jobs.end(); jit++)
809 {
810 if (jit->m_backend == bsit->m_backend)
811 {
812 if (start == -1 || jit->m_pos < start)
813 start = jit->m_pos;
814 if (end == -1 || jit->m_pos > end)
815 end = jit->m_pos;
816 }
817 }
818 }
819 if (start != -1)
820 {
821 std::list<Multi::FrontendSet::PresentJob>::iterator jit;
822 for (jit = jobs.begin(); jit != jobs.end(); jit++)
823 {
824 if (jit->m_backend == bsit->m_backend)
825 {
826 if (jit->m_pos >= start && jit->m_pos <= end)
827 jit->m_start = start;
828 }
829 }
830
831 PackagePtr p = bsit->m_backend->m_package;
832
833 *req->resultSetStartPoint = start;
834 *req->numberOfRecordsRequested = end - start + 1;
835
836 p->request() = apdu_req;
837 p->copy_filter(package);
838
839 present_backend_list.push_back(bsit->m_backend);
840 }
841 }
842 multi_move(present_backend_list);
843
844 // look at each response
845 Z_DiagRecs *z_diag = 0;
846 int no_successful = 0;
847 mp::odr odr;
848 PackagePtr close_p;
849
850 std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
851 for (; pbit != present_backend_list.end(); pbit++)
852 {
853 PackagePtr p = (*pbit)->m_package;
854
855 // save closing package for at least one target
856 if (p->session().is_closed())
857 close_p = p;
858
859 Z_GDU *gdu = p->response().get();
860 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
861 Z_APDU_presentResponse)
862 {
863 Z_APDU *b_apdu = gdu->u.z3950;
864 Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
865
866 record_diagnostics(b_resp->records, z_diag, odr, no_successful);
867 }
868 }
869
870 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
871 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
872
873 if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
874 {
875 package.session().close();
876 package.response() = close_p->response();
877 return;
878 }
879 if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
880 {
881 f_resp->records = (Z_Records *)
882 odr_malloc(odr, sizeof(*f_resp->records));
883 if (z_diag->num_diagRecs > 1)
884 {
885 f_resp->records->which = Z_Records_multipleNSD;
886 f_resp->records->u.multipleNonSurDiagnostics = z_diag;
887 }
888 else
889 {
890 f_resp->records->which = Z_Records_NSD;
891 f_resp->records->u.nonSurrogateDiagnostic =
892 z_diag->diagRecs[0]->u.defaultFormat;
893 }
894 }
895 else if (number < 0 || (size_t) number > jobs.size())
896 {
897 f_apdu =
898 odr.create_presentResponse(
899 apdu_req,
900 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
901 0);
902 }
903 else
904 {
905 f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
906 Z_Records * records = f_resp->records;
907 records->which = Z_Records_DBOSD;
908 records->u.databaseOrSurDiagnostics =
909 (Z_NamePlusRecordList *)
910 odr_malloc(odr, sizeof(Z_NamePlusRecordList));
911 Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
912 nprl->num_records = jobs.size();
913 nprl->records = (Z_NamePlusRecord**)
914 odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
915 int i = 0;
916 std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
917 for (jit = jobs.begin(); jit != jobs.end(); jit++)
918 {
919 PackagePtr p = jit->m_backend->m_package;
920
921 Z_GDU *gdu = p->response().get();
922 Z_APDU *b_apdu = gdu->u.z3950;
923 int inside_pos = jit->m_pos - jit->m_start;
924 Z_Records *records = b_apdu->u.presentResponse->records;
925
926 if (records && records->which == Z_Records_DBOSD
927 && inside_pos <
928 records->u.databaseOrSurDiagnostics->num_records)
929 {
930 nprl->records[i] = (Z_NamePlusRecord*)
931 odr_malloc(odr, sizeof(Z_NamePlusRecord));
932 *nprl->records[i] = *records->
933 u.databaseOrSurDiagnostics->records[inside_pos];
934 nprl->records[i]->databaseName =
935 odr_strdup(odr, jit->m_backend->m_vhost.c_str());
936 i++;
937 }
938 }
939 nprl->num_records = i; // usually same as jobs.size();
940 *f_resp->nextResultSetPosition = start + i;
941 *f_resp->numberOfRecordsReturned = i;
942 }
943 package.response() = f_apdu;
944}
945
946bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
947{
948 return m_norm_term < k.m_norm_term;
949}
950
951bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
952{
953 return m_norm_term == k.m_norm_term;
954}
955
956Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
957{
958 Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
959 e->which = Z_Entry_termInfo;
960 Z_TermInfo *t;
961 t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
962 t->suggestedAttributes = 0;
963 t->displayTerm = 0;
964 t->alternativeTerm = 0;
965 t->byAttributes = 0;
966 t->otherTermInfo = 0;
967 t->globalOccurrences = odr_intdup(odr, m_count);
968 t->term = (Z_Term *) odr_malloc(odr, sizeof(*t->term));
969 t->term->which = Z_Term_general;
970 t->term->u.general = odr_create_Odr_oct(odr,
971 m_norm_term.c_str(), m_norm_term.size());
972 return e;
973}
974
975void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
976{
977 std::list<BackendPtr>::const_iterator bit;
978 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
979 {
980 PackagePtr p = (*bit)->m_package;
981 mp::odr odr;
982
983 p->request() = apdu_req;
984 p->copy_filter(package);
985 }
986 multi_move(m_backend_list);
987 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
988 {
989 PackagePtr p = (*bit)->m_package;
990
991 if (p->session().is_closed()) // if any backend closes, close frontend
992 package.session().close();
993
994 package.response() = p->response();
995 }
996}
997
998
999void yf::Multi::Frontend::scan(mp::Package &package, Z_APDU *apdu_req)
1000{
1001 Z_ScanRequest *req = apdu_req->u.scanRequest;
1002
1003 std::list<BackendPtr>::const_iterator bit;
1004 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1005 {
1006 PackagePtr p = (*bit)->m_package;
1007 yazpp_1::GDU gdu1(apdu_req);
1008 mp::odr odr;
1009 Z_ScanRequest *req1 = gdu1.get()->u.z3950->u.scanRequest;
1010
1011 if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1012 &req1->num_databaseNames,
1013 &req1->databaseNames))
1014 {
1015 req1->num_databaseNames = req->num_databaseNames;
1016 req1->databaseNames = req->databaseNames;
1017 }
1018 p->request() = gdu1;
1019 p->copy_filter(package);
1020 }
1021 multi_move(m_backend_list);
1022
1023 ScanTermInfoList entries_before;
1024 ScanTermInfoList entries_after;
1025 int no_before = 0;
1026 int no_after = 0;
1027
1028 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1029 {
1030 PackagePtr p = (*bit)->m_package;
1031
1032 if (p->session().is_closed()) // if any backend closes, close frontend
1033 package.session().close();
1034
1035 Z_GDU *gdu = p->response().get();
1036 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1037 Z_APDU_scanResponse)
1038 {
1039 Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1040
1041 if (res->entries && res->entries->nonsurrogateDiagnostics)
1042 {
1043 // failure
1044 mp::odr odr;
1045 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1046 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1047
1048 f_res->entries->nonsurrogateDiagnostics =
1049 res->entries->nonsurrogateDiagnostics;
1050 f_res->entries->num_nonsurrogateDiagnostics =
1051 res->entries->num_nonsurrogateDiagnostics;
1052
1053 package.response() = f_apdu;
1054 return;
1055 }
1056
1057 if (res->entries && res->entries->entries)
1058 {
1059 Z_Entry **entries = res->entries->entries;
1060 int num_entries = res->entries->num_entries;
1061 int position = 1;
1062 if (req->preferredPositionInResponse)
1063 position = *req->preferredPositionInResponse;
1064 if (res->positionOfTerm)
1065 position = *res->positionOfTerm;
1066
1067 // before
1068 int i;
1069 for (i = 0; i<position-1 && i<num_entries; i++)
1070 {
1071 Z_Entry *ent = entries[i];
1072
1073 if (ent->which == Z_Entry_termInfo)
1074 {
1075 ScanTermInfo my;
1076
1077 Odr_int *occur = ent->u.termInfo->globalOccurrences;
1078 my.m_count = occur ? *occur : 0;
1079
1080 if (ent->u.termInfo->term->which == Z_Term_general)
1081 {
1082 my.m_norm_term = std::string(
1083 (const char *)
1084 ent->u.termInfo->term->u.general->buf,
1085 ent->u.termInfo->term->u.general->len);
1086 }
1087 if (my.m_norm_term.length())
1088 {
1089 ScanTermInfoList::iterator it =
1090 entries_before.begin();
1091 while (it != entries_before.end() && my <*it)
1092 it++;
1093 if (it != entries_before.end() && my == *it)
1094 {
1095 it->m_count += my.m_count;
1096 }
1097 else
1098 {
1099 entries_before.insert(it, my);
1100 no_before++;
1101 }
1102 }
1103 }
1104 }
1105 // after
1106 if (position <= 0)
1107 i = 0;
1108 else
1109 i = position-1;
1110 for ( ; i<num_entries; i++)
1111 {
1112 Z_Entry *ent = entries[i];
1113
1114 if (ent->which == Z_Entry_termInfo)
1115 {
1116 ScanTermInfo my;
1117
1118 Odr_int *occur = ent->u.termInfo->globalOccurrences;
1119 my.m_count = occur ? *occur : 0;
1120
1121 if (ent->u.termInfo->term->which == Z_Term_general)
1122 {
1123 my.m_norm_term = std::string(
1124 (const char *)
1125 ent->u.termInfo->term->u.general->buf,
1126 ent->u.termInfo->term->u.general->len);
1127 }
1128 if (my.m_norm_term.length())
1129 {
1130 ScanTermInfoList::iterator it =
1131 entries_after.begin();
1132 while (it != entries_after.end() && *it < my)
1133 it++;
1134 if (it != entries_after.end() && my == *it)
1135 {
1136 it->m_count += my.m_count;
1137 }
1138 else
1139 {
1140 entries_after.insert(it, my);
1141 no_after++;
1142 }
1143 }
1144 }
1145 }
1146
1147 }
1148 }
1149 else
1150 {
1151 // if any target does not return scan response - return that
1152 package.response() = p->response();
1153 return;
1154 }
1155 }
1156
1157 mp::odr odr;
1158 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1159 Z_ScanResponse *resp = f_apdu->u.scanResponse;
1160
1161 int number_returned = *req->numberOfTermsRequested;
1162 int position_returned = *req->preferredPositionInResponse;
1163
1164 resp->entries->num_entries = number_returned;
1165 resp->entries->entries = (Z_Entry**)
1166 odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1167 int i;
1168
1169 int lbefore = entries_before.size();
1170 if (lbefore < position_returned-1)
1171 position_returned = lbefore+1;
1172
1173 ScanTermInfoList::iterator it = entries_before.begin();
1174 for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1175 {
1176 resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1177 }
1178
1179 it = entries_after.begin();
1180
1181 if (position_returned <= 0)
1182 i = 0;
1183 else
1184 i = position_returned-1;
1185 for (; i<number_returned && it != entries_after.end(); i++, it++)
1186 {
1187 resp->entries->entries[i] = it->get_entry(odr);
1188 }
1189
1190 number_returned = i;
1191
1192 resp->positionOfTerm = odr_intdup(odr, position_returned);
1193 resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1194 resp->entries->num_entries = number_returned;
1195
1196 package.response() = f_apdu;
1197}
1198
1199
1200void yf::Multi::process(mp::Package &package) const
1201{
1202 FrontendPtr f = m_p->get_frontend(package);
1203
1204 Z_GDU *gdu = package.request().get();
1205
1206 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1207 Z_APDU_initRequest && !f->m_is_multi)
1208 {
1209 f->init(package, gdu);
1210 }
1211 else if (!f->m_is_multi)
1212 package.move();
1213 else if (gdu && gdu->which == Z_GDU_Z3950)
1214 {
1215 Z_APDU *apdu = gdu->u.z3950;
1216 if (apdu->which == Z_APDU_initRequest)
1217 {
1218 mp::odr odr;
1219
1220 package.response() = odr.create_close(
1221 apdu,
1222 Z_Close_protocolError,
1223 "double init");
1224
1225 package.session().close();
1226 }
1227 else if (apdu->which == Z_APDU_searchRequest)
1228 {
1229 f->search(package, apdu);
1230 }
1231 else if (apdu->which == Z_APDU_presentRequest)
1232 {
1233 f->present(package, apdu);
1234 }
1235 else if (apdu->which == Z_APDU_scanRequest)
1236 {
1237 f->scan(package, apdu);
1238 }
1239 else if (apdu->which == Z_APDU_close)
1240 {
1241 f->relay_apdu(package, apdu);
1242 }
1243 else
1244 {
1245 mp::odr odr;
1246
1247 package.response() = odr.create_close(
1248 apdu, Z_Close_protocolError,
1249 "unsupported APDU in filter multi");
1250
1251 package.session().close();
1252 }
1253 }
1254 m_p->release_frontend(package);
1255}
1256
1257void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1258 const char *path)
1259{
1260 for (ptr = ptr->children; ptr; ptr = ptr->next)
1261 {
1262 if (ptr->type != XML_ELEMENT_NODE)
1263 continue;
1264 if (!strcmp((const char *) ptr->name, "target"))
1265 {
1266 std::string auth;
1267 std::string route = mp::xml::get_route(ptr, auth);
1268 std::string target = mp::xml::get_text(ptr);
1269 if (target.length() == 0)
1270 target = route;
1271 m_p->m_route_patterns.push_back(Multi::Map(target, route, auth));
1272 }
1273 else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1274 {
1275 m_p->m_hide_unavailable = true;
1276 }
1277 else if (!strcmp((const char *) ptr->name, "hideerrors"))
1278 {
1279 m_p->m_hide_errors = true;
1280 }
1281 else if (!strcmp((const char *) ptr->name, "mergetype"))
1282 {
1283 std::string mergetype = mp::xml::get_text(ptr);
1284 if (mergetype == "roundrobin")
1285 m_p->m_merge_type = round_robin;
1286 else if (mergetype == "serveorder")
1287 m_p->m_merge_type = serve_order;
1288 else
1289 throw mp::filter::FilterException
1290 ("Bad mergetype " + mergetype + " in multi filter");
1291
1292 }
1293 else
1294 {
1295 throw mp::filter::FilterException
1296 ("Bad element "
1297 + std::string((const char *) ptr->name)
1298 + " in multi filter");
1299 }
1300 }
1301}
1302
1303static mp::filter::Base* filter_creator()
1304{
1305 return new mp::filter::Multi;
1306}
1307
1308extern "C" {
1309 struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1310 0,
1311 "multi",
1313 };
1314}
1315
1316
1317/*
1318 * Local variables:
1319 * c-basic-offset: 4
1320 * c-file-style: "Stroustrup"
1321 * indent-tabs-mode: nil
1322 * End:
1323 * vim: shiftwidth=4 tabstop=8 expandtab
1324 */
1325
Map(std::string pattern, std::string route, std::string auth)
bool match(const std::string target, std::string *ret, std::string *auth) const
std::map< mp::Session, FrontendPtr > m_clients
FrontendPtr get_frontend(Package &package)
std::list< Multi::Map > m_route_patterns
void release_frontend(Package &package)
boost::shared_ptr< Frontend > FrontendPtr
boost::shared_ptr< Backend > BackendPtr
std::list< ScanTermInfo > ScanTermInfoList
std::map< std::string, FrontendSet >::iterator Sets_it
boost::scoped_ptr< Rep > m_p
boost::shared_ptr< Package > PackagePtr
static mp::filter::Base * filter_creator()
static const int result_set_size
struct metaproxy_1_filter_struct metaproxy_1_filter_multi
bool operator==(const BackendSet &k) const
bool operator<(const BackendSet &k) const
void init(Package &package, Z_GDU *gdu)
void record_diagnostics(Z_Records *records, Z_DiagRecs *&z_diag, ODR odr, int &no_successful)
void present(Package &package, Z_APDU *apdu)
void relay_apdu(Package &package, Z_APDU *apdu)
void scan(Package &package, Z_APDU *apdu)
void search(Package &package, Z_APDU *apdu)
std::list< BackendPtr > m_backend_list
void multi_move(std::list< BackendPtr > &blist)
std::map< std::string, Multi::FrontendSet > m_sets
bool operator<(const ScanTermInfo &) const
bool operator==(const ScanTermInfo &) const