298void yf::Multi::FrontendSet::round_robin(
int start,
int number,
299 std::list<PresentJob> &jobs)
302 std::list<BackendSet>::const_iterator bsit;
303 for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
317 for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
319 if (bsit->m_count > omin)
321 if (no_left == 0 || bsit->m_count < min)
328 int skip = no_left * min;
329 if (p + skip > start)
332 min = (start-p) / no_left;
336 std::list<int>::iterator psit = pos.begin();
337 for (psit = pos.begin(); psit != pos.end(); psit++)
344 std::list<int>::iterator psit = pos.begin();
345 for (psit = pos.begin(); psit != pos.end(); psit++)
356 std::list<int>::iterator psit = pos.begin();
357 bsit = m_backend_sets.begin();
359 for (; bsit != m_backend_sets.end(); psit++,bsit++)
361 if (fetched >= number)
366 if (*psit <= bsit->m_count)
382void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
384 Z_InitRequest *req = gdu->u.z3950->u.initRequest;
386 std::list<std::string> targets;
388 mp::util::get_vhost_otherinfo(req->otherInfo, targets);
390 if (targets.size() < 1)
396 std::list<std::string>::const_iterator t_it = targets.begin();
397 for (; t_it != targets.end(); t_it++)
403 std::list<Multi::Map>::const_iterator it =
404 m_p->m_route_patterns.begin();
405 while (it != m_p->m_route_patterns.end()) {
419 std::list<BackendPtr>::iterator bit;
420 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
424 Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
426 std::list<std::string>vhost_one;
427 vhost_one.push_back(b->m_vhost);
428 mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
432 Z_InitRequest *breq = init_apdu->u.initRequest;
434 if (b->m_auth.length())
436 breq->idAuthentication =
437 (Z_IdAuthentication *)
438 odr_malloc(odr,
sizeof(*breq->idAuthentication));
439 breq->idAuthentication->which = Z_IdAuthentication_open;
440 breq->idAuthentication->u.open = odr_strdup(odr, b->m_auth.c_str());
443 breq->idAuthentication = req->idAuthentication;
445 *breq->preferredMessageSize = *req->preferredMessageSize;
446 *breq->maximumRecordSize = *req->maximumRecordSize;
449 const char *peer_name = yaz_oi_get_string_oid(
450 &req->otherInfo, yaz_oid_userinfo_client_ip, 1, 0);
452 yaz_oi_set_string_oid(&breq->otherInfo, odr,
453 yaz_oid_userinfo_client_ip, 1, peer_name);
455 ODR_MASK_SET(breq->options, Z_Options_search);
456 ODR_MASK_SET(breq->options, Z_Options_present);
457 ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
458 ODR_MASK_SET(breq->options, Z_Options_scan);
460 ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
461 ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
462 ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
464 b->m_package->request() = init_apdu;
466 b->m_package->copy_filter(package);
468 multi_move(m_backend_list);
473 Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
474 Z_InitResponse *f_resp = f_apdu->u.initResponse;
476 ODR_MASK_SET(f_resp->options, Z_Options_search);
477 ODR_MASK_SET(f_resp->options, Z_Options_present);
478 ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
479 ODR_MASK_SET(f_resp->options, Z_Options_scan);
481 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
482 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
483 ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
486 int no_succeeded = 0;
488 Odr_int preferredMessageSize = *req->preferredMessageSize;
489 Odr_int maximumRecordSize = *req->maximumRecordSize;
490 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
494 if (p->session().is_closed())
498 bit = m_backend_list.erase(bit);
501 Z_GDU *gdu = p->response().get();
502 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
506 Z_APDU *b_apdu = gdu->u.z3950;
507 Z_InitResponse *b_resp = b_apdu->u.initResponse;
510 for (i = 0; i <= Z_Options_stringSchema; i++)
512 if (!ODR_MASK_GET(b_resp->options, i))
513 ODR_MASK_CLEAR(f_resp->options, i);
516 for (i = 0; i <= Z_ProtocolVersion_3; i++)
517 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
518 ODR_MASK_CLEAR(f_resp->protocolVersion, i);
522 if (preferredMessageSize > *b_resp->preferredMessageSize)
523 preferredMessageSize = *b_resp->preferredMessageSize;
524 if (maximumRecordSize > *b_resp->maximumRecordSize)
525 maximumRecordSize = *b_resp->maximumRecordSize;
529 if (!f_resp->userInformationField
530 && b_resp->userInformationField)
531 f_resp->userInformationField = b_resp->userInformationField;
539 *f_resp->preferredMessageSize = preferredMessageSize;
540 *f_resp->maximumRecordSize = maximumRecordSize;
542 if (m_p->m_hide_unavailable)
544 if (no_succeeded == 0)
547 package.session().close();
555 package.session().close();
558 package.response() = f_apdu;
561void yf::Multi::Frontend::record_diagnostics(Z_Records *records,
562 Z_DiagRecs * &z_diag,
569 if (records->which == Z_Records_NSD)
573 z_diag = (Z_DiagRecs *)
574 odr_malloc(odr,
sizeof(*z_diag));
575 z_diag->num_diagRecs = 0;
576 z_diag->diagRecs = (Z_DiagRec**)
577 odr_malloc(odr,
sizeof(*z_diag->diagRecs));
581 Z_DiagRec **n = (Z_DiagRec **)
583 (1 + z_diag->num_diagRecs) *
sizeof(*n));
584 memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
586 z_diag->diagRecs = n;
588 Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr,
sizeof(*nr));
589 nr->which = Z_DiagRec_defaultFormat;
590 nr->u.defaultFormat =
591 records->u.nonSurrogateDiagnostic;
592 z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
594 else if (records->which == Z_Records_multipleNSD)
596 Z_DiagRecs * dr =records->u.multipleNonSurDiagnostics;
600 z_diag = (Z_DiagRecs *) odr_malloc(odr,
sizeof(*z_diag));
601 z_diag->num_diagRecs = 0;
602 z_diag->diagRecs = 0;
604 Z_DiagRec **n = (Z_DiagRec **)
606 (dr->num_diagRecs + z_diag->num_diagRecs) *
608 if (z_diag->num_diagRecs)
609 memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs *
sizeof(*n));
610 memcpy(n + z_diag->num_diagRecs,
611 dr->diagRecs, dr->num_diagRecs *
sizeof(*n));
612 z_diag->diagRecs = n;
613 z_diag->num_diagRecs += dr->num_diagRecs;
622void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
625 Z_SearchRequest *req = apdu_req->u.searchRequest;
627 std::list<BackendPtr>::const_iterator bit;
628 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
631 yazpp_1::GDU gdu1(apdu_req);
633 Z_SearchRequest *req1 = gdu1.get()->u.z3950->u.searchRequest;
636 *req1->smallSetUpperBound = 0;
637 *req1->largeSetLowerBound = 1;
638 *req1->mediumSetPresentNumber = 0;
640 if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
641 &req1->num_databaseNames,
642 &req1->databaseNames))
644 req1->num_databaseNames = req->num_databaseNames;
645 req1->databaseNames = req->databaseNames;
648 p->copy_filter(package);
650 multi_move(m_backend_list);
653 FrontendSet resultSet(std::string(req->resultSetName));
657 Z_DiagRecs *z_diag = 0;
658 int no_successful = 0;
661 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
666 if (p->session().is_closed())
669 Z_GDU *gdu = p->response().get();
670 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
671 Z_APDU_searchResponse)
673 Z_APDU *b_apdu = gdu->u.z3950;
674 Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
676 record_diagnostics(b_resp->records, z_diag, odr, no_successful);
680 backendSet.
m_count = *b_resp->resultCount;
686 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
687 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
689 yaz_log(YLOG_DEBUG,
"no_successful=%d is_closed=%s hide_errors=%s",
691 close_p ?
"true" :
"false",
692 m_p->m_hide_errors ?
"true" :
"false");
694 if (close_p && (no_successful == 0 || !m_p->m_hide_unavailable))
696 package.session().close();
697 package.response() = close_p->response();
700 if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
702 f_resp->records = (Z_Records *)
703 odr_malloc(odr,
sizeof(*f_resp->records));
704 if (z_diag->num_diagRecs > 1)
706 f_resp->records->which = Z_Records_multipleNSD;
707 f_resp->records->u.multipleNonSurDiagnostics = z_diag;
711 f_resp->records->which = Z_Records_NSD;
712 f_resp->records->u.nonSurrogateDiagnostic =
713 z_diag->diagRecs[0]->u.defaultFormat;
720 mp::util::piggyback(*req->smallSetUpperBound,
721 *req->largeSetLowerBound,
722 *req->mediumSetPresentNumber,
726 Package pp(package.session(), package.origin());
727 if (z_diag == 0 && number > 0)
729 pp.copy_filter(package);
730 Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
731 Z_PresentRequest *p_req = p_apdu->u.presentRequest;
732 p_req->preferredRecordSyntax = req->preferredRecordSyntax;
733 p_req->resultSetId = req->resultSetName;
734 *p_req->resultSetStartPoint = 1;
735 *p_req->numberOfRecordsRequested = number;
736 pp.request() = p_apdu;
739 if (pp.session().is_closed())
740 package.session().close();
742 Z_GDU *gdu = pp.response().get();
743 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
744 Z_APDU_presentResponse)
746 Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
747 f_resp->records = p_res->records;
748 *f_resp->numberOfRecordsReturned =
749 *p_res->numberOfRecordsReturned;
750 *f_resp->nextResultSetPosition =
751 *p_res->nextResultSetPosition;
755 package.response() = pp.response();
759 package.response() = f_apdu;
762void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
765 Z_PresentRequest *req = apdu_req->u.presentRequest;
768 it = m_sets.find(std::string(req->resultSetId));
769 if (it == m_sets.end())
773 odr.create_presentResponse(
775 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
777 package.response() = apdu;
780 std::list<Multi::FrontendSet::PresentJob> jobs;
781 int start = *req->resultSetStartPoint;
782 int number = *req->numberOfRecordsRequested;
785 it->second.round_robin(start, number, jobs);
787 it->second.serve_order(start, number, jobs);
791 std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
792 for (jit = jobs.begin(); jit != jobs.end(); jit++)
794 yaz_log(YLOG_DEBUG,
"job pos=%d", jit->m_pos);
798 std::list<BackendPtr> present_backend_list;
800 std::list<BackendSet>::const_iterator bsit;
801 bsit = it->second.m_backend_sets.begin();
802 for (; bsit != it->second.m_backend_sets.end(); bsit++)
807 std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
808 for (jit = jobs.begin(); jit != jobs.end(); jit++)
810 if (jit->m_backend == bsit->m_backend)
812 if (start == -1 || jit->m_pos < start)
814 if (end == -1 || jit->m_pos > end)
821 std::list<Multi::FrontendSet::PresentJob>::iterator jit;
822 for (jit = jobs.begin(); jit != jobs.end(); jit++)
824 if (jit->m_backend == bsit->m_backend)
826 if (jit->m_pos >= start && jit->m_pos <= end)
827 jit->m_start = start;
833 *req->resultSetStartPoint = start;
834 *req->numberOfRecordsRequested = end - start + 1;
836 p->request() = apdu_req;
837 p->copy_filter(package);
839 present_backend_list.push_back(bsit->m_backend);
842 multi_move(present_backend_list);
845 Z_DiagRecs *z_diag = 0;
846 int no_successful = 0;
850 std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
851 for (; pbit != present_backend_list.end(); pbit++)
856 if (p->session().is_closed())
859 Z_GDU *gdu = p->response().get();
860 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
861 Z_APDU_presentResponse)
863 Z_APDU *b_apdu = gdu->u.z3950;
864 Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
866 record_diagnostics(b_resp->records, z_diag, odr, no_successful);
870 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
871 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
873 if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
875 package.session().close();
876 package.response() = close_p->response();
879 if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
881 f_resp->records = (Z_Records *)
882 odr_malloc(odr,
sizeof(*f_resp->records));
883 if (z_diag->num_diagRecs > 1)
885 f_resp->records->which = Z_Records_multipleNSD;
886 f_resp->records->u.multipleNonSurDiagnostics = z_diag;
890 f_resp->records->which = Z_Records_NSD;
891 f_resp->records->u.nonSurrogateDiagnostic =
892 z_diag->diagRecs[0]->u.defaultFormat;
895 else if (number < 0 || (
size_t) number > jobs.size())
898 odr.create_presentResponse(
900 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
905 f_resp->records = (Z_Records *) odr_malloc(odr,
sizeof(Z_Records));
906 Z_Records * records = f_resp->records;
907 records->which = Z_Records_DBOSD;
908 records->u.databaseOrSurDiagnostics =
909 (Z_NamePlusRecordList *)
910 odr_malloc(odr,
sizeof(Z_NamePlusRecordList));
911 Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
912 nprl->num_records = jobs.size();
913 nprl->records = (Z_NamePlusRecord**)
914 odr_malloc(odr,
sizeof(Z_NamePlusRecord *) * nprl->num_records);
916 std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
917 for (jit = jobs.begin(); jit != jobs.end(); jit++)
921 Z_GDU *gdu = p->response().get();
922 Z_APDU *b_apdu = gdu->u.z3950;
923 int inside_pos = jit->m_pos - jit->m_start;
924 Z_Records *records = b_apdu->u.presentResponse->records;
926 if (records && records->which == Z_Records_DBOSD
928 records->u.databaseOrSurDiagnostics->num_records)
930 nprl->records[i] = (Z_NamePlusRecord*)
931 odr_malloc(odr,
sizeof(Z_NamePlusRecord));
932 *nprl->records[i] = *records->
933 u.databaseOrSurDiagnostics->records[inside_pos];
934 nprl->records[i]->databaseName =
935 odr_strdup(odr, jit->m_backend->m_vhost.c_str());
939 nprl->num_records = i;
940 *f_resp->nextResultSetPosition = start + i;
941 *f_resp->numberOfRecordsReturned = i;
943 package.response() = f_apdu;
999void yf::Multi::Frontend::scan(mp::Package &package, Z_APDU *apdu_req)
1001 Z_ScanRequest *req = apdu_req->u.scanRequest;
1003 std::list<BackendPtr>::const_iterator bit;
1004 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1007 yazpp_1::GDU gdu1(apdu_req);
1009 Z_ScanRequest *req1 = gdu1.get()->u.z3950->u.scanRequest;
1011 if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1012 &req1->num_databaseNames,
1013 &req1->databaseNames))
1015 req1->num_databaseNames = req->num_databaseNames;
1016 req1->databaseNames = req->databaseNames;
1018 p->request() = gdu1;
1019 p->copy_filter(package);
1021 multi_move(m_backend_list);
1028 for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1032 if (p->session().is_closed())
1033 package.session().close();
1035 Z_GDU *gdu = p->response().get();
1036 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1037 Z_APDU_scanResponse)
1039 Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1041 if (res->entries && res->entries->nonsurrogateDiagnostics)
1045 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1046 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1048 f_res->entries->nonsurrogateDiagnostics =
1049 res->entries->nonsurrogateDiagnostics;
1050 f_res->entries->num_nonsurrogateDiagnostics =
1051 res->entries->num_nonsurrogateDiagnostics;
1053 package.response() = f_apdu;
1057 if (res->entries && res->entries->entries)
1059 Z_Entry **entries = res->entries->entries;
1060 int num_entries = res->entries->num_entries;
1062 if (req->preferredPositionInResponse)
1063 position = *req->preferredPositionInResponse;
1064 if (res->positionOfTerm)
1065 position = *res->positionOfTerm;
1069 for (i = 0; i<position-1 && i<num_entries; i++)
1071 Z_Entry *ent = entries[i];
1073 if (ent->which == Z_Entry_termInfo)
1077 Odr_int *occur = ent->u.termInfo->globalOccurrences;
1078 my.
m_count = occur ? *occur : 0;
1080 if (ent->u.termInfo->term->which == Z_Term_general)
1084 ent->u.termInfo->term->u.general->buf,
1085 ent->u.termInfo->term->u.general->len);
1089 ScanTermInfoList::iterator it =
1090 entries_before.begin();
1091 while (it != entries_before.end() && my <*it)
1093 if (it != entries_before.end() && my == *it)
1099 entries_before.insert(it, my);
1110 for ( ; i<num_entries; i++)
1112 Z_Entry *ent = entries[i];
1114 if (ent->which == Z_Entry_termInfo)
1118 Odr_int *occur = ent->u.termInfo->globalOccurrences;
1119 my.
m_count = occur ? *occur : 0;
1121 if (ent->u.termInfo->term->which == Z_Term_general)
1125 ent->u.termInfo->term->u.general->buf,
1126 ent->u.termInfo->term->u.general->len);
1130 ScanTermInfoList::iterator it =
1131 entries_after.begin();
1132 while (it != entries_after.end() && *it < my)
1134 if (it != entries_after.end() && my == *it)
1140 entries_after.insert(it, my);
1152 package.response() = p->response();
1158 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1159 Z_ScanResponse *resp = f_apdu->u.scanResponse;
1161 int number_returned = *req->numberOfTermsRequested;
1162 int position_returned = *req->preferredPositionInResponse;
1164 resp->entries->num_entries = number_returned;
1165 resp->entries->entries = (Z_Entry**)
1166 odr_malloc(odr,
sizeof(Z_Entry*) * number_returned);
1169 int lbefore = entries_before.size();
1170 if (lbefore < position_returned-1)
1171 position_returned = lbefore+1;
1173 ScanTermInfoList::iterator it = entries_before.begin();
1174 for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1176 resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1179 it = entries_after.begin();
1181 if (position_returned <= 0)
1184 i = position_returned-1;
1185 for (; i<number_returned && it != entries_after.end(); i++, it++)
1187 resp->entries->entries[i] = it->get_entry(odr);
1190 number_returned = i;
1192 resp->positionOfTerm = odr_intdup(odr, position_returned);
1193 resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1194 resp->entries->num_entries = number_returned;
1196 package.response() = f_apdu;
1200void yf::Multi::process(mp::Package &package)
const
1204 Z_GDU *gdu = package.request().get();
1206 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1207 Z_APDU_initRequest && !f->m_is_multi)
1209 f->init(package, gdu);
1211 else if (!f->m_is_multi)
1213 else if (gdu && gdu->which == Z_GDU_Z3950)
1215 Z_APDU *apdu = gdu->u.z3950;
1216 if (apdu->which == Z_APDU_initRequest)
1220 package.response() = odr.create_close(
1222 Z_Close_protocolError,
1225 package.session().close();
1227 else if (apdu->which == Z_APDU_searchRequest)
1229 f->search(package, apdu);
1231 else if (apdu->which == Z_APDU_presentRequest)
1233 f->present(package, apdu);
1235 else if (apdu->which == Z_APDU_scanRequest)
1237 f->scan(package, apdu);
1239 else if (apdu->which == Z_APDU_close)
1241 f->relay_apdu(package, apdu);
1247 package.response() = odr.create_close(
1248 apdu, Z_Close_protocolError,
1249 "unsupported APDU in filter multi");
1251 package.session().close();
1254 m_p->release_frontend(package);